使用 Manticore,写事务(如 INSERT、REPLACE、DELETE、TRUNCATE、UPDATE、COMMIT)可以在当前节点完全应用事务之前复制到其他集群节点。目前,复制支持 Linux 和 macOS 上的 percolate、rt 和 distributed 表。
Manticore 的原生 Windows 二进制文件不支持复制。我们建议通过 WSL 安装 Manticore(Windows 子系统 Linux)。
在 macOS 上,复制支持有限,仅建议用于开发目的。
Manticore 的复制由 Galera 库 提供支持,具有以下显著特性:
- 真正的多主:任何时间可对任意节点进行读写。
- 几乎同步复制,无从属延迟,节点崩溃后无数据丢失。
- 热备份:故障切换期间无停机(因为没有故障切换)。
- 紧密耦合:所有节点保持相同状态,不允许节点间数据分歧。
- 自动节点配置:无需手动备份数据库并在新节点恢复。
- 易于使用和部署。
- 不可靠节点的检测和自动剔除。
- 基于认证的复制。
要在 Manticore Search 中设置复制:
- 必须在配置文件的 "searchd" 部分设置 data_dir 选项。纯模式不支持复制。
- 必须指定一个包含其他节点可访问 IP 地址的 listen 指令,或带有可访问 IP 地址的 node_address。
- 可选地,可以在每个集群节点上为 server_id 设置唯一值。如果未设置,节点将尝试使用 MAC 地址或随机数生成
server_id。
如果未设置 replication 类型的 listen 指令,Manticore 将在默认协议监听端口之后的 200 端口范围内为每个创建的集群使用前两个空闲端口。要手动设置复制端口,必须定义 listen 指令(replication 类型)的端口范围,且同一服务器上不同节点的地址/端口范围对不得相交。一般来说,端口范围应为每个集群指定至少两个端口。当定义带端口范围的复制监听器(例如 listen = 192.168.0.1:9320-9328:replication)时,Manticore 不会立即开始监听这些端口,而是在开始使用复制时从指定范围中随机选择空闲端口。
复制集群是一组节点,写事务会在其中复制。复制按表设置,意味着一个表只能属于一个集群。集群中表的数量没有限制。属于集群的任何 percolate 或实时表上的所有事务,如 INSERT、REPLACE、DELETE、TRUNCATE,都会复制到该集群的所有其他节点。分布式 表也可以参与复制过程。复制是多主的,因此对任意节点或多个节点同时写入都能正常工作。
要创建集群,通常可以使用命令 create cluster 通过 CREATE CLUSTER <cluster name>,要加入集群,可以使用 join cluster 通过 JOIN CLUSTER <cluster name> at 'host:port'。不过,在某些罕见情况下,您可能想微调 CREATE/JOIN CLUSTER 的行为。可用选项如下:
此选项指定集群名称。它应在系统中所有集群中唯一。
注意:
JOIN命令允许的最大主机名长度为 253 个字符。超过此限制,searchd 会报错。
path 选项指定用于写集缓存复制和来自其他节点的传入表的数据目录。此值应在系统中所有集群中唯一,并应作为相对于 data_dir 目录的相对路径指定。默认设置为 data_dir 的值。
nodes 选项是集群中所有节点的地址:端口对列表,使用逗号分隔。此列表应通过节点的 API 接口获取,也可以包含当前节点的地址。它用于将节点加入集群以及重启后重新加入。
options 选项允许您直接向 Galera 复制插件传递额外选项,详见 Galera 文档参数
在使用复制集群时,所有修改集群表内容的写语句,如 INSERT、REPLACE、DELETE、TRUNCATE、UPDATE,必须使用 cluster_name:table_name 表达式替代表名。这确保更改传播到集群中的所有副本。如果未使用正确表达式,将触发错误。
在 JSON 接口中,所有写入集群表的语句必须同时设置 cluster 属性和 table 名称。未设置 cluster 属性将导致错误。
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- Javascript
- Java
- C#
- Rust
INSERT INTO posts:weekly_index VALUES ( 'iphone case' )
TRUNCATE RTINDEX click_query:weekly_index
UPDATE INTO posts:rt_tags SET tags=(101, 302, 304) WHERE MATCH ('use') AND id IN (1,101,201)
DELETE FROM clicks:rt WHERE MATCH ('dumy') AND gid>206POST /insert -d '
{
"cluster":"posts",
"table":"weekly_index",
"doc":
{
"title" : "iphone case",
"price" : 19.85
}
}'
POST /delete -d '
{
"cluster":"posts",
"table": "weekly_index",
"id":1
}'$index->addDocuments([
1, ['title' => 'iphone case', 'price' => 19.85]
]);
$index->deleteDocument(1);indexApi.insert({"cluster":"posts","table":"weekly_index","doc":{"title":"iphone case","price":19.85}})
indexApi.delete({"cluster":"posts","table":"weekly_index","id":1})await indexApi.insert({"cluster":"posts","table":"weekly_index","doc":{"title":"iphone case","price":19.85}})
await indexApi.delete({"cluster":"posts","table":"weekly_index","id":1})res = await indexApi.insert({"cluster":"posts","table":"weekly_index","doc":{"title":"iphone case","price":19.85}});
res = await indexApi.delete({"cluster":"posts","table":"weekly_index","id":1});InsertDocumentRequest newdoc = new InsertDocumentRequest();
HashMap<String,Object> doc = new HashMap<String,Object>(){{
put("title","Crossbody Bag with Tassel");
put("price",19.85);
}};
newdoc.index("weekly_index").cluster("posts").id(1L).setDoc(doc);
sqlresult = indexApi.insert(newdoc);
DeleteDocumentRequest deleteRequest = new DeleteDocumentRequest();
deleteRequest.index("weekly_index").cluster("posts").setId(1L);
indexApi.delete(deleteRequest);Dictionary<string, Object> doc = new Dictionary<string, Object>();
doc.Add("title", "Crossbody Bag with Tassel");
doc.Add("price", 19.85);
InsertDocumentRequest newdoc = new InsertDocumentRequest(table: "weekly_index", cluster:posts, id: 1, doc: doc);
var sqlresult = indexApi.Insert(newdoc);
DeleteDocumentRequest deleteDocumentRequest = new DeleteDocumentRequest(table: "weekly_index", cluster: "posts", id: 1);
indexApi.Delete(deleteDocumentRequest);let mut doc = HashMap::new();
doc.insert("title".to_string(), serde_json::json!("Crossbody Bag with Tassel"));
doc.insert("price".to_string(), serde_json::json!(19.85));
let insert_req = InsertDocumentRequest {
table: serde_json::json!("weekly_index"),
doc: serde_json::json!(doc),
cluster: serde_json::json!("posts"),
id: serde_json::json!(1),
};
let insert_res = index_api.insert(insert_req).await;
let delete_req = DeleteDocumentRequest {
table: serde_json::json!("weekly_index"),
cluster: serde_json::json!("posts"),
id: serde_json::json!(1),
};
index_api.delete(delete_req).await;读取语句如 SELECT、CALL PQ、DESCRIBE 可以使用未加集群名前缀的常规表名,也可以使用 cluster_name:table_name 格式。如果使用后者,cluster_name 部分会被忽略。
使用 HTTP 端点 json/search 时,可以选择指定 cluster 属性,也可以省略。
- SQL
- JSON
SELECT * FROM weekly_index
CALL PQ('posts:weekly_index', 'document is here')POST /search -d '
{
"cluster":"posts",
"table":"weekly_index",
"query":{"match":{"title":"keyword"}}
}'
POST /search -d '
{
"table":"weekly_index",
"query":{"match":{"title":"keyword"}}
}'可以使用 SET 语句调整复制插件选项。
可用选项列表见 Galera Documentation Parameters 。
- SQL
- JSON
SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1POST /cli -d "
SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1
"复制节点可能会出现分歧,导致所有节点都被标记为 non-primary。这可能是由于节点间网络分割、集群崩溃,或复制插件在确定 primary component 时发生异常。此时需要选择一个节点并将其提升为 primary component。
要确定需要提升的节点,应比较所有节点上的 last_committed 集群状态变量值。如果所有服务器都在运行,则无需重启集群。只需使用 SET 语句将具有最高 last_committed 值的节点提升为 primary component(如示例所示)。
其他节点随后将重新连接到主组件,并基于该节点重新同步数据。
- SQL
- JSON
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1POST /cli -d "
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
"- ini
searchd {
listen = 9312
listen = 192.168.1.101:9360-9370:replication
data_dir = /var/lib/manticore/
...
}要复制表,必须在拥有本地表的服务器上创建集群。
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- Javascript
- Java
- C#
- Rust
CREATE CLUSTER postsPOST /cli -d "
CREATE CLUSTER posts
"$params = [
'cluster' => 'posts'
]
];
$response = $client->cluster()->create($params);utilsApi.sql('CREATE CLUSTER posts')await utilsApi.sql('CREATE CLUSTER posts')res = await utilsApi.sql('CREATE CLUSTER posts');utilsApi.sql("CREATE CLUSTER posts");utilsApi.Sql("CREATE CLUSTER posts");utils_api.sql("CREATE CLUSTER posts", Some(true)).await;将这些本地表添加到集群中
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- Javascript
- Java
- C#
- Rust
ALTER CLUSTER posts ADD pq_title
ALTER CLUSTER posts ADD pq_clicksPOST /cli -d "
ALTER CLUSTER posts ADD pq_title
"
POST /cli -d "
ALTER CLUSTER posts ADD pq_clicks
"$params = [
'cluster' => 'posts',
'body' => [
'operation' => 'add',
'table' => 'pq_title'
]
];
$response = $client->cluster()->alter($params);
$params = [
'cluster' => 'posts',
'body' => [
'operation' => 'add',
'table' => 'pq_clicks'
]
];
$response = $client->cluster()->alter($params);utilsApi.sql('ALTER CLUSTER posts ADD pq_title')
utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks')await utilsApi.sql('ALTER CLUSTER posts ADD pq_title')
await utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks')res = await utilsApi.sql('ALTER CLUSTER posts ADD pq_title');
res = await utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks');utilsApi.sql("ALTER CLUSTER posts ADD pq_title");
utilsApi.sql("ALTER CLUSTER posts ADD pq_clicks");utilsApi.Sql("ALTER CLUSTER posts ADD pq_title");
utilsApi.Sql("ALTER CLUSTER posts ADD pq_clicks");utils_api.sql("ALTER CLUSTER posts ADD pq_title", Some(true)).await;
utils_api.sql("ALTER CLUSTER posts ADD pq_clicks", Some(true)).await;所有希望接收集群表副本的其他节点应按如下方式加入集群:
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- Javascript
- Java
- C#
- Rust
JOIN CLUSTER posts AT '192.168.1.101:9312'POST /cli -d "
JOIN CLUSTER posts AT '192.168.1.101:9312'
"$params = [
'cluster' => 'posts',
'body' => [
'192.168.1.101:9312'
]
];
$response = $client->cluster->join($params);utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'')await utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'')res = await utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'');utilsApi.sql("JOIN CLUSTER posts AT '192.168.1.101:9312'");utilsApi.Sql("JOIN CLUSTER posts AT '192.168.1.101:9312'");utils_api.sql("JOIN CLUSTER posts AT '192.168.1.101:9312'", Some(true)).await;运行查询时,在表名前加上集群名 posts:或在 HTTP 请求对象中使用 cluster 属性。
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- Javascript
- Java
- C#
- Rust
INSERT INTO posts:pq_title VALUES ( 3, 'test me' )POST /insert -d '
{
"cluster":"posts",
"table":"pq_title",
"id": 3
"doc":
{
"title" : "test me"
}
}'$index->addDocuments([
3, ['title' => 'test me']
]);indexApi.insert({"cluster":"posts","table":"pq_title","id":3"doc":{"title":"test me"}})await indexApi.insert({"cluster":"posts","table":"pq_title","id":3"doc":{"title":"test me"}})res = await indexApi.insert({"cluster":"posts","table":"pq_title","id":3"doc":{"title":"test me"}});InsertDocumentRequest newdoc = new InsertDocumentRequest();
HashMap<String,Object> doc = new HashMap<String,Object>(){{
put("title","test me");
}};
newdoc.index("pq_title").cluster("posts").id(3L).setDoc(doc);
sqlresult = indexApi.insert(newdoc);Dictionary<string, Object> doc = new Dictionary<string, Object>();
doc.Add("title", "test me");
InsertDocumentRequest newdoc = new InsertDocumentRequest(index: "pq_title", cluster: "posts", id: 3, doc: doc);
var sqlresult = indexApi.Insert(newdoc);let mut doc = HashMap::new();
doc.insert("title".to_string(), serde_json::json!("test me"));
let insert_req = InsertDocumentRequest {
table: serde_json::json!("pq_title"),
doc: serde_json::json!(doc),
cluster: serde_json::json!("posts"),
id: serde_json::json!(3),
};
let insert_res = index_api.insert(insert_req).await;All queries that modify tables in the cluster are now replicated to all nodes in the cluster.
要创建复制集群,至少必须设置其 name。
如果您正在创建单个集群或第一个集群,可以省略 path 选项。在这种情况下,将使用 data_dir 选项作为集群路径。但是,对于所有后续集群,必须指定 path,且该路径必须可用。还可以设置 nodes 选项以列出集群中的所有节点。
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- javascript
- Java
- C#
- Rust
CREATE CLUSTER posts
CREATE CLUSTER click_query '/var/data/click_query/' as path
CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodesPOST /cli -d "
CREATE CLUSTER posts
"
POST /cli -d "
CREATE CLUSTER click_query '/var/data/click_query/' as path
"
POST /cli -d "
CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes
"$params = [
'cluster' => 'posts',
]
];
$response = $client->cluster()->create($params);
$params = [
'cluster' => 'click_query',
'body' => [
'path' => '/var/data/click_query/'
]
]
];
$response = $client->cluster()->create($params);
$params = [
'cluster' => 'click_query',
'body' => [
'path' => '/var/data/click_query/',
'nodes' => 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312'
]
]
];
$response = $client->cluster()->create($params);utilsApi.sql('CREATE CLUSTER posts')
utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path')
utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path, \'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312\' as nodes')await utilsApi.sql('CREATE CLUSTER posts')
await utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path')
await utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path, \'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312\' as nodes')res = await utilsApi.sql('CREATE CLUSTER posts');
res = await utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path');
res = await utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path, \'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312\' as nodes');utilsApi.sql("CREATE CLUSTER posts");
utilsApi.sql("CREATE CLUSTER click_query '/var/data/click_query/' as path");
utilsApi.sql("CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes");utilsApi.Sql("CREATE CLUSTER posts");
utilsApi.Sql("CREATE CLUSTER click_query '/var/data/click_query/' as path");
utilsApi.Sql("CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes");utils_api.sql("CREATE CLUSTER posts", Some(true)).await;
utils_api.sql("CREATE CLUSTER click_query '/var/data/click_query/' as path", Some(true)).await;
utils_api.sql("CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes", Some(true)).await;要加入现有集群,您必须至少指定:
- 集群的 名称
- 您要加入的集群中另一个节点的
host:port
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- javascript
- Java
- C#
- Rust
JOIN CLUSTER posts AT '10.12.1.35:9312'POST /cli -d "
JOIN CLUSTER posts AT '10.12.1.35:9312'
"$params = [
'cluster' => 'posts',
'body' => [
'10.12.1.35:9312'
]
];
$response = $client->cluster->join($params);utilsApi.sql('JOIN CLUSTER posts AT \'10.12.1.35:9312\'')await utilsApi.sql('JOIN CLUSTER posts AT \'10.12.1.35:9312\'')res = await utilsApi.sql('JOIN CLUSTER posts AT \'10.12.1.35:9312\'');utilsApi.sql("JOIN CLUSTER posts AT '10.12.1.35:9312'");utilsApi.Sql("JOIN CLUSTER posts AT '10.12.1.35:9312'");utils_api.sql("JOIN CLUSTER posts AT '10.12.1.35:9312'", Some(true)).await;{u'error': u'', u'total': 0, u'warning': u''}{u'error': u'', u'total': 0, u'warning': u''}{"total":0,"error":"","warning":""}在大多数情况下,当只有一个复制集群时,上述内容已足够。然而,如果您正在创建多个复制集群,则还必须设置 路径 并确保该目录可用。
- SQL
JOIN CLUSTER c2 at '127.0.0.1:10201' 'c2' as path节点通过从指定节点获取数据来加入集群,如果成功,则会以与手动通过 ALTER CLUSTER ... UPDATE nodes 操作相同的方式更新所有其他集群节点的节点列表。此列表用于在重启时重新加入节点到集群。
有两个节点列表:
1.cluster_<name>_nodes_set:用于在重启时重新加入节点到集群。它会以与 ALTER CLUSTER ... UPDATE nodes 相同的方式在所有节点间更新。JOIN CLUSTER 命令会自动执行此更新。集群状态 显示此列表为 cluster_<name>_nodes_set。
2. cluster_<name>_nodes_view:此列表包含所有用于复制的活动节点,无需手动管理。ALTER CLUSTER ... UPDATE nodes 实际上是将此节点列表复制到用于重启时重新加入的节点列表中。集群状态 显示此列表为 cluster_<name>_nodes_view。
当节点位于不同的网络段或数据中心时,可以显式设置 nodes 选项。这可以最小化节点间的流量,并利用网关节点实现数据中心间的通信。以下代码使用 nodes 选项加入现有集群。
注意: 使用此语法时,集群的
cluster_<name>_nodes_set列表不会自动更新。要更新它,请使用 ALTER CLUSTER ... UPDATE nodes。
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- javascript
- Java
- C#
- Rust
JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodesPOST /cli -d "
JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes
"$params = [
'cluster' => 'posts',
'body' => [
'nodes' => 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312'
]
];
$response = $client->cluster->join($params);utilsApi.sql('JOIN CLUSTER click_query \'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312\' as nodes')await utilsApi.sql('JOIN CLUSTER click_query \'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312\' as nodes')res = await utilsApi.sql('JOIN CLUSTER click_query \'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312\' as nodes');utilsApi.sql("JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes");utilsApi.Sql("JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes");utils_api.sql("JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes", Some(true)).await;{u'error': u'', u'total': 0, u'warning': u''}{u'error': u'', u'total': 0, u'warning': u''}{"total":0,"error":"","warning":""}JOIN CLUSTER 命令是同步执行的,一旦节点从集群中的其他节点接收完所有数据并与它们同步,即完成操作。
JOIN CLUSTER 操作可能会失败,并显示重复的 server_id 错误信息。这种情况发生在加入的节点与集群中已有节点的 server_id 相同。为解决此问题,请确保复制集群中的每个节点具有唯一的 server_id。您可以在配置文件的 "searchd" 部分将默认的 server_id 更改为唯一值,然后再尝试加入集群。
DELETE CLUSTER 语句删除指定的具有其 名称 的集群。集群一旦被删除,它将从所有节点中移除,但其表保持完整并变为活动的本地非复制表。
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- javascript
- Java
- C#
- Rust
DELETE CLUSTER click_queryPOST /cli -d "DELETE CLUSTER click_query"$params = [
'cluster' => 'click_query',
'body' => []
];
$response = $client->cluster()->delete($params);utilsApi.sql('DELETE CLUSTER click_query')await utilsApi.sql('DELETE CLUSTER click_query')res = await utilsApi.sql('DELETE CLUSTER click_query');utilsApi.sql("DELETE CLUSTER click_query");utilsApi.Sql("DELETE CLUSTER click_query");utils_api.Sql("DELETE CLUSTER click_query", Some(true)).await;{u'error': u'', u'total': 0, u'warning': u''}{u'error': u'', u'total': 0, u'warning': u''}{"total":0,"error":"","warning":""}