Load balancing is turned on by default for any distributed index that uses mirroring. By default, queries are distributed randomly among the mirrors. You can change this behavior with the ha_strategy directive.
ha_strategy = {random|nodeads|noerrors|roundrobin}
Agent mirror selection strategy for load balancing. Optional, default is random.
The strategy used for mirror selection, or in other words, for choosing a specific agent mirror in a distributed index. Essentially, this directive controls how exactly the master does load balancing between the configured mirror agent nodes. The following strategies are implemented:
The default balancing mode. Simple linear random distribution among the mirrors: an equal selection probability is assigned to every mirror. Similar to round-robin (RR), but unlike RR, it does not impose a strict selection order.
- Example
ha_strategy = random
The default simple random strategy does not take mirror status, error rate, and, most importantly, actual response latencies into account. So to accommodate heterogeneous clusters and temporary spikes in agent node load, there is a group of balancing strategies that dynamically adjust the probabilities based on the actual query latencies observed by the master.
The adaptive strategies based on latency-weighted probabilities basically work as follows:
- latency stats are accumulated, in blocks of ha_period_karma seconds;
- once per karma period, latency-weighted probabilities get recomputed;
- once per request (including ping requests), "dead or alive" flag is adjusted.
Currently, we begin with equal probabilities (or percentages, for brevity), and on every step scale them by the inverse of the latencies observed during the last "karma" period, and then renormalize them. For example, if during the first 60 seconds after the master startup 4 mirrors had latencies of 10, 5, 30, and 3 msec/query respectively, the first adjustment step would go as follows:
- initial percentages: 0.25, 0.25, 0.25, 0.25;
- observed latencies: 10 ms, 5 ms, 30 ms, 3 ms;
- inverse latencies: 0.1, 0.2, 0.0333, 0.333;
- scaled percentages: 0.025, 0.05, 0.008333, 0.0833;
- renormalized percentages: 0.15, 0.30, 0.05, 0.50.
Meaning that the 1st mirror would have a 15% chance of being chosen during the next karma period, the 2nd one a 30% chance, the 3rd one (slowest at 30 ms) only a 5% chance, and the 4th and the fastest one (at 3 ms) a 50% chance. Then, after that period, the second adjustment step would update those chances again, and so on.
The rationale here is that once the observed latencies stabilize, the latency-weighted probabilities stabilize as well. So all these adjustment iterations are supposed to converge at a point where the average latencies are (roughly) equal over all mirrors.
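The adjustment step described above can be sketched in a few lines of Python. This is an illustration of the math only, not Manticore's actual code; the function name is made up:

```python
def reweight(probs, latencies_ms):
    """One karma-period adjustment step: scale each mirror's selection
    probability by the inverse of its observed latency, then renormalize
    so the probabilities sum to 1 again."""
    scaled = [p / lat for p, lat in zip(probs, latencies_ms)]
    total = sum(scaled)
    return [s / total for s in scaled]

# The 4-mirror example from the text: 10, 5, 30, and 3 msec/query.
probs = reweight([0.25, 0.25, 0.25, 0.25], [10, 5, 30, 3])
print([round(p, 2) for p in probs])  # [0.15, 0.3, 0.05, 0.5]
```

Feeding the output back in with the next period's latencies models the repeated adjustment steps; with stable latencies, the values converge.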
Latency-weighted probabilities, but dead mirrors are excluded from the selection. A "dead" mirror is defined as a mirror that produced multiple hard errors (e.g. network failure, no answer, etc.) in a row.
- Example
ha_strategy = nodeads
Latency-weighted probabilities, but mirrors with a worse error/success ratio are excluded from the selection.
- Example
ha_strategy = noerrors
Simple round-robin selection, that is, selecting the 1st mirror in the list, then the 2nd one, then the 3rd one, etc, and then repeating the process once the last mirror in the list is reached. Unlike with the randomized strategies, RR imposes a strict querying order (1, 2, 3, ..., N-1, N, 1, 2, 3, ... and so on) and guarantees that no two subsequent queries will be sent to the same mirror.
- Example
ha_strategy = roundrobin
ha_period_karma = 2m
ha_period_karma
defines the agent mirror statistics window size, in seconds (or with a time suffix). Optional, default is 60.
For a distributed index with agent mirrors in it, the server tracks several different per-mirror counters. These counters are then used for failover and balancing (the server picks the best mirror to use based on the counters). Counters are accumulated in blocks of ha_period_karma seconds.
After beginning a new block, master may still use the accumulated values from the previous one, until the new one is half full. Thus, any previous history stops affecting the mirror choice after 1.5 times ha_period_karma seconds at most.
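The window logic above can be illustrated with a small sketch (illustrative only; the function name is made up):

```python
def blocks_in_use(elapsed_in_current_s, karma_period_s=60):
    """Which stat blocks the master consults for mirror selection.

    The previous block keeps contributing until the current block is
    half full, so old history can influence the choice for at most
    1.5 * karma_period seconds after it was collected."""
    if elapsed_in_current_s < karma_period_s / 2:
        return ["previous", "current"]
    return ["current"]

print(blocks_in_use(10))  # ['previous', 'current']
print(blocks_in_use(45))  # ['current']
```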
Although at most 2 blocks are used for mirror selection, up to 15 last blocks are actually stored, for instrumentation purposes. They can be inspected using the SHOW AGENT STATUS statement.
ha_ping_interval = 3s
ha_ping_interval
defines the interval between agent mirror pings, in milliseconds (or with a time suffix). Optional, default is 1000.
For a distributed index with agent mirrors in it, the server sends all mirrors a ping command during idle periods. This is used to track the current agent status (alive or dead, network roundtrip, etc.). The interval between such pings is defined by this directive.
To disable pings, set ha_ping_interval to 0.
Manticore can replicate a write transaction (INSERT, REPLACE, DELETE, TRUNCATE, UPDATE, etc.) in an index to other nodes in the cluster. Currently, percolate and RT indexes are supported. Only Linux packages and builds support replication; Windows and macOS packages do not.
Manticore's replication is based on Galera library and features the following:
- true multi-master - read and write to any node at any time
- synchronous replication - no slave lag, no data is lost after a node crash
- hot standby - no downtime during failover (since there is no failover)
- tightly coupled - all the nodes hold the same state; no diverged data between nodes is allowed
- automatic node provisioning - no need to manually back up the database and restore it on a new node
- easy to use and deploy
- detection and automatic eviction of unreliable nodes
- certification based replication
To use replication in Manticore Search:
- the data_dir option should be set in the "searchd" section of the configuration file; replication is not supported in plain mode
- there should be either:
- a listen directive specified (without specifying a protocol) containing an IP address accessible by other nodes
- or node_address with an accessible IP address
- optionally, set unique values for server_id on each cluster node. If no value is set, the node will try to use the MAC address (or a random number if that fails) to generate the server_id.
If there is no replication listen directive set, Manticore will use the first free ports (in a range of 200 ports after the port at which the daemon is listening for the default protocol) for each cluster created. To declare replication ports manually, define a listen directive with a port range, and make sure these "address - port range" pairs differ for all Manticore instances on the same host. As a rule of thumb, the port range should provide no less than two ports per cluster.
A replication cluster is a set of nodes among which a write transaction gets replicated. Replication is configured on a per-index basis. One index can be assigned to only one cluster; there is no restriction on how many indexes a cluster may have. All transactions such as INSERT, REPLACE, DELETE, and TRUNCATE in any percolate index belonging to a cluster are replicated to all the other nodes in the cluster. Replication is multi-master, so writes to any particular node, or to multiple nodes simultaneously, work equally well.
Replication cluster configuration options are:
Specifies a name for the cluster. Should be unique.
Data directory for write-set cache replication and incoming indexes from other nodes. Should be unique among the other clusters on the node. Default is data_dir.
A list of address:port pairs for all the nodes in the cluster (comma separated). A node's API interface should be used for this option. It can contain the current node's address too. This list is used to join a node to the cluster and rejoin it after restart.
Options passed directly to the Galera replication plugin, as described in the Galera Documentation Parameters section.
For the SQL interface, all write statements such as INSERT, REPLACE, DELETE, TRUNCATE, and UPDATE that change the content of a cluster's index should use the cluster_name:index_name expression in place of the index name, to make sure the change is propagated to all replicas in the cluster. An error will be triggered otherwise.
All write statements for the HTTP interface to a cluster's index should set the cluster property along with the index name. An error will be triggered otherwise.
- SQL
- HTTP
- PHP
- Python
- Javascript
- Java
INSERT INTO posts:weekly_index VALUES ( 'iphone case' )
TRUNCATE RTINDEX click_query:weekly_index
UPDATE INTO posts:rt_tags SET tags=(101, 302, 304) WHERE MATCH ('use') AND id IN (1,101,201)
DELETE FROM clicks:rt WHERE MATCH ('dumy') AND gid>206
POST /insert -d '
{
"cluster":"posts",
"index":"weekly_index",
"doc":
{
"title" : "iphone case",
"price" : 19.85
}
}'
POST /delete -d '
{
"cluster":"posts",
"index": "weekly_index",
"id":1
}'
$index->addDocuments([
1, ['title' => 'iphone case', 'price' => 19.85]
]);
$index->deleteDocument(1);
indexApi.insert({"cluster":"posts","index":"weekly_index","doc":{"title":"iphone case","price":19.85}})
indexApi.delete({"cluster":"posts","index":"weekly_index","id":1})
res = await indexApi.insert({"cluster":"posts","index":"weekly_index","doc":{"title":"iphone case","price":19.85}});
res = await indexApi.delete({"cluster":"posts","index":"weekly_index","id":1});
InsertDocumentRequest newdoc = new InsertDocumentRequest();
HashMap<String,Object> doc = new HashMap<String,Object>(){{
put("title","Crossbody Bag with Tassel");
put("price",19.85);
}};
newdoc.index("weekly_index").cluster("posts").id(1L).setDoc(doc);
sqlresult = indexApi.insert(newdoc);
DeleteDocumentRequest deleteRequest = new DeleteDocumentRequest();
deleteRequest.index("weekly_index").cluster("posts").setId(1L);
indexApi.delete(deleteRequest);
Read statements such as CALL PQ, SELECT, or DESCRIBE can use either regular index names (not prepended with a cluster name) or the cluster_name:index_name syntax. The cluster_name:index_name syntax ignores the cluster name, so it may even be used on an index that doesn't belong to the cluster.
The HTTP endpoint json/search may either include the cluster property or omit it.
- SQL
- HTTP
SELECT * FROM weekly_index
CALL PQ('posts:weekly_index', 'document is here')
POST /search -d '
{
"cluster":"posts",
"index":"weekly_index",
"query":{"match":{"title":"keyword"}}
}'
POST /search -d '
{
"index":"weekly_index",
"query":{"match":{"title":"keyword"}}
}'
ID auto generation uses UUID_SHORT, similar to the MySQL function of the same name. It produces a valid cluster-wide unique ID when server_id is properly configured.
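MySQL's documented UUID_SHORT layout packs the server id, the server start time, and a counter into one 64-bit integer. The sketch below follows that MySQL formula; it is not necessarily Manticore's exact internal layout:

```python
import itertools
import time

class UuidShortGenerator:
    """MySQL-style UUID_SHORT:
    (server_id & 255) << 56 | (startup time in seconds) << 24 | 24-bit counter.
    IDs are cluster-wide unique as long as every node has a distinct
    server_id and generates fewer than 2**24 ids per process lifetime
    window."""

    def __init__(self, server_id):
        startup = int(time.time()) & 0xFFFFFFFF
        self._base = ((server_id & 0xFF) << 56) | (startup << 24)
        self._counter = itertools.count()

    def next_id(self):
        return self._base | (next(self._counter) & 0xFFFFFF)

gen = UuidShortGenerator(server_id=7)
a, b = gen.next_id(), gen.next_id()
assert a != b and (a >> 56) == 7  # distinct ids; server_id in the top byte
```

This is why a clashing server_id across nodes breaks cluster-wide uniqueness: two nodes started in the same second would generate overlapping id ranges.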
Note:
The UpdateAttributes call from the API interface addressed to a specific index always sets the proper cluster on the server side; however, there is no way to know whether the update got propagated into the cluster properly, or whether the node diverged and the statement updated only the local index.
Replication plugin options can be changed using SET
statement (see the example).
See Galera Documentation Parameters for a list of available options.
- SQL
- HTTP
SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1
POST /sql -d "mode=raw&query=
SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1
"
Sometimes replicated nodes can diverge from each other. The state of all the nodes might turn into non-primary due to a network split between nodes, a cluster crash, or if the replication plugin hits an exception when determining the primary component. Then it's necessary to select a node and promote it to the primary component.
To determine which node should be the reference, compare the last_committed cluster status variable value on all nodes. If all the servers are already running, there's no need to start the cluster again; just promote the most advanced node to the primary component with the SET statement (see the example). All other nodes will reconnect to that node and resync their data based on it.
- SQL
- HTTP
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
POST /sql -d "mode=raw&query=
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
"
To use replication, define one listen port for the SphinxAPI protocol and one listen directive with a replication address and port range in the config. Also define the data_dir folder for incoming indexes.
- ini
searchd {
listen = 9312
listen = 192.168.1.101:9360-9370:replication
data_dir = /var/lib/manticore/
...
}
Create a cluster on the server that has the local indexes that need to be replicated:
- SQL
- HTTP
- PHP
- Python
- Javascript
- Java
CREATE CLUSTER posts
POST /sql -d "mode=raw&query=
CREATE CLUSTER posts
"
$params = [
'cluster' => 'posts'
];
$response = $client->cluster()->create($params);
utilsApi.sql('mode=raw&query=CREATE CLUSTER posts')
res = await utilsApi.sql('mode=raw&query=CREATE CLUSTER posts');
utilsApi.sql("mode=raw&query=CREATE CLUSTER posts");
Add these local indexes to the cluster:
- SQL
- HTTP
- PHP
- Python
- Javascript
- Java
ALTER CLUSTER posts ADD pq_title
ALTER CLUSTER posts ADD pq_clicks
POST /sql -d "mode=raw&query=
ALTER CLUSTER posts ADD pq_title
"
POST /sql -d "mode=raw&query=
ALTER CLUSTER posts ADD pq_clicks
"
$params = [
'cluster' => 'posts',
'body' => [
'operation' => 'add',
'index' => 'pq_title'
]
];
$response = $client->cluster()->alter($params);
$params = [
'cluster' => 'posts',
'body' => [
'operation' => 'add',
'index' => 'pq_clicks'
]
];
$response = $client->cluster()->alter($params);
utilsApi.sql('mode=raw&query=ALTER CLUSTER posts ADD pq_title')
utilsApi.sql('mode=raw&query=ALTER CLUSTER posts ADD pq_clicks')
res = await utilsApi.sql('mode=raw&query=ALTER CLUSTER posts ADD pq_title');
res = await utilsApi.sql('mode=raw&query=ALTER CLUSTER posts ADD pq_clicks');
utilsApi.sql("mode=raw&query=ALTER CLUSTER posts ADD pq_title");
utilsApi.sql("mode=raw&query=ALTER CLUSTER posts ADD pq_clicks");
All other nodes that want a replica of the cluster's indexes should join the cluster:
- SQL
- HTTP
- PHP
- Python
- Javascript
- Java
JOIN CLUSTER posts AT '192.168.1.101:9312'
POST /sql -d "mode=raw&query=
JOIN CLUSTER posts AT '192.168.1.101:9312'
"
$params = [
'cluster' => 'posts',
'body' => [
'192.168.1.101:9312'
]
];
$response = $client->cluster()->join($params);
utilsApi.sql('mode=raw&query=JOIN CLUSTER posts AT \'192.168.1.101:9312\'')
res = await utilsApi.sql('mode=raw&query=JOIN CLUSTER posts AT \'192.168.1.101:9312\'');
utilsApi.sql("mode=raw&query=JOIN CLUSTER posts AT '192.168.1.101:9312'");
When running SQL queries, prepend the index name with the cluster name (posts:); for HTTP requests, use the cluster property of the request object.
- SQL
- HTTP
- PHP
- Python
- Javascript
- Java
INSERT INTO posts:pq_title VALUES ( 3, 'test me' )
POST /insert -d '
{
"cluster":"posts",
"index":"pq_title",
"id": 3,
"doc":
{
"title" : "test me"
}
}'
$index->addDocuments([
3, ['title' => 'test me']
]);
indexApi.insert({"cluster":"posts","index":"pq_title","id":3,"doc":{"title":"test me"}})
res = await indexApi.insert({"cluster":"posts","index":"pq_title","id":3,"doc":{"title":"test me"}});
InsertDocumentRequest newdoc = new InsertDocumentRequest();
HashMap<String,Object> doc = new HashMap<String,Object>(){{
put("title","test me");
}};
newdoc.index("pq_title").cluster("posts").id(3L).setDoc(doc);
sqlresult = indexApi.insert(newdoc);
Now all such queries that modify indexes in the cluster are replicated to all nodes in the cluster.
To create a replication cluster, you should set at least its name.
If it is the only cluster, or the first one you are creating, the path option may be omitted; in that case the data_dir option will be used as the cluster path. For all subsequent clusters you need to specify a path, and that path should be available. The nodes option may also be set to enumerate all the nodes in the cluster.
- SQL
- HTTP
- PHP
- Python
- javascript
- Java
CREATE CLUSTER posts
CREATE CLUSTER click_query '/var/data/click_query/' as path
CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes
POST /sql -d "mode=raw&query=
CREATE CLUSTER posts
"
POST /sql -d "mode=raw&query=
CREATE CLUSTER click_query '/var/data/click_query/' as path
"
POST /sql -d "mode=raw&query=
CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes
"
$params = [
'cluster' => 'posts'
];
$response = $client->cluster()->create($params);
$params = [
'cluster' => 'click_query',
'body' => [
'path' => '/var/data/click_query/'
]
];
$response = $client->cluster()->create($params);
$params = [
'cluster' => 'click_query',
'body' => [
'path' => '/var/data/click_query/',
'nodes' => 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312'
]
];
$response = $client->cluster()->create($params);
utilsApi.sql('mode=raw&query=CREATE CLUSTER posts')
utilsApi.sql('mode=raw&query=CREATE CLUSTER click_query \'/var/data/click_query/\' as path')
utilsApi.sql('mode=raw&query=CREATE CLUSTER click_query \'/var/data/click_query/\' as path, \'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312\' as nodes')
res = await utilsApi.sql('mode=raw&query=CREATE CLUSTER posts');
res = await utilsApi.sql('mode=raw&query=CREATE CLUSTER click_query \'/var/data/click_query/\' as path');
res = await utilsApi.sql('mode=raw&query=CREATE CLUSTER click_query \'/var/data/click_query/\' as path, \'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312\' as nodes');
utilsApi.sql("mode=raw&query=CREATE CLUSTER posts");
utilsApi.sql("mode=raw&query=CREATE CLUSTER click_query '/var/data/click_query/' as path");
utilsApi.sql("mode=raw&query=CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes");
{u'error': u'', u'total': 0, u'warning': u''}
{"total":0,"error":"","warning":""}
If a cluster is created without the nodes option, the first node that joins the cluster will be saved as its nodes value.
To join an existing cluster, its name and any working node should be set. In case of a single cluster, path might be omitted; data_dir will be used as the cluster path. For all subsequent clusters, path needs to be set and the path should be available.
- SQL
- HTTP
- PHP
- Python
- javascript
- Java
JOIN CLUSTER posts AT '10.12.1.35:9312'
POST /sql -d "mode=raw&query=
JOIN CLUSTER posts AT '10.12.1.35:9312'
"
$params = [
'cluster' => 'posts',
'body' => [
'10.12.1.35:9312'
]
];
$response = $client->cluster()->join($params);
utilsApi.sql('mode=raw&query=JOIN CLUSTER posts AT \'10.12.1.35:9312\'')
res = await utilsApi.sql('mode=raw&query=JOIN CLUSTER posts AT \'10.12.1.35:9312\'');
utilsApi.sql("mode=raw&query=JOIN CLUSTER posts AT '10.12.1.35:9312'");
{u'error': u'', u'total': 0, u'warning': u''}
{"total":0,"error":"","warning":""}
A node joins a cluster by getting the data from the node provided and, if successful, updating the node lists on all the other cluster nodes in the same way as ALTER CLUSTER ... UPDATE nodes does.
There are two lists of nodes. One is used to rejoin nodes to the cluster on restart; it is updated across all nodes the same way as ALTER CLUSTER ... UPDATE nodes does, and JOIN CLUSTER performs the same update automatically. Cluster status shows this list as cluster_post_nodes_set. The second list is the list of all active nodes used for replication; this list doesn't require manual management. ALTER CLUSTER ... UPDATE nodes actually copies this list of nodes into the list of nodes used to rejoin on restart. Cluster status shows this list as cluster_post_nodes_view.
When nodes are located in different network segments or in different datacenters, the nodes option may be set explicitly. This minimizes traffic between nodes and allows gateway nodes to be used for inter-datacenter communication. The following command joins an existing cluster using the nodes option.
Note that when this syntax is used, the cluster_post_nodes_set list is not updated automatically. Use ALTER CLUSTER ... UPDATE nodes to update it.
- SQL
- HTTP
- PHP
- Python
- javascript
- Java
JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes
POST /sql -d "mode=raw&query=
JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes
"
$params = [
'cluster' => 'click_query',
'body' => [
'nodes' => 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312'
]
];
$response = $client->cluster()->join($params);
utilsApi.sql('mode=raw&query=JOIN CLUSTER click_query \'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312\' as nodes')
res = await utilsApi.sql('mode=raw&query=JOIN CLUSTER click_query \'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312\' as nodes');
utilsApi.sql("mode=raw&query=JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes");
{u'error': u'', u'total': 0, u'warning': u''}
{"total":0,"error":"","warning":""}
JOIN CLUSTER completes when the node receives all the necessary data to be in sync with all the other nodes in the cluster.