美文网首页
ElasticSearch | Ingest Pipeline

ElasticSearch | Ingest Pipeline

作者: 乌鲁木齐001号程序员 | 来源:发表于2020-05-28 20:44 被阅读0次

    Ingest Node

    • ElasticSearch 5.0 后,引入的一种新的节点类型,默认配置下,每个节点都是 Ingest Node;
    • Ingest Node 具有预处理数据的能力,可拦截 Index 或 Bulk API 的请求,并对数据进行转换,然后重新返回给 Index 或 Bulk API,最后写入到 ElasticSearch 中;
    • 无需 Logstash,就可以进行数据的预处理,例如:
      • 为某个字段设置默认值;
      • 重命名某个字段的字段名;
      • 对字段值进行 Split 操作;
      • 支持设置 Painless Script,对数据进行更加复杂的加工;

    Ingest Node vs Logstash

      Logstash Ingest Node
    数据输入与输出 支持从不同的数据源读取,并写入不同的数据源 支持从 ES REST API 获取数据并且写入 ElasticSearch
    数据缓冲 实现了简单的数据队列,支持重写 不支持缓冲
    数据处理 支持大量的插件,也支持定制开发 内置的插件,可以开发 Plugin 进行扩展(Plugin 更新需要重启)
    配置和使用 增加了一定的架构复杂度 无需额外部署

    Pipeline & Processor

    Pipeline.png

    在 Ingest Node 中可以定义 Pipeline

    Pipeline
    • Pipeline 会对通过的数据(文档),按照顺序进行加工;
    Processor
    • ElasticSearch 对一些加工的行为进行了抽象的包装;
    • ElasticSearch 有很多内置的 Processor,也支持通过插件的方式,实现自己的 Processor;

    ElasticSearch | 内置 Processor

    • Split Processer - 将给定字段值分成一个数组
    • Remove / Rename Processer - 移除 / 重命名一个字段
    • Append Processer - 为商品增加一个新的标签
    • Convert Processer - 将商品价格,从字符串转成 float 类型
    • Date / JSON Processer - 日期格式转换 / 字符串转 JSON 对象
    • Date Index Name Processor - 将通过该 Processor 的文档,分配到指定时间格式的索引中
    • Fail Processer - 一旦出现异常,该 Pipeline 指定的错误信息能返回给用户
    • Foreach Processer - 数组字段,数组的每个元素都会使用到一个相同的处理器
    • Grok Processor Processer - 日志的日期格式切割
    • Gsub / Join / Split Processer - 字符串替换 / 数组转字符串 / 字符串转数组
    • Lowercase / Upcase Processer - 大小写转换

    Pipeline | 举个栗子

    准备数据
    PUT tech_blogs/_doc/1
    {
      "title":"Introducing big data......",
      "tags":"hadoop,elasticsearch,spark",
      "content":"You konw, for big data"
    }
    
    _simulate API | 将字段的值用 "," 分割
    • tags 字段不再是字符串,而是字符串的数组;
    POST _ingest/pipeline/_simulate
    {
      "pipeline": {
        "description": "to split blog tags",
        "processors": [
          {
            "split": {
              "field": "tags",
              "separator": ","
            }
          }
        ]
      },
      "docs": [
        {
          "_index": "index",
          "_id": "id",
          "_source": {
            "title": "Introducing big data......",
            "tags": "hadoop,elasticsearch,spark",
            "content": "You konw, for big data"
          }
        },
        {
          "_index": "index",
          "_id": "idxx",
          "_source": {
            "title": "Introducing cloud computering",
            "tags": "openstack,k8s",
            "content": "You konw, for cloud"
          }
        }
      ]
    }
    
    _simulate API | 在 Pipeline 中再添加一个 Processor
    • 为文档添加字段 views,并设置默认值 0;
    POST _ingest/pipeline/_simulate
    {
      "pipeline": {
        "description": "to split blog tags",
        "processors": [
          {
            "split": {
              "field": "tags",
              "separator": ","
            }
          },
          {
            "set":{
              "field": "views",
              "value": 0
            }
          }
        ]
      },
    
      "docs": [
        {
          "_index":"index",
          "_id":"id",
          "_source":{
            "title":"Introducing big data......",
      "tags":"hadoop,elasticsearch,spark",
      "content":"You konw, for big data"
          }
        },
        {
          "_index":"index",
          "_id":"idxx",
          "_source":{
            "title":"Introducing cloud computering",
      "tags":"openstack,k8s",
      "content":"You konw, for cloud"
          }
        }
      ]
    }
    
    在 ElasticSearch 中添加一个 Pipeline
    PUT _ingest/pipeline/blog_pipeline
    {
      "description": "a blog pipeline",
      "processors": [
          {
            "split": {
              "field": "tags",
              "separator": ","
            }
          },
          {
            "set":{
              "field": "views",
              "value": 0
            }
          }
        ]
    }
    
    查看 Pipeline
    GET _ingest/pipeline/blog_pipeline
    
    测试 Pipeline
    POST _ingest/pipeline/blog_pipeline/_simulate
    {
      "docs": [
        {
          "_source": {
            "title": "Introducing cloud computering",
            "tags": "openstack,k8s",
            "content": "You konw, for cloud"
          }
        }
      ]
    }
    
    使用 Pipeline 和不使用 Pipeline 向索引中添加数据
    • 一条数据被 Pipeline 处理,另一条没有;
    PUT tech_blogs/_doc/1
    {
      "title":"Introducing big data......",
      "tags":"hadoop,elasticsearch,spark",
      "content":"You konw, for big data"
    }
    
    PUT tech_blogs/_doc/2?pipeline=blog_pipeline
    {
      "title": "Introducing cloud computering",
      "tags": "openstack,k8s",
      "content": "You konw, for cloud"
    }
    
    POST tech_blogs/_search
    {}
    
    使用 blog_pipeline 重建 tech_blogs 中的所有文档
    • update_by_query 会导致错误,因为 id 为 1 的文档的 tags 字段是字符串数组,而不是字符串;
    POST tech_blogs/_update_by_query?pipeline=blog_pipeline
    {
    }
    
    增加 update_by_query 的条件
    • 只对没有 views 字段的文档作用 blog_pipeline;
    POST tech_blogs/_update_by_query?pipeline=blog_pipeline
    {
        "query": {
            "bool": {
                "must_not": {
                    "exists": {
                        "field": "views"
                    }
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:ElasticSearch | Ingest Pipeline

          本文链接:https://www.haomeiwen.com/subject/liqzahtx.html