Integration with Logstash

NOTE: The integration with Logstash requires Manticore Buddy. If it doesn't work, make sure Buddy is installed.

Logstash is a log management tool that collects data from a variety of sources, transforms it on the fly, and sends it to your desired destination. It is often used as a data pipeline for Elasticsearch, an open-source analytics and search engine.

Manticore now supports Logstash as a processing pipeline, allowing the collected and transformed data to be sent to Manticore just as it would be to Elasticsearch. Currently, Logstash versions 7.6 to 7.15 are supported.

Let’s examine a simple example of a Logstash config file used for indexing dpkg.log, a standard log file of the Debian package manager. The log itself has a simple structure, as shown below:

2023-05-31 10:42:55 status triggers-awaited ca-certificates-java:all 20190405ubuntu1.1
2023-05-31 10:42:55 trigproc libc-bin:amd64 2.31-0ubuntu9.9 <none>
2023-05-31 10:42:55 status half-configured libc-bin:amd64 2.31-0ubuntu9.9
2023-05-31 10:42:55 status installed libc-bin:amd64 2.31-0ubuntu9.9
2023-05-31 10:42:55 trigproc systemd:amd64 245.4-4ubuntu3.21 <none>

Logstash configuration

Here is an example Logstash configuration:

input {
  file {
    path => ["/var/log/dpkg.log"]
    start_position => "beginning"
    sincedb_path => "/dev/null"
    mode => "read"
    exit_after_read => "true"
    file_completed_action => "log"
    file_completed_log_path => "/dev/null"
  }
}

output {
  elasticsearch {
   index => " dpkg_log"
   hosts => ["http://localhost:9308"]
   ilm_enabled => false
   manage_template => false
  }
}

Note that, before proceeding further, one crucial caveat needs to be addressed: Manticore does not support Elasticsearch's Index Template Management and Index Lifecycle Management features. As these features are enabled by default in Logstash, they need to be explicitly disabled in the config (the manage_template and ilm_enabled options above). Additionally, the hosts option in the output config section must correspond to Manticore's HTTP listen port (default is localhost:9308).

Logstash results

After adjusting the config as described, you can run Logstash, and the data from the dpkg log will be passed to Manticore and properly indexed.

Here is the resulting schema of the created table and an example of the inserted document:

mysql> DESCRIBE dpkg_log;
+------------------+--------+---------------------+
| Field            | Type   | Properties          |
+------------------+--------+---------------------+
| id               | bigint |                     |
| message          | text   | indexed stored      |
| @version         | text   | indexed stored      |
| @timestamp       | text   | indexed stored      |
| path             | text   | indexed stored      |
| host             | text   | indexed stored      |
+------------------+--------+---------------------+
mysql> SELECT * FROM dpkg_log LIMIT 1\G

*************************** 1. row ***************************
id: 7280000849080746110
host: logstash-db848f65f-lnlf9
message: 2023-04-12 02:03:21 status unpacked libc-bin:amd64 2.31-0ubuntu9
path: /var/log/dpkg.log
@timestamp: 2023-06-16T09:23:57.405Z
@version: 1
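
As a quick sanity check, you can run a full-text query against the new table. The query below is only illustrative; the table and field names follow the example above:

SELECT message FROM dpkg_log WHERE MATCH('installed') LIMIT 3;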

Integration with Filebeat

NOTE: The integration with Filebeat requires Manticore Buddy. If it doesn't work, make sure Buddy is installed.

Filebeat is a lightweight shipper for forwarding and centralizing log data. Once installed as an agent, it monitors the log files or locations you specify, collects log events, and forwards them for indexing, usually to Elasticsearch or Logstash.

Manticore also supports Filebeat as a processing pipeline, allowing the collected and transformed data to be sent to Manticore just as it would be to Elasticsearch. Currently, Filebeat versions 7.10 and higher are supported.

Filebeat configuration

Below is a Filebeat config to work with our example dpkg log:

filebeat.inputs:
- type: filestream
  id: example
  paths:
    - /var/log/dpkg.log

output.elasticsearch:
  hosts: ["http://localhost:9308"]
  index:  "dpkg_log"
  allow_older_versions: true

setup.ilm:
  enabled: false

setup.template:
  name: "dpkg_log"
  pattern: "dpkg_log"

Configuration for Filebeat versions >= 8.11

Note that Filebeat versions higher than 8.10 have the output compression feature enabled by default. That is why the compression_level: 0 option must be added to the configuration file to provide compatibility with Manticore:

filebeat.inputs:
- type: filestream
  id: example
  paths:
    - /var/log/dpkg.log

output.elasticsearch:
  hosts: ["http://localhost:9308"]
  index:  "dpkg_log"
  allow_older_versions: true
  compression_level: 0

setup.ilm:
  enabled: false

setup.template:
  name: "dpkg_log"
  pattern: "dpkg_log"

Filebeat results

Once you run Filebeat with this configuration, log data will be sent to Manticore and properly indexed. Here is the resulting schema of the table created by Manticore and an example of the inserted document:

mysql> DESCRIBE dpkg_log;
+------------------+--------+--------------------+
| Field            | Type   | Properties         |
+------------------+--------+--------------------+
| id               | bigint |                    |
| @timestamp       | text   | indexed stored     |
| message          | text   | indexed stored     |
| log              | json   |                    |
| input            | json   |                    |
| ecs              | json   |                    |
| host             | json   |                    |
| agent            | json   |                    |
+------------------+--------+--------------------+
mysql> SELECT * FROM dpkg_log LIMIT 1\G
*************************** 1. row ***************************
id: 7280000849080753116
@timestamp: 2023-06-16T09:27:38.792Z
message: 2023-04-12 02:06:08 status half-installed libhogweed5:amd64 3.5.1+really3.5.1-2
input: {"type":"filestream"}
ecs: {"version":"1.6.0"}
host: {"name":"logstash-db848f65f-lnlf9"}
agent: {"ephemeral_id":"587c2ebc-e7e2-4e27-b772-19c611115996","id":"2e3d985b-3610-4b8b-aa3b-2e45804edd2c","name":"logstash-db848f65f-lnlf9","type":"filebeat","version":"7.10.0","hostname":"logstash-db848f65f-lnlf9"}
log: {"offset":80,"file":{"path":"/var/log/dpkg.log"}}

Syncing with Kafka

NOTE: this functionality requires Manticore Buddy. If it doesn't work, make sure Buddy is installed.

Manticore Search can seamlessly consume messages from a Kafka broker, allowing for real-time data indexing and search.

To get started, you need to:

  1. Define the source: Specify the Kafka topic from which Manticore Search will read messages. This setup includes details like the broker’s host, port, and topic name.
  2. Set up the destination table: Choose a Manticore real-time table to store the incoming Kafka data.
  3. Create a materialized view: Set up a materialized view (mv) to handle data transformation and mapping from Kafka to the destination table in Manticore Search. Here, you’ll define field mappings, data transformations, and any filters or conditions for the incoming data stream.

Source

The source configuration allows you to define the broker, topic list, consumer group, and the message structure.

Schema

Define the schema using Manticore field types like int, float, text, json, etc.

CREATE SOURCE <source name> [(column type, ...)] [source_options]

All schema keys are case-insensitive, so Products, products, and PrOdUcTs are treated the same. They are all converted to lowercase.

Example:
CREATE SOURCE kafka
(id bigint, term text, abbrev text, GlossDef json)
type='kafka'
broker_list='kafka:9092'
topic_list='my-data'
consumer_group='manticore'
num_consumers='2'
batch=50

Response:
Query OK, 2 rows affected (0.02 sec)

Options

  • type (kafka): Sets the source type. Currently, only kafka is supported.
  • broker_list (host:port [, ...]): Specifies Kafka broker URLs.
  • topic_list (string [, ...]): Lists the Kafka topics to consume from.
  • consumer_group (string): Defines the Kafka consumer group. Defaults to manticore.
  • num_consumers (int): Number of consumers used to handle messages.
  • batch (int): Number of messages to process before moving on. Defaults to 100; if the timeout is reached first, the messages collected so far are processed.
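
For illustration, here is a hypothetical source that reads from two topics and relies on the defaults for consumer_group and batch (the broker address, topic names, and schema are assumptions, not part of the example above):

CREATE SOURCE kafka_orders
(id bigint, payload json)
type='kafka'
broker_list='kafka:9092'
topic_list='orders,returns'
num_consumers='1'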

Destination table

The destination table is a regular real-time table where the results of Kafka message processing are stored. This table should be defined to match the schema requirements of the incoming data and optimized for the query performance needs of your application. Read more about creating real-time tables here.

Example:
CREATE TABLE destination_kafka
(id bigint, name text, short_name text, received_at text, size multi);

Response:
Query OK, 0 rows affected (0.02 sec)

Materialized view

A materialized view enables data transformation from Kafka messages. You can rename fields, apply Manticore Search functions, and perform sorting, grouping, and other data operations.

A materialized view acts as a query that moves data from the Kafka source to the destination table, letting you use Manticore Search syntax to customize these queries. Make sure that fields in the select match those in the source.

CREATE MATERIALIZED VIEW <materialized view name>
TO <destination table name> AS
SELECT [column|function [as <new name>], ...] FROM <source name>

Example:
CREATE MATERIALIZED VIEW view_table
TO destination_kafka AS
SELECT id, term as name, abbrev as short_name,
       UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka

Response:
Query OK, 2 rows affected (0.02 sec)

Data is transferred from Kafka to Manticore Search in batches, which are cleared after each run. For calculations across batches, such as AVG, use caution, as these may not work as expected due to batch-by-batch processing.

Field Mapping

Here's a mapping table based on the examples above:

Kafka           | Source   | Buffer   | MV                             | Destination
----------------|----------|----------|--------------------------------|------------
id              | id       | id       | id                             | id
term            | term     | term     | term as name                   | name
unnecessary key | -        | -        | -                              | -
abbrev          | abbrev   | abbrev   | abbrev as short_name           | short_name
-               | -        | -        | UTC_TIMESTAMP() as received_at | received_at
GlossDef        | GlossDef | GlossDef | GlossDef.size as size          | size
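
To see the result of this mapping, you can query the destination table directly. The query below is illustrative and uses the field names defined in the materialized view above:

SELECT name, short_name, received_at, size FROM destination_kafka LIMIT 3;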

Listing

To view sources and materialized views in Manticore Search, use these commands:

  • SHOW SOURCES: Lists all configured sources.
  • SHOW SOURCE <source name>: Shows the definition of a specific source.
  • SHOW MVS: Lists all materialized views.
  • SHOW MV view_table: Shows detailed information on a specific materialized view.

Example:
SHOW SOURCES

Response:
+-------+
| name  |
+-------+
| kafka |
+-------+

Example:
SHOW SOURCE kafka;

Response:
+--------+---------------------------------------------------------+
| Source | Create Table                                            |
+--------+---------------------------------------------------------+
| kafka  | CREATE SOURCE kafka                                     |
|        | (id bigint, term text, abbrev text, GlossDef json)      |
|        | type='kafka'                                            |
|        | broker_list='kafka:9092'                                |
|        | topic_list='my-data'                                    |
|        | consumer_group='manticore'                              |
|        | num_consumers='2'                                       |
|        | batch=50                                                |
+--------+---------------------------------------------------------+

Example:
SHOW MVS

Response:
+------------+
| name       |
+------------+
| view_table |
+------------+

Example:
SHOW MV view_table

Response:
+------------+--------------------------------------------------------------------------------------------------------+-----------+
| View       | Create Table                                                                                           | suspended |
+------------+--------------------------------------------------------------------------------------------------------+-----------+
| view_table | CREATE MATERIALIZED VIEW view_table TO destination_kafka AS                                            | 0         |
|            | SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size   |           |
|            | FROM kafka                                                                                             |           |
+------------+--------------------------------------------------------------------------------------------------------+-----------+

Altering materialized views

You can suspend data consumption by altering materialized views.

If you remove the source without deleting the MV, it automatically suspends. After recreating the source, unsuspend the MV manually using the ALTER command.

Currently, only materialized views can be altered. To change source parameters, drop and recreate the source.

Example:
ALTER MATERIALIZED VIEW view_table suspended=1

Response:
Query OK (0.02 sec)
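
As a sketch of the drop-and-recreate workflow described above (the source definition simply repeats the one created earlier; adjust it to your own setup):

DROP SOURCE kafka;
CREATE SOURCE kafka
(id bigint, term text, abbrev text, GlossDef json)
type='kafka'
broker_list='kafka:9092'
topic_list='my-data'
consumer_group='manticore'
num_consumers='2'
batch=50;
ALTER MATERIALIZED VIEW view_table suspended=0;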

Troubleshooting

Duplicate entries

Kafka offsets commit after each batch or when processing times out. If the process stops unexpectedly during a materialized view query, you may see duplicate entries. To avoid this, include an id field in your schema, allowing Manticore Search to prevent duplicates in the table.

How it works internally

  • Worker initialization: After configuring a source and materialized view, Manticore Search sets up a dedicated worker to handle data ingestion from Kafka.
  • Message mapping: Messages are mapped according to the source configuration schema, transforming them into a structured format.
  • Batching: Messages are grouped into batches for efficient processing. Batch size can be adjusted to suit your performance and latency needs.
  • Buffering: Mapped data batches are stored in a buffer table for efficient bulk operations.
  • Materialized view processing: The view logic is applied to data in the buffer table, performing any transformations or filtering.
  • Data transfer: Processed data is then transferred to the destination real-time table.
  • Cleanup: The buffer table is cleared after each batch, ensuring it’s ready for the next set of data.