使用 Manticore,写事务(例如 INSERT、REPLACE、DELETE、TRUNCATE、UPDATE、COMMIT)可以在当前节点完全应用事务之前复制到其他集群节点。目前,复制支持 Linux 和 macOS 下的 percolate、rt 和 distributed 表。
Manticore 的 原生 Windows 二进制文件 不支持复制。我们推荐通过 WSL(Windows 子系统 Linux)安装 Manticore。
在 macOS 上,复制支持有限,仅建议用于开发目的。
Manticore 的复制由 Galera 库 提供支持,具有以下卓越功能:
- 真正的多主:随时可读取及写入任何节点。
- 几乎同步复制,无从属延迟及节点崩溃后无数据丢失。
- 热备份:切换时无停机(因为不存在故障转移)。
- 紧密耦合:所有节点保持相同状态,不允许节点间数据分歧。
- 自动节点配置:无需手动备份数据库并恢复到新节点。
- 易于使用和部署。
- 不可靠节点的检测及自动剔除。
- 基于认证的复制。
要在 Manticore Search 中设置复制:
- 配置文件中 "searchd" 部分必须设置 data_dir 选项。纯模式不支持复制。
- 必须指定一个包含其他节点可访问 IP 地址的 listen 指令,或一个有可访问 IP 的 node_address。
- 可以选择在每个集群节点上为 server_id 设置唯一值。如果未设置该值,节点将尝试使用 MAC 地址或随机数生成
server_id。
如果没有设置 replication 类型的 listen 指令,Manticore 将在默认协议监听端口之后的 200 端口范围内为每个创建的集群使用前两个空闲端口。若要手动设置复制端口,必须定义 listen 指令(类型为 replication)的端口范围,且同一服务器不同节点的地址/端口范围不能交叉。通常,端口范围应为每个集群指定至少两个端口。当定义带端口范围的复制监听器(例如 listen = 192.168.0.1:9320-9328:replication)时,Manticore 不会立即在这些端口监听。它仅在开始使用复制时,从指定范围内随机选取空闲端口。
复制集群是一组节点,其中写事务被复制。复制是按表设置的,即一张表只能属于一个集群。集群中表的数量没有限制。属于集群的任何 percolate 或实时表上的所有事务如 INSERT、REPLACE、DELETE、TRUNCATE 都会复制到该集群的所有其他节点。分布式 表也可以参与复制过程。复制是多主的,因此对任何节点或多个节点同时写入均可正常工作。
创建集群通常使用命令 create cluster,命令格式是 CREATE CLUSTER <cluster name>,加入集群可以使用 join cluster,命令格式是 JOIN CLUSTER <cluster name> at 'host:port'。但在某些罕见情况下,可能需要微调 CREATE/JOIN CLUSTER 的行为。可用选项包括:
该选项指定集群的名称。应在所有集群中唯一。
注意:
JOIN命令允许的最大主机名长度为 253 字符,超出此限制将导致 searchd 报错。
该选项指定用于写集缓存复制和从其他节点接收表的数据目录。此路径应在所有集群中唯一,且以相对于 data_dir 目录的相对路径给出。默认值为 data_dir 的值。
nodes 选项是集群所有节点的地址和端口对列表,以逗号分隔。该列表应通过节点的 API 接口获取,可以包含当前节点地址。用于加入集群以及重启后重新加入。
options 选项允许直接传递额外参数给 Galera 复制插件,详情请参见 Galera 文档参数。
在使用复制集群时,所有修改集群表内容的写语句,如 INSERT、REPLACE、DELETE、TRUNCATE、UPDATE,必须使用 cluster_name:table_name 表达式替代表名。这样可以确保变更传播到集群中的所有副本。如果不使用正确表达式,将触发错误。
在JSON接口中,所有写入集群表的语句都必须设置cluster属性以及table名称。如果没有设置cluster属性,将会导致错误。
集群中表的Auto ID 只要server_id正确配置,就是有效的。
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- Javascript
- Java
- C#
- Rust
INSERT INTO posts:weekly_index VALUES ( 'iphone case' )
TRUNCATE RTINDEX click_query:weekly_index
UPDATE INTO posts:rt_tags SET tags=(101, 302, 304) WHERE MATCH ('use') AND id IN (1,101,201)
DELETE FROM clicks:rt WHERE MATCH ('dumy') AND gid>206POST /insert -d '
{
"cluster":"posts",
"table":"weekly_index",
"doc":
{
"title" : "iphone case",
"price" : 19.85
}
}'
POST /delete -d '
{
"cluster":"posts",
"table": "weekly_index",
"id":1
}'$index->addDocuments([
1, ['title' => 'iphone case', 'price' => 19.85]
]);
$index->deleteDocument(1);indexApi.insert({"cluster":"posts","table":"weekly_index","doc":{"title":"iphone case","price":19.85}})
indexApi.delete({"cluster":"posts","table":"weekly_index","id":1})await indexApi.insert({"cluster":"posts","table":"weekly_index","doc":{"title":"iphone case","price":19.85}})
await indexApi.delete({"cluster":"posts","table":"weekly_index","id":1})res = await indexApi.insert({"cluster":"posts","table":"weekly_index","doc":{"title":"iphone case","price":19.85}});
res = await indexApi.delete({"cluster":"posts","table":"weekly_index","id":1});InsertDocumentRequest newdoc = new InsertDocumentRequest();
HashMap<String,Object> doc = new HashMap<String,Object>(){{
put("title","Crossbody Bag with Tassel");
put("price",19.85);
}};
newdoc.index("weekly_index").cluster("posts").id(1L).setDoc(doc);
sqlresult = indexApi.insert(newdoc);
DeleteDocumentRequest deleteRequest = new DeleteDocumentRequest();
deleteRequest.index("weekly_index").cluster("posts").setId(1L);
indexApi.delete(deleteRequest);Dictionary<string, Object> doc = new Dictionary<string, Object>();
doc.Add("title", "Crossbody Bag with Tassel");
doc.Add("price", 19.85);
InsertDocumentRequest newdoc = new InsertDocumentRequest(table: "weekly_index", cluster:posts, id: 1, doc: doc);
var sqlresult = indexApi.Insert(newdoc);
DeleteDocumentRequest deleteDocumentRequest = new DeleteDocumentRequest(table: "weekly_index", cluster: "posts", id: 1);
indexApi.Delete(deleteDocumentRequest);let mut doc = HashMap::new();
doc.insert("title".to_string(), serde_json::json!("Crossbody Bag with Tassel"));
doc.insert("price".to_string(), serde_json::json!(19.85));
let insert_req = InsertDocumentRequest {
table: serde_json::json!("weekly_index"),
doc: serde_json::json!(doc),
cluster: serde_json::json!("posts"),
id: serde_json::json!(1),
};
let insert_res = index_api.insert(insert_req).await;
let delete_req = DeleteDocumentRequest {
table: serde_json::json!("weekly_index"),
cluster: serde_json::json!("posts"),
id: serde_json::json!(1),
};
index_api.delete(delete_req).await;读取语句如SELECT、CALL PQ、DESCRIBE可以使用不带集群名称的常规表名,或者使用cluster_name:table_name格式。如果使用后者,cluster_name部分会被忽略。
如果使用HTTP端点json/search,可以指定cluster属性,也可以省略。
- SQL
- JSON
SELECT * FROM weekly_index
CALL PQ('posts:weekly_index', 'document is here')POST /search -d '
{
"cluster":"posts",
"table":"weekly_index",
"query":{"match":{"title":"keyword"}}
}'
POST /search -d '
{
"table":"weekly_index",
"query":{"match":{"title":"keyword"}}
}'可以使用SET语句调整复制插件选项。
可以在Galera 文档参数 中找到可用选项的列表。
- SQL
- JSON
SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1POST /cli -d "
SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1
"复制节点可能会彼此分离,导致所有节点都被标记为non-primary状态。这可能是由于节点之间的网络分裂、集群崩溃或复制插件在确定primary component时发生异常。在这种情况下,需要选择一个节点并将其提升为primary component角色。
要确定需要提升的节点,应该比较所有节点上的last_committed集群状态变量值。如果所有服务器当前都在运行,无需重启集群。相反,可以使用SET语句将具有最高last_committed值的节点提升为primary component(如示例所示)。
其他节点将重新连接到主节点并根据该节点重新同步其数据。
- SQL
- JSON
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1POST /cli -d "
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
"- ini
searchd {
listen = 9312
listen = 192.168.1.101:9360-9370:replication
data_dir = /var/lib/manticore/
...
}要复制表,必须在具有要复制的本地表的服务器上创建一个集群。
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- Javascript
- Java
- C#
- Rust
CREATE CLUSTER postsPOST /cli -d "
CREATE CLUSTER posts
"$params = [
'cluster' => 'posts'
]
];
$response = $client->cluster()->create($params);utilsApi.sql('CREATE CLUSTER posts')await utilsApi.sql('CREATE CLUSTER posts')res = await utilsApi.sql('CREATE CLUSTER posts');utilsApi.sql("CREATE CLUSTER posts");utilsApi.Sql("CREATE CLUSTER posts");utils_api.sql("CREATE CLUSTER posts", Some(true)).await;将这些本地表添加到集群中
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- Javascript
- Java
- C#
- Rust
ALTER CLUSTER posts ADD pq_title
ALTER CLUSTER posts ADD pq_clicksPOST /cli -d "
ALTER CLUSTER posts ADD pq_title
"
POST /cli -d "
ALTER CLUSTER posts ADD pq_clicks
"$params = [
'cluster' => 'posts',
'body' => [
'operation' => 'add',
'table' => 'pq_title'
]
];
$response = $client->cluster()->alter($params);
$params = [
'cluster' => 'posts',
'body' => [
'operation' => 'add',
'table' => 'pq_clicks'
]
];
$response = $client->cluster()->alter($params);utilsApi.sql('ALTER CLUSTER posts ADD pq_title')
utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks')await utilsApi.sql('ALTER CLUSTER posts ADD pq_title')
await utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks')res = await utilsApi.sql('ALTER CLUSTER posts ADD pq_title');
res = await utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks');utilsApi.sql("ALTER CLUSTER posts ADD pq_title");
utilsApi.sql("ALTER CLUSTER posts ADD pq_clicks");utilsApi.Sql("ALTER CLUSTER posts ADD pq_title");
utilsApi.Sql("ALTER CLUSTER posts ADD pq_clicks");utils_api.sql("ALTER CLUSTER posts ADD pq_title", Some(true)).await;
utils_api.sql("ALTER CLUSTER posts ADD pq_clicks", Some(true)).await;所有希望接收集群表副本的其他节点应按照以下方式加入集群:
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- Javascript
- Java
- C#
- Rust
JOIN CLUSTER posts AT '192.168.1.101:9312'POST /cli -d "
JOIN CLUSTER posts AT '192.168.1.101:9312'
"$params = [
'cluster' => 'posts',
'body' => [
'192.168.1.101:9312'
]
];
$response = $client->cluster->join($params);utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'')await utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'')res = await utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'');utilsApi.sql("JOIN CLUSTER posts AT '192.168.1.101:9312'");utilsApi.Sql("JOIN CLUSTER posts AT '192.168.1.101:9312'");utils_api.sql("JOIN CLUSTER posts AT '192.168.1.101:9312'", Some(true)).await;运行查询时,将表名前缀为集群名称posts:或在HTTP请求对象中使用cluster属性。
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- Javascript
- Java
- C#
- Rust
INSERT INTO posts:pq_title VALUES ( 3, 'test me' )POST /insert -d '
{
"cluster":"posts",
"table":"pq_title",
"id": 3
"doc":
{
"title" : "test me"
}
}'$index->addDocuments([
3, ['title' => 'test me']
]);indexApi.insert({"cluster":"posts","table":"pq_title","id":3"doc":{"title":"test me"}})await indexApi.insert({"cluster":"posts","table":"pq_title","id":3"doc":{"title":"test me"}})res = await indexApi.insert({"cluster":"posts","table":"pq_title","id":3"doc":{"title":"test me"}});InsertDocumentRequest newdoc = new InsertDocumentRequest();
HashMap<String,Object> doc = new HashMap<String,Object>(){{
put("title","test me");
}};
newdoc.index("pq_title").cluster("posts").id(3L).setDoc(doc);
sqlresult = indexApi.insert(newdoc);Dictionary<string, Object> doc = new Dictionary<string, Object>();
doc.Add("title", "test me");
InsertDocumentRequest newdoc = new InsertDocumentRequest(index: "pq_title", cluster: "posts", id: 3, doc: doc);
var sqlresult = indexApi.Insert(newdoc);let mut doc = HashMap::new();
doc.insert("title".to_string(), serde_json::json!("test me"));
let insert_req = InsertDocumentRequest {
table: serde_json::json!("pq_title"),
doc: serde_json::json!(doc),
cluster: serde_json::json!("posts"),
id: serde_json::json!(3),
};
let insert_res = index_api.insert(insert_req).await;集群中所有修改表的查询现在都会复制到集群中的所有节点。
要创建一个复制集群,您至少必须为其设置名称。
如果您正在创建单个集群或第一个集群,则可以省略path选项。在这种情况下,将使用data_dir选项作为集群路径。但是,对于所有后续的集群,您必须指定path,并且该路径必须可用。还可以设置nodes选项以列出集群中的所有节点。
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- javascript
- Java
- C#
- Rust
CREATE CLUSTER posts
CREATE CLUSTER click_query '/var/data/click_query/' as path
CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodesPOST /cli -d "
CREATE CLUSTER posts
"
POST /cli -d "
CREATE CLUSTER click_query '/var/data/click_query/' as path
"
POST /cli -d "
CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes
"$params = [
'cluster' => 'posts',
]
];
$response = $client->cluster()->create($params);
$params = [
'cluster' => 'click_query',
'body' => [
'path' => '/var/data/click_query/'
]
]
];
$response = $client->cluster()->create($params);
$params = [
'cluster' => 'click_query',
'body' => [
'path' => '/var/data/click_query/',
'nodes' => 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312'
]
]
];
$response = $client->cluster()->create($params);utilsApi.sql('CREATE CLUSTER posts')
utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path')
utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path, \'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312\' as nodes')await utilsApi.sql('CREATE CLUSTER posts')
await utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path')
await utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path, \'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312\' as nodes')res = await utilsApi.sql('CREATE CLUSTER posts');
res = await utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path');
res = await utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path, \'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312\' as nodes');utilsApi.sql("CREATE CLUSTER posts");
utilsApi.sql("CREATE CLUSTER click_query '/var/data/click_query/' as path");
utilsApi.sql("CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes");utilsApi.Sql("CREATE CLUSTER posts");
utilsApi.Sql("CREATE CLUSTER click_query '/var/data/click_query/' as path");
utilsApi.Sql("CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes");utils_api.sql("CREATE CLUSTER posts", Some(true)).await;
utils_api.sql("CREATE CLUSTER click_query '/var/data/click_query/' as path", Some(true)).await;
utils_api.sql("CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes", Some(true)).await;要加入一个现有集群,您必须至少指定:
- 集群的名称
- 您要加入的集群中另一个节点的
host:port
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- javascript
- Java
- C#
- Rust
JOIN CLUSTER posts AT '10.12.1.35:9312'POST /cli -d "
JOIN CLUSTER posts AT '10.12.1.35:9312'
"$params = [
'cluster' => 'posts',
'body' => [
'10.12.1.35:9312'
]
];
$response = $client->cluster->join($params);utilsApi.sql('JOIN CLUSTER posts AT \'10.12.1.35:9312\'')await utilsApi.sql('JOIN CLUSTER posts AT \'10.12.1.35:9312\'')res = await utilsApi.sql('JOIN CLUSTER posts AT \'10.12.1.35:9312\'');utilsApi.sql("JOIN CLUSTER posts AT '10.12.1.35:9312'");utilsApi.Sql("JOIN CLUSTER posts AT '10.12.1.35:9312'");utils_api.sql("JOIN CLUSTER posts AT '10.12.1.35:9312'", Some(true)).await;{u'error': u'', u'total': 0, u'warning': u''}{u'error': u'', u'total': 0, u'warning': u''}{"total":0,"error":"","warning":""}在大多数情况下,当只有一个复制集群时,上述信息就足够了。但是,如果您要创建多个复制集群,则还必须设置路径并确保目录可用。
- SQL
JOIN CLUSTER c2 at '127.0.0.1:10201' 'c2' as path节点通过从指定节点获取数据来加入集群,如果成功,它会更新所有其他集群节点的节点列表,就像通过ALTER CLUSTER ... UPDATE nodes手动完成一样。此列表用于在重启时将节点重新加入集群。
节点列表有两种:
1.cluster_<name>_nodes_set:用于在重启时将节点重新加入集群。它在所有节点上更新,方式与ALTER CLUSTER ... UPDATE nodes相同。JOIN CLUSTER命令会自动执行此更新。集群状态将此列表显示为cluster_<name>_nodes_set。
2. cluster_<name>_nodes_view:此列表包含所有用于复制的活动节点,无需手动管理。ALTER CLUSTER ... UPDATE nodes实际上将此节点列表复制到用于重启时重新加入的节点列表中。集群状态将此列表显示为cluster_<name>_nodes_view。
当节点位于不同的网段或数据中心时,可以显式设置nodes选项。这可以最小化节点间的流量,并利用网关节点进行数据中心间的互通。以下代码使用nodes选项加入现有集群。
注意: 使用此语法时,集群的
cluster_<name>_nodes_set列表不会自动更新。要更新它,请使用ALTER CLUSTER ... UPDATE nodes。
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- javascript
- Java
- C#
- Rust
JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodesPOST /cli -d "
JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes
"$params = [
'cluster' => 'posts',
'body' => [
'nodes' => 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312'
]
];
$response = $client->cluster->join($params);utilsApi.sql('JOIN CLUSTER click_query \'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312\' as nodes')await utilsApi.sql('JOIN CLUSTER click_query \'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312\' as nodes')res = await utilsApi.sql('JOIN CLUSTER click_query \'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312\' as nodes');utilsApi.sql("JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes");utilsApi.Sql("JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes");utils_api.sql("JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes", Some(true)).await;{u'error': u'', u'total': 0, u'warning': u''}{u'error': u'', u'total': 0, u'warning': u''}{"total":0,"error":"","warning":""}JOIN CLUSTER命令是同步工作的,一旦节点从集群中的其他节点接收到所有数据并与它们同步,命令就会完成。
JOIN CLUSTER操作可能会失败,并显示指示重复server_id的错误消息。当加入节点的server_id与集群中现有节点的server_id相同时,会发生这种情况。要解决此问题,请确保复制集群中的每个节点都有唯一的server_id。您可以在尝试加入集群之前,在配置文件的"searchd"部分将默认的server_id更改为唯一值。
DELETE CLUSTER 语句会删除指定的集群及其 名称。一旦删除集群,它将从所有节点中移除,但其表保持完整并成为活动的本地非复制表。
- SQL
- JSON
- PHP
- Python
- Python-asyncio
- javascript
- Java
- C#
- Rust
DELETE CLUSTER click_queryPOST /cli -d "DELETE CLUSTER click_query"$params = [
'cluster' => 'click_query',
'body' => []
];
$response = $client->cluster()->delete($params);utilsApi.sql('DELETE CLUSTER click_query')await utilsApi.sql('DELETE CLUSTER click_query')res = await utilsApi.sql('DELETE CLUSTER click_query');utilsApi.sql("DELETE CLUSTER click_query");utilsApi.Sql("DELETE CLUSTER click_query");utils_api.Sql("DELETE CLUSTER click_query", Some(true)).await;{u'error': u'', u'total': 0, u'warning': u''}{u'error': u'', u'total': 0, u'warning': u''}{"total":0,"error":"","warning":""}