Elasticsearch中的Ingest pipelines
Ingest pipelines 可以在建立索引之前对数据执行常见的转换。例如,您可以使用管道来删除字段、从文本中提取值和丰富数据。
pipeline 由一系列称为处理器的可配置任务组成。每个处理器按顺序运行,对传入的文档进行特定的更改。处理器运行后,Elasticsearch将转换后的文档添加到数据流或索引中。
当您创建或更新管道时,您可以指定一个可选的version
。您可以使用这个版本号和if_version参数来有条件地更新管道。如果指定了if_version参数,则成功的更新将增加管道的版本号。
创建
PUT _ingest/pipeline/my-pipeline-id
{
"description" : "My optional pipeline description",
"processors" : [
{
"set": {
"field": "ingest_field",
"value": "1111111"
}
}
]
}
查看
GET /_ingest/pipeline/my-pipeline-id
可以使用_meta参数向管道添加任意元数据。
模拟文档执行simulate
模拟一组文档执行ingest pipelines
POST /_ingest/pipeline/ingest_test/_simulate
{
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
文档执行
post document
POST dsl_test_index/_doc?pipeline=ingest_test
{
"id": "5",
"TITLE": "ingest_test"
}
update by query
POST dsl_test_index/_update_by_query?pipeline=ingest_test
{
"query": {
"term": {
"ID": "1"
}
},
"max_docs": 1
}
reindex
POST _reindex
{
"source": {
"index": "dsl_test_index"
},
"dest": {
"index": "dsl_test_index_3",
"op_type": "create",
"pipeline": "ingest_test"
}
}
index.default_pipeline
索引的默认的ingest pipeline。如果设置了默认pipeline且pipeline不存在,则索引请求将失败。默认值可以使用pipeline参数重写。特殊管道名称_none表示不应运行ingest pipeline。
index.final_pipeline
索引的最终的ingest pipeline。如果设置了最终pipeline且pipeline不存在,则索引请求将失败。最终的pipeline总是在请求pipeline(如果指定了)和默认pipeline(如果存在)之后运行。特殊的pipeline名称_none表示不会运行任何ingest pipeline。
复杂写法
script
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"script": {
"description": "Extract 'tags' from 'env' field",
"lang": "painless",
"source": """
String[] envSplit = ctx['env'].splitOnToken(params['delimiter']);
ArrayList tags = new ArrayList();
tags.add(envSplit[params['position']].trim());
ctx['tags'] = tags;
""",
"params": {
"delimiter": "-",
"position": 1
}
}
}
]
},
"docs": [
{
"_source": {
"env": "es01-prod"
}
}
]
}
网友评论