Синхронизация из Kafka

ПРИМЕЧАНИЕ: эта функциональность требует Manticore Buddy. Если она не работает, убедитесь, что Buddy установлен.

Manticore поддерживает интеграцию с Apache Kafka для приема данных в реальном времени через источники Kafka и материализованные представления, что позволяет индексировать и искать данные в реальном времени. В настоящее время протестированы и поддерживаются apache/kafka версии 3.7.0-4.1.0.

Чтобы начать, вам необходимо:

  1. Определить источник: Указать топик Kafka, из которого Manticore Search будет читать сообщения. Эта настройка включает такие детали, как хост брокера, порт и имя топика.
  2. Настроить целевую таблицу: Выбрать таблицу Manticore реального времени для хранения входящих данных Kafka.
  3. Создать материализованное представление: Настроить материализованное представление (mv) для обработки преобразования данных и сопоставления из Kafka в целевую таблицу в Manticore Search. Здесь вы определите сопоставления полей, преобразования данных, а также любые фильтры или условия для входящего потока данных.

Источник

Конфигурация source позволяет определить broker, список топиков, группу потребителей и структуру сообщений.

Схема

Определите схему, используя типы полей Manticore, такие как int, float, text, json и т.д.

CREATE SOURCE <source name> [(column type, ...)] [source_options]

Все ключи схемы нечувствительны к регистру, то есть Products, products и PrOdUcTs обрабатываются одинаково. Все они преобразуются в нижний регистр.

Если имена ваших полей не соответствуют синтаксису имен полей, разрешенному в Manticore Search (например, если они содержат специальные символы или начинаются с цифр), вы должны определить сопоставление схемы. Например, $keyName или 123field являются допустимыми ключами в JSON, но не являются допустимыми именами полей в Manticore Search. Если вы попытаетесь использовать недопустимые имена полей без правильного сопоставления, Manticore вернет ошибку, и создание источника завершится неудачей.

Для обработки таких случаев используйте следующий синтаксис схемы для сопоставления недопустимых имен полей с допустимыми:

allowed_field_name 'original JSON key name with special symbols' type

Например:

price_field '$price' float    -- maps JSON key '$price' to field 'price_field'
field_123 '123field' text     -- maps JSON key '123field' to field 'field_123'
‹›
  • SQL
SQL
📋
CREATE SOURCE kafka
(id bigint, term text, abbrev '$abbrev' text, GlossDef json)
type='kafka'
broker_list='kafka:9092'
topic_list='my-data'
consumer_group='manticore'
num_consumers='2'
batch=50
‹›
Response
Query OK, 2 rows affected (0.02 sec)

Опции

Опция Допустимые значения Описание
type kafka Устанавливает тип источника. В настоящее время поддерживается только kafka
broker_list хост:порт [, ...] Указывает URL-адреса брокеров Kafka
topic_list строка [, ...] Список топиков Kafka для потребления
consumer_group строка Определяет группу потребителей Kafka, по умолчанию manticore.
num_consumers int Количество потребителей для обработки сообщений.
partition_list int [, ...] Список разделов для чтения подробнее.
batch int Количество сообщений для обработки перед переходом к следующей партии. По умолчанию 100; в противном случае оставшиеся сообщения обрабатываются по таймауту

Целевая таблица

Целевая таблица — это обычная таблица реального времени, в которой хранятся результаты обработки сообщений Kafka. Эта таблица должна быть определена в соответствии с требованиями схемы входящих данных и оптимизирована для потребностей производительности запросов вашего приложения. Подробнее о создании таблиц реального времени читайте здесь.

‹›
  • SQL
SQL
📋
CREATE TABLE destination_kafka
(id bigint, name text, short_name text, received_at text, size multi);
‹›
Response
Query OK, 0 rows affected (0.02 sec)

Материализованное представление

Материализованное представление позволяет преобразовывать данные из сообщений Kafka. Вы можете переименовывать поля, применять функции Manticore Search, а также выполнять сортировку, группировку и другие операции с данными.

Материализованное представление действует как запрос, который перемещает данные из источника Kafka в целевую таблицу, позволяя вам использовать синтаксис Manticore Search для настройки этих запросов. Убедитесь, что поля в select соответствуют полям в источнике.

CREATE MATERIALIZED VIEW <materialized view name>
TO <destination table name> AS
SELECT [column|function [as <new name>], ...] FROM <source name>
‹›
  • SQL
SQL
📋
CREATE MATERIALIZED VIEW view_table
TO destination_kafka AS
SELECT id, term as name, abbrev as short_name,
       UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka
‹›
Response
Query OK, 2 rows affected (0.02 sec)

Данные передаются из Kafka в Manticore Search пакетами, которые очищаются после каждого запуска. Для вычислений между пакетами, таких как AVG, будьте осторожны, так как они могут работать не так, как ожидается, из-за обработки пакет за пакетом.

Сопоставление полей

Вот таблица сопоставления на основе приведенных выше примеров:

Kafka Источник Буфер МВ Назначение
id id id id id
term term term term as name name
unnecessary_key который нас не интересует - -
$abbrev abbrev abbrev abbrev as short_name short_name
- - - UTC_TIMESTAMP() as received_at received_at
GlossDef glossdef glossdef glossdef.size as size size

Просмотр списка

Чтобы просмотреть источники и материализованные представления в Manticore Search, используйте следующие команды:

  • SHOW SOURCES: Выводит список всех настроенных источников.
  • SHOW MVS: Выводит список всех материализованных представлений.
  • SHOW MV view_table: Показывает подробную информацию о конкретном материализованном представлении.
‹›
  • SQL
SQL
📋
SHOW SOURCES
‹›
Response
+-------+
| name  |
+-------+
| kafka |
+-------+
‹›
  • SQL
SQL
📋
SHOW SOURCE kafka;
‹›
Response
+--------+-------------------------------------------------------------------+
| Source | Create Table                                                      |
+--------+-------------------------------------------------------------------+
| kafka  | CREATE SOURCE kafka                                               |
|        | (id bigint, term text, abbrev '$abbrev' text, GlossDef json)      |
|        | type='kafka'                                                      |
|        | broker_list='kafka:9092'                                          |
|        | topic_list='my-data'                                              |
|        | consumer_group='manticore'                                        |
|        | num_consumers='2'                                                 |
|        | batch=50                                                          |
+--------+-------------------------------------------------------------------+
‹›
  • SQL
SQL
📋
SHOW MVS
‹›
Response
+------------+
| name       |
+------------+
| view_table |
+------------+
‹›
  • SQL
SQL
📋
SHOW MV view_table
‹›
Response
+------------+--------------------------------------------------------------------------------------------------------+-----------+
| View       | Create Table                                                                                           | suspended |
+------------+--------------------------------------------------------------------------------------------------------+-----------+
| view_table | CREATE MATERIALIZED VIEW view_table TO destination_kafka AS                                            | 0         |
|            | SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size   |           |
|            | FROM kafka                                                                                             |           |
+------------+--------------------------------------------------------------------------------------------------------+-----------+

Изменение материализованных представлений

Вы можете приостановить потребление данных, изменив материализованные представления.

Если вы удалите source без удаления МП, он автоматически приостановится. После воссоздания источника снимите приостановку МП вручную с помощью команды ALTER.

В настоящее время можно изменять только материализованные представления. Чтобы изменить параметры source, удалите и создайте источник заново.

‹›
  • SQL
SQL
📋
ALTER MATERIALIZED VIEW view_table suspended=1
‹›
Response
Query OK (0.02 sec)

Шардинг с Kafka

Вы также можете указать partition_list для каждой темы Kafka. Одним из основных преимуществ такого подхода является возможность реализации шардинга вашей таблицы через Kafka. Для этого следует создать отдельную цепочку sourcematerialized viewdestination table для каждого шарда:

Источники:

CREATE SOURCE kafka_p1 (id bigint, term text)
  type='kafka' broker_list='kafka:9092' topic_list='my-data'
  consumer_group='manticore' num_consumers='1' partition_list='0' batch=50;
CREATE SOURCE kafka_p2 (id bigint, term text)
  type='kafka' broker_list='kafka:9092' topic_list='my-data'
  consumer_group='manticore' num_consumers='1' partition_list='1' batch=50;

Таблицы назначения:

CREATE TABLE destination_shard_1 (id bigint, name text);
CREATE TABLE destination_shard_2 (id bigint, name text);

Материализованные представления:

CREATE MATERIALIZED VIEW mv_1 TO destination_shard_1 AS SELECT id, term AS name FROM kafka_p1;
CREATE MATERIALIZED VIEW mv_2 TO destination_shard_2 AS SELECT id, term AS name FROM kafka_p2;

⚠️ Важные замечания:

  • В этой конфигурации ребалансировка должна управляться вручную.
  • По умолчанию Kafka не распределяет сообщения по схеме round-robin.
  • Чтобы добиться распределения, похожего на round-robin при отправке данных, убедитесь, что ваш Kafka producer настроен с:
    • parse.key=true
    • key.separator={your_delimiter}

Иначе Kafka будет распределять сообщения по своим внутренним правилам, что может привести к неравномерному разделению по партициям.

Устранение неполадок

Дублирование записей

Фиксация смещений Kafka происходит после каждой партии или при истечении времени обработки. Если процесс неожиданно остановится во время выполнения запроса материализованного представления, могут появиться дубликаты записей. Чтобы избежать этого, включите в схему поле id, позволяющее Manticore Search предотвращать дублирование в таблице.

Как это работает внутри

  • Инициализация работника: После настройки источника и материализованного представления Manticore Search создает выделенного работника для обработки поступления данных из Kafka.
  • Отображение сообщений: Сообщения сопоставляются согласно схеме конфигурации источника, преобразуясь в структурированный формат.
  • Группировка в партии: Сообщения группируются в партии для эффективной обработки. Размер партии можно настроить в зависимости от ваших требований к производительности и задержкам.
  • Буферизация: Партии преобразованных данных сохраняются во вспомогательной таблице буфера для эффективных массовых операций.
  • Обработка материализованного представления: К данным во вспомогательной таблице применяется логика представления, выполняются преобразования или фильтрация.
  • Передача данных: Обработанные данные затем передаются в конечную таблицу реального времени.
  • Очистка: Вспомогательная таблица очищается после каждой партии, чтобы подготовиться к приему следующего набора данных.
Last modified: October 28, 2025