美文网首页
Elasticsearch 7.x 深入【12】Pipeline

Elasticsearch 7.x 深入【12】Pipeline

作者: 孙瑞锴 | 来源:发表于2020-06-01 19:14 被阅读0次

    1. 借鉴

    极客时间 阮一鸣老师的Elasticsearch核心技术与实战
    如何在Elasticsearch中使用pipeline API来对事件进行处理
    (译) Ingest Node (预处理节点)
    Elasticsearch Pipeline 详解

    2. 开始

    数据准备:<Elasticsearch 7.x 深入 数据准备>

    默认情况下,每个节点都是Ingest Node,可以通过node.ingest=true|false来设置

    Ingest Node 具有以下功能

    1. 预处理,可拦截Index或者Bulk API的请求
    2. 对数据进行转化和加工,并重新返回给Index或者Bulk API

    Ingest Node的组件

    • Pipeline 对通过的数据按照顺序进行加工
    • Processor 对一些加工的行为进行了封装

    处理流程图

    处理流程

    Pipeline的组成

    • description 描述该pipeline
    • processors 定义了一系列的processors

    测试未加入节点的Pipeline

    在将pipeline加入到节点之前,我么可以使用simulate API测试编写的Pipeline,模板如下:

    POST _ingest/pipeline/_simulate
    {
      "pipeline" : { // 这里指定pipeline
        , "description": "" // 这里是pipeline的描述
        , "processors": [ // 这里指定一个或者多个processors
          {}
        ]
      },
      "docs" : [ // 这里指定一个或者多个文档
        { "_source": {} }
      ]
    }
    

    接下来,我们测试一下:
    这里我用两个酒店,以及他们没有处理的标签进行测试,其中split是pipeline中的一个processor,等会我们介绍一下其他的processor类型,目前这个是按照","进行切分

    POST _ingest/pipeline/_simulate
    {
      "pipeline": {
        "description": "split pipeline测试",
        "processors": [
          {
            "split": {
              "field": "tags",
              "separator": ","
            }
          }
        ]
      },
      "docs": [
        {
          "_source": {
            "name": "好哇热油酒店",
            "tags": "高,大,上"
          }
        },
        {
          "_source": {
            "name": "IMfine酒店",
            "tags": "舒适,情趣,惬意"
          }
        }]
    }
    

    我们看下结果:
    可以看到tag已经按照","切分了

    {
      "docs" : [
        {
          "doc" : {
            "_index" : "_index",
            "_type" : "_doc",
            "_id" : "_id",
            "_source" : {
              "name" : "好哇热油酒店",
              "tags" : [
                "高",
                "大",
                "上"
              ]
            },
            "_ingest" : {
              "timestamp" : "2020-06-01T09:28:34.386Z"
            }
          }
        },
        {
          "doc" : {
            "_index" : "_index",
            "_type" : "_doc",
            "_id" : "_id",
            "_source" : {
              "name" : "IMfine酒店",
              "tags" : [
                "舒适",
                "情趣",
                "惬意"
              ]
            },
            "_ingest" : {
              "timestamp" : "2020-06-01T09:28:34.386Z"
            }
          }
        }
      ]
    }
    

    添加Pipeline

    如果上面我们测试的pipeline,觉得已经很完美了,我想把它加入到节点中,那我们就来做这个事情。
    模板如下:

    PUT _ingest/pipeline/pipeline名称
    {
      "description": "" // 这里是pipeline的描述
        , "processors": [ // 这里指定一个或者多个processors
          {}
        ]
    }
    

    我们来试下,将上面的那个加入到节点中

    PUT _ingest/pipeline/hotel_pipeline
    {
      "description": "酒店索引的pipeline",
      "processors": [
        {
          "split": {
            "field": "tags",
            "separator": ","
          }
        }
      ]
    }
    

    测试已加入到节点中的Pipeline

    如果pipeline已经加入到了节点中,我们如何测试呢?
    模板

    POST _ingest/pipeline/pipeline的名称/_simulate
    {
      "docs": [ // 指定多个文档
        {
          "_source": {
          }
        }]
    }
    

    我们测试一下:

    POST _ingest/pipeline/hotel_pipeline/_simulate
    {
      "docs": [
        {
          "_source": {
            "name": "好哇热油酒店",
            "tags": "高,大,上"
          }
        },
        {
          "_source": {
            "name": "IMfine酒店",
            "tags": "舒适,情趣,惬意"
          }
        }]
    }
    

    我们看下结果

    {
      "docs" : [
        {
          "doc" : {
            "_index" : "_index",
            "_type" : "_doc",
            "_id" : "_id",
            "_source" : {
              "name" : "好哇热油酒店",
              "tags" : [
                "高",
                "大",
                "上"
              ]
            },
            "_ingest" : {
              "timestamp" : "2020-06-01T09:43:26.398Z"
            }
          }
        },
        {
          "doc" : {
            "_index" : "_index",
            "_type" : "_doc",
            "_id" : "_id",
            "_source" : {
              "name" : "IMfine酒店",
              "tags" : [
                "舒适",
                "情趣",
                "惬意"
              ]
            },
            "_ingest" : {
              "timestamp" : "2020-06-01T09:43:26.398Z"
            }
          }
        }
      ]
    }
    

    使用Pipeline

    pipeline已经被加入到节点中了,那我们如何在索引文档时使用上呢?
    模板

    PUT /index的名称/_doc/1?pipeline=pipeline的名称
    {
      // 数据
    }
    

    我们来试一下
    我往pipeline_hotel这个索引里面加入2篇文档,第一篇成功了,第二篇失败了,那我们如何处理异常情况呢?

    PUT /pipeline_hotel/_doc/1?pipeline=hotel_pipeline
    {
      "name": "莫德凯撒酒店",
      "tags": "舒适,极端舒适,舒适极了"
    }
    
    PUT /pipeline_hotel/_doc/1?pipeline=hotel_pipeline
    {
      "name": "莫德凯撒酒店"
    }
    

    这里我只粘贴出第二篇的错误信息,第一个成功就不粘贴了

    {
      "error": {
        "root_cause": [
          {
            "type": "exception",
            "reason": "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: field [tags] not present as part of path [tags]",
            "header": {
              "processor_type": "split"
            }
          }
        ],
        "type": "exception",
        "reason": "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: field [tags] not present as part of path [tags]",
        "caused_by": {
          "type": "illegal_argument_exception",
          "reason": "java.lang.IllegalArgumentException: field [tags] not present as part of path [tags]",
          "caused_by": {
            "type": "illegal_argument_exception",
            "reason": "field [tags] not present as part of path [tags]"
          }
        },
        "header": {
          "processor_type": "split"
        }
      },
      "status": 500
    }
    

    处理异常情况

    在上面我们看到了异常信息,那我们如何处理呢?
    对于我们上面的这个例子,错误忽略即可,那我们可以重新定义pipeline,并做以下配置:

    PUT _ingest/pipeline/hotel_pipeline2
    {
      "description": "酒店索引的pipeline",
      "processors": [
        {
          "split": {
            "field": "tags",
            "separator": ",",
            "ignore_failure": true // 这里指定发生错误时忽略
          }
        }
      ]
    }
    

    当然这里只是简单的略过了,有时候我们必须做一些额外的事情该如何搞呢?

    processor级别错误处理

    此级别可以有两种处理:

    • 忽略:配置ignore_failure=true
    • 处理:配置on_failure,里面可以定义多个处理错误的processor

    忽略上面我们用到了,我们来处理一下,如果出现错误,我们就新增一个字段(error),并拼接上新的错误信息

    PUT _ingest/pipeline/hotel_pipeline3
    {
      "description": "酒店索引的pipeline",
      "processors": [
        {
          "split": {
            "field": "tags",
            "separator": ",",
            "on_failure": [
              {
                "set": {
                    "field": "error",
                    "value": "{{ctx.error}};{{_ingest.on_failure_message}}"
                  }
              }]
          }
        }
      ]
    }
    

    看到了,在on_failure中,我们能通过_ingest能获取并仅能获取到三个属性:on_failure_message(错误信息), on_failure_processor_type(处理器类型), and on_failure_processor_tag(处理器的tag)

    pipeline级别错误处理

    此级别只有一种处理:

    • 处理:配置on_failure

    这里我们配置的是:如果出错了,将这个文档添加到索引“failed-pipeline_hotel”中。当然这里的“pipeline_hotel”是一个变量,对哪个索引运用了pipeline,那这个变量就是哪个索引的名字
    注:其实这是官网的例子,这里我就用这个为例了,我感觉官网这个例子极好

    PUT _ingest/pipeline/hotel_pipeline4
    {
      "description": "酒店索引的pipeline",
      "processors": [
        {
          "split": {
            "field": "tags",
            "separator": ","
          }
        }
      ],
      "on_failure" : [
        {
          "set" : {
            "field" : "_index",
            "value" : "failed-{{_index}}"
          }
        }
      ]
    }
    

    我们试一下:
    我们索引以下文档,以下文档不包含tags

    PUT /pipeline_hotel/_doc/1?pipeline=hotel_pipeline4
    {
      "name": "莫德凯撒酒店"
    }
    

    我们看看es的返回信息

    {
      "_index" : "failed-pipeline_hotel",
      "_type" : "_doc",
      "_id" : "1",
      "_version" : 2,
      "result" : "updated",
      "_shards" : {
        "total" : 2,
        "successful" : 1,
        "failed" : 0
      },
      "_seq_no" : 1,
      "_primary_term" : 1
    }
    

    可以看到这个文档被索引到了failed-pipeline_hotel里面

    查看Pipeline

    GET _ingest/pipeline/pipeline的名称
    
    如:
    GET _ingest/pipeline/hotel_pipeline
    

    删除Pipeline

    DELETE _ingest/pipeline/pipeline的名称
    
    如:
    DELETE _ingest/pipeline/hotel_pipeline
    

    Processor类型

    processors

    注:本图来自借鉴的文章,多谢。基础使用大家也可以看借鉴的文章,以后有其他用法再整理在这里。// TODO

    3. 大功告成

    相关文章

      网友评论

          本文标题:Elasticsearch 7.x 深入【12】Pipeline

          本文链接:https://www.haomeiwen.com/subject/xgdszhtx.html