索引的迁移
API介绍
ES提供了_reindex
API用来进行索引的迁移。其最简单的用法如下:
POST _reindex
{
"source": {"index": "index-1"},
"dest": {"index": "index-2"}
}
可选参数
-
source.index
用来指定源索引,可以是一个或者多个,如果是多个索引,当有相同的id存在时,其行为受
dest.version_type
和dest.op_type
控制。 -
dest.version_type
可选值为internal
或者external
。默认值为internal
,在这种情况下,ES会将源索引中的文档直接导入目标索引,而不管目标索引中是否存在相同id的文档。如果上诉请求改为如下:
POST _reindex
{
"source": {
"index": "index-1"
},
"dest": {
"index": "index-2",
"version_type": "external"
}
}
如果把目标索引的version_type
设成external
,会在目标索引中保留源索引中文档的版本号,需要保证源索引中相同id的文档拥有更高的版本号。
在保留之前2个索引的情况下进行导入设置,会得到下面错误:
"failures": [
{
"index": "index-2",
"type": "_doc",
"id": "1",
"cause": {
"type": "version_conflict_engine_exception",
"reason": "[1]: version conflict, current version [2] is higher or equal to the one provided [1]",
"index_uuid": "ktaIPtpbS6SM9OYDGZKifQ",
"shard": "0",
"index": "tue-2"
},
"status": 409
}
]
-
dest.op_type
该参数的可选值为
create
,其效果是只会创建目标索引中不存在的文档。如果同时指定,
version_type
为external
,并且将op_type
指定为create
,其执行的效果是,版本高的更新成功了,同时,版本相同或者更低的则返回错误:"type": "version_conflict_engine_exception", "reason": "[1]: version conflict, current version [4] is higher or equal to the one provided [4]",
可以看出,其作用的是
version_type
,我尝试调整两个设置的顺序,得到的同样的结果。 -
conflicts
如果在请求中指定,
conflicts
:"proceed"
,返回的结果中将只会告知创建的个数,更新的个数,失败的个数而不会包含具体的原因。官方文档原文声称,如果设置此参数,_reindex过程会在遇到版本冲突时继续执行,并返回冲突的个数。而实际上,经过测试,如果不设置此参数,处理仍然会继续,目前观察到的区别,只有返回结果中是否包含冲突的原因
failure
字段。When "conflicts": "proceed" is set in the request body, the _reindex process will continue on version conflicts and return a count of version conflicts encountered.
failures (array) Array of failures if there were any unrecoverable errors during the process. If this is non-empty then the request aborted because of those failures. Reindex is implemented using batches and any failure causes the entire process to **abort** but all failures in the current batch are collected into the array. You can use the conflicts option to prevent reindex from aborting on version conflicts.
官网上对失败的解释是,如果遇到失败的情况,请求会退出。但是实际上发现,除了冲突的条目以外,其他条目都是成功。
PS: 经过测试,我发现其实自己是没有注意到上面描述中的batches。默认reindex过程中的bulk是100,即每次写100条,这100条中如果有一条错误的话,其余99条任然可以写入。但是下一个100条就不会继续执行了。由于我的测试数据比较少,一个bulk就能把所有数据写完,于是就产生了之前的疑惑。如果我把source
中的size字段设成1,那么执行过程中发生了一次冲突后,后续的流程就结束了。
POST _reindex
{
"conflicts": "proceed",
"source": {
"index": "index-1"
},
"dest": {
"index": "index-2",
"version_type": "external"
}
}
{
"took" : 22,
"timed_out" : false,
"total" : 6,
"updated" : 1,
"created" : 0,
"deleted" : 0,
"batches" : 1,
"version_conflicts" : 5,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 0,
"requests_per_second" : -1.0,
"throttled_until_millis" : 0,
"failures" : [ ]
}
-
size
用来控制导入文档的总数。
-
source.size
该参数用来指定导入时,每批的文档数,默认是100。
POST _reindex { "conflicts": "proceed", "source": { "index": "index-1", "size": 2 }, "dest": { "index": "index-2", "version_type": "external" } }
在index-1中,一共有6个文档,导入的结果:
{ "took" : 26, "timed_out" : false, "total" : 6, "updated" : 0, "created" : 0, "deleted" : 0, "batches" : 3, "version_conflicts" : 6, "noops" : 0, "retries" : { "bulk" : 0, "search" : 0 }, "throttled_millis" : 0, "requests_per_second" : -1.0, "throttled_until_millis" : 0, "failures" : [ ] }
可见,导入的批次变成了3次。
-
source.query
使用该参数可以限定导入的条件:
POST _reindex { "conflicts": "proceed", "source": { "index": "index-1", "size": 2, "query": { "term": { "name": { "value": "sophie" } } } }, "dest": { "index": "index-2", "version_type": "external" } }
我设置了导入条件,只有
name
是sophie的可以被导入,而结果告诉我们只有一条记录被导入到新的索引中。{ "took" : 247, "timed_out" : false, "total" : 1, "updated" : 0, "created" : 1, "deleted" : 0, "batches" : 1, "version_conflicts" : 0, "noops" : 0, "retries" : { "bulk" : 0, "search" : 0 }, "throttled_millis" : 0, "requests_per_second" : -1.0, "throttled_until_millis" : 0, "failures" : [ ] }
-
source._source
这个字段能够在reindex过程中只将选中的字段提取出来导入到目标索引中去。
分片并行执行
_reindex
API可以利用ES的sliced scroll来并行执行索引迁移功能。
有两种方式可以进行:
source.slice
:
POST _reindex
{
"source": {
"index": "tue-1",
"slice": {
"id": 1,
"max": 5
}
},
"dest": {
"version_type": "external",
"index": "tue-2"
}
}
可以在请求体中指定slice_id和要切片数目。
还有一种方式是在请求行做文章
POST _reindex?slices=auto
这里slices参数可以设置成auto或者具体的切片数目。如果设成auto,ES会自动根据shard数目进行切分。
需要注意的是,在手动配置的情况下,如果shard数太过于庞大,比如说一个索引有500个shard,选择过大的切片数,同样会对性能有损耗,而如果选择的slices超过shard的数据,同样也没有意义。
从远程索引导入
ES支持从一个远程的索引导入数据,需要对source
中的remote
对象进行配置。
POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200",
"username": "user",
"password": "pass"
},
"index": "source",
"query": {
"match": {
"test": "data"
}
}
},
"dest": {
"index": "dest"
}
}
其中host是必须,username和password是可选的。
在安全方面要注意的是,如果使用了基础的auth方案,需要使用https加密ES节点本身的对外流量,否则用户名和密码会以明文的形式在网络中传输。
远程导入方案不支持切片(slice)处理。
在迁移过程中使用script
最近在一个群里,有群友问:"存在几千万个电话号码,要按照最后8位进行聚类存到不同的索引。"当时,我提出了,使用_reindex+painless 脚本来实现。其实我当时没有把握,只是觉得painless可以修改文档的metadata,而且语法和java比较类似,应该可以做一些比较复杂的控制。后来晚上决心自己尝试一下,于是自己在kibana上写了一个简单的demo,发现确实是可以的。这里记录一下过程。
demo设计非常简单,源索引只有一个数值型成员"num",reindex中使用painless将其中的数据按照num大小分到不同的目标索引中。
数据准备:
POST _bulk
{ "create" : { "_index" : "src_index", "_id": 1} }
{ "num" : 1 }
{ "create" : { "_index" : "src_index", "_id": 2} }
{ "num" : 2 }
{ "create" : { "_index" : "src_index", "_id": 3} }
{ "num" : 3 }
{ "create" : { "_index" : "src_index", "_id": 4} }
{ "num" : 4 }
{ "create" : { "_index" : "src_index", "_id": 5} }
{ "num" : 5 }
{ "create" : { "_index" : "src_index", "_id": 6} }
{ "num" : 6 }
{ "create" : { "_index" : "src_index", "_id": 7} }
{ "num" : 7 }
{ "create" : { "_index" : "src_index", "_id": 8} }
{ "num" : 8 }
reindex:
POST _reindex
{
"source": {
"index": "src_index"
}
,
"dest": {
"index": "dst_index"
},
"script": {
"lang": "painless",
"source": """
if (ctx._source.num < 5)
{
ctx._index = "dst_index_5";
}
else
{
ctx._index = "dst_index_10";
}
"""
}
}
执行后发现,数据确实被分配到dst_index_5和dst_index_10两个索引中去了。
总结
本文以ES官方文档为基础(https://www.elastic.co/guide/en/elasticsearch/reference/7.2/docs-reindex.html),总结了一下基本用法。可能有一些疏漏。其中在reindex过程中使用painless script的内容没有涉及。
网友评论