ПРИМЕЧАНИЕ: эта функциональность требует Manticore Buddy. Если она не работает, убедитесь, что Buddy установлен.
Manticore поддерживает интеграцию с Apache Kafka для приема данных в реальном времени через источники Kafka и материализованные представления, что позволяет индексировать и искать данные в реальном времени. В настоящее время протестированы и поддерживаются apache/kafka версии 3.7.0-4.1.0.
Чтобы начать, вам необходимо:
- Определить источник: Указать топик Kafka, из которого Manticore Search будет читать сообщения. Эта настройка включает такие детали, как хост брокера, порт и имя топика.
- Настроить целевую таблицу: Выбрать таблицу Manticore реального времени для хранения входящих данных Kafka.
- Создать материализованное представление: Настроить материализованное представление (
mv) для обработки преобразования данных и сопоставления из Kafka в целевую таблицу в Manticore Search. Здесь вы определите сопоставления полей, преобразования данных, а также любые фильтры или условия для входящего потока данных.
Конфигурация source позволяет определить broker, список топиков, группу потребителей и структуру сообщений.
Определите схему, используя типы полей Manticore, такие как int, float, text, json и т.д.
CREATE SOURCE <source name> [(column type, ...)] [source_options]
Все ключи схемы нечувствительны к регистру, то есть Products, products и PrOdUcTs обрабатываются одинаково. Все они преобразуются в нижний регистр.
Если имена ваших полей не соответствуют синтаксису имен полей, разрешенному в Manticore Search (например, если они содержат специальные символы или начинаются с цифр), вы должны определить сопоставление схемы. Например, $keyName или 123field являются допустимыми ключами в JSON, но не являются допустимыми именами полей в Manticore Search. Если вы попытаетесь использовать недопустимые имена полей без правильного сопоставления, Manticore вернет ошибку, и создание источника завершится неудачей.
Для обработки таких случаев используйте следующий синтаксис схемы для сопоставления недопустимых имен полей с допустимыми:
allowed_field_name 'original JSON key name with special symbols' type
Например:
price_field '$price' float -- maps JSON key '$price' to field 'price_field'
field_123 '123field' text -- maps JSON key '123field' to field 'field_123'
CREATE SOURCE kafka
(id bigint, term text, abbrev '$abbrev' text, GlossDef json)
type='kafka'
broker_list='kafka:9092'
topic_list='my-data'
consumer_group='manticore'
num_consumers='2'
batch=50
Query OK, 2 rows affected (0.02 sec)
| Опция |
Допустимые значения |
Описание |
type |
kafka |
Устанавливает тип источника. В настоящее время поддерживается только kafka |
broker_list |
хост:порт [, ...] |
Указывает URL-адреса брокеров Kafka |
topic_list |
строка [, ...] |
Список топиков Kafka для потребления |
consumer_group |
строка |
Определяет группу потребителей Kafka, по умолчанию manticore. |
num_consumers |
int |
Количество потребителей для обработки сообщений. |
partition_list |
int [, ...] |
Список разделов для чтения подробнее. |
batch |
int |
Количество сообщений для обработки перед переходом к следующей партии. По умолчанию 100; в противном случае оставшиеся сообщения обрабатываются по таймауту |
Целевая таблица — это обычная таблица реального времени, в которой хранятся результаты обработки сообщений Kafka. Эта таблица должна быть определена в соответствии с требованиями схемы входящих данных и оптимизирована для потребностей производительности запросов вашего приложения. Подробнее о создании таблиц реального времени читайте здесь.
CREATE TABLE destination_kafka
(id bigint, name text, short_name text, received_at text, size multi);
Query OK, 0 rows affected (0.02 sec)
Материализованное представление позволяет преобразовывать данные из сообщений Kafka. Вы можете переименовывать поля, применять функции Manticore Search, а также выполнять сортировку, группировку и другие операции с данными.
Материализованное представление действует как запрос, который перемещает данные из источника Kafka в целевую таблицу, позволяя вам использовать синтаксис Manticore Search для настройки этих запросов. Убедитесь, что поля в select соответствуют полям в источнике.
CREATE MATERIALIZED VIEW <materialized view name>
TO <destination table name> AS
SELECT [column|function [as <new name>], ...] FROM <source name>
CREATE MATERIALIZED VIEW view_table
TO destination_kafka AS
SELECT id, term as name, abbrev as short_name,
UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka
Query OK, 2 rows affected (0.02 sec)
Данные передаются из Kafka в Manticore Search пакетами, которые очищаются после каждого запуска. Для вычислений между пакетами, таких как AVG, будьте осторожны, так как они могут работать не так, как ожидается, из-за обработки пакет за пакетом.
Вот таблица сопоставления на основе приведенных выше примеров:
| Kafka |
Источник |
Буфер |
МВ |
Назначение |
id |
id |
id |
id |
id |
term |
term |
term |
term as name |
name |
unnecessary_key который нас не интересует |
- |
- |
|
|
$abbrev |
abbrev |
abbrev |
abbrev as short_name |
short_name |
| - |
- |
- |
UTC_TIMESTAMP() as received_at |
received_at |
GlossDef |
glossdef |
glossdef |
glossdef.size as size |
size |
Чтобы просмотреть источники и материализованные представления в Manticore Search, используйте следующие команды:
SHOW SOURCES: Выводит список всех настроенных источников.
SHOW MVS: Выводит список всех материализованных представлений.
SHOW MV view_table: Показывает подробную информацию о конкретном материализованном представлении.
+-------+
| name |
+-------+
| kafka |
+-------+
+--------+-------------------------------------------------------------------+
| Source | Create Table |
+--------+-------------------------------------------------------------------+
| kafka | CREATE SOURCE kafka |
| | (id bigint, term text, abbrev '$abbrev' text, GlossDef json) |
| | type='kafka' |
| | broker_list='kafka:9092' |
| | topic_list='my-data' |
| | consumer_group='manticore' |
| | num_consumers='2' |
| | batch=50 |
+--------+-------------------------------------------------------------------+
+------------+
| name |
+------------+
| view_table |
+------------+
+------------+--------------------------------------------------------------------------------------------------------+-----------+
| View | Create Table | suspended |
+------------+--------------------------------------------------------------------------------------------------------+-----------+
| view_table | CREATE MATERIALIZED VIEW view_table TO destination_kafka AS | 0 |
| | SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size | |
| | FROM kafka | |
+------------+--------------------------------------------------------------------------------------------------------+-----------+
Вы можете приостановить потребление данных, изменив материализованные представления.
Если вы удалите source без удаления МП, он автоматически приостановится. После воссоздания источника снимите приостановку МП вручную с помощью команды ALTER.
В настоящее время можно изменять только материализованные представления. Чтобы изменить параметры source, удалите и создайте источник заново.
ALTER MATERIALIZED VIEW view_table suspended=1
Вы также можете указать partition_list для каждой темы Kafka.
Одним из основных преимуществ такого подхода является возможность реализации шардинга вашей таблицы через Kafka.
Для этого следует создать отдельную цепочку source → materialized view → destination table для каждого шарда:
Источники:
CREATE SOURCE kafka_p1 (id bigint, term text)
type='kafka' broker_list='kafka:9092' topic_list='my-data'
consumer_group='manticore' num_consumers='1' partition_list='0' batch=50;
CREATE SOURCE kafka_p2 (id bigint, term text)
type='kafka' broker_list='kafka:9092' topic_list='my-data'
consumer_group='manticore' num_consumers='1' partition_list='1' batch=50;
Таблицы назначения:
CREATE TABLE destination_shard_1 (id bigint, name text);
CREATE TABLE destination_shard_2 (id bigint, name text);
Материализованные представления:
CREATE MATERIALIZED VIEW mv_1 TO destination_shard_1 AS SELECT id, term AS name FROM kafka_p1;
CREATE MATERIALIZED VIEW mv_2 TO destination_shard_2 AS SELECT id, term AS name FROM kafka_p2;
- В этой конфигурации ребалансировка должна управляться вручную.
- По умолчанию Kafka не распределяет сообщения по схеме round-robin.
- Чтобы добиться распределения, похожего на round-robin при отправке данных, убедитесь, что ваш Kafka producer настроен с:
parse.key=true
key.separator={your_delimiter}
Иначе Kafka будет распределять сообщения по своим внутренним правилам, что может привести к неравномерному разделению по партициям.
Фиксация смещений Kafka происходит после каждой партии или при истечении времени обработки. Если процесс неожиданно остановится во время выполнения запроса материализованного представления, могут появиться дубликаты записей. Чтобы избежать этого, включите в схему поле id, позволяющее Manticore Search предотвращать дублирование в таблице.
- Инициализация работника: После настройки источника и материализованного представления Manticore Search создает выделенного работника для обработки поступления данных из Kafka.
- Отображение сообщений: Сообщения сопоставляются согласно схеме конфигурации источника, преобразуясь в структурированный формат.
- Группировка в партии: Сообщения группируются в партии для эффективной обработки. Размер партии можно настроить в зависимости от ваших требований к производительности и задержкам.
- Буферизация: Партии преобразованных данных сохраняются во вспомогательной таблице буфера для эффективных массовых операций.
- Обработка материализованного представления: К данным во вспомогательной таблице применяется логика представления, выполняются преобразования или фильтрация.
- Передача данных: Обработанные данные затем передаются в конечную таблицу реального времени.
- Очистка: Вспомогательная таблица очищается после каждой партии, чтобы подготовиться к приему следующего набора данных.