美文网首页
ElasticSearch | Logstash | 入门 |

ElasticSearch | Logstash | 入门 |

作者: 乌鲁木齐001号程序员 | 来源:发表于2020-06-02 15:43 被阅读0次

    Logstash

    数据搜集处理引擎,支持 200 多个插件。


    Logstash.png

    Logstash | 架构简介

    Codec(Code / Decode):将原始数据 decode 成 Event,再将 Event encode 成目标数据。


    Logstash 架构.png

    Logstash | 相关概念

    Pipeline
    • 包含了 input-filter-output 三个阶段的处理流程;
    • Logstash 支持 200 多个插件,并支持插件的生命周期管理;
    • Logstash 有自己的队列;
    Event
    • Event 是数据在内部流转时的具体表现形式,数据在 input 阶段被转换为 Event,在 output 被转换成目标格式数据;
    • Event 其实是一个 Java Object,在 filter 阶段可以对 Event 的属性进行增删改查;

    Input Plugins

    一个 Pipeline 可以有多个 input 插件:

    • Stdin / File
    • Beats / Log4J / Elasticsearch / JDBC / Kafka / Rabbitmq / Redis
    • JMX / HTTP / Websocket / UDP / TCP
    • Google Cloud Storage / S3
    • Github / Twitter
    Input Plugin | File
    • 支持从文件中读取数据;
    • 文件读取需要解决的问题:文件只能被读取一次,重启后,需要从上次读取的位置继续,通过 sincedb 实现;
    • 读取到文件新内容,发现新文件;
    • 文件发生归档操作(文档位置发生变化,日志 Rotation),不能影响当前的内容读取;

    Output Plugin

    将 Event 发送到特定的目的地,是 Pipeline 的最后一个阶段:

    • ElasticSearch
    • Email / Pageduty
    • Influxdb / Kafka / MongoDB / Opensdb / Zabbix
    • HTTP / TCP / Websocket

    Codec Plugin

    将原始数据 decode 成 Event;将 Event encode 成目标数据:

    • Line / Multiline
    • JSON / Avro / Cef(ArcSight Common Event Format)
    • Dots / Rubydebug

    Filter Plugin

    Filter Plugin 可以对 Logstash Event 进行各种处理,例如解析字段、删除字段、类型转换:

    • Date - 日期解析;
    • Dissect - 分隔符解析;
    • Grok - 正则匹配解析;
    • Mutate - 处理字段、重命名、删除、替换;
    • Metircs - 聚合 Metrics;
    • Ruby - 利用 Ruby 代码来动态修改 Event;
    Filter Plugin | Mutate

    对字段做各种操作:

    • Convert - 类型转换;
    • Gsub - 字符串替换;
    • Split / Join / Merge - 切割 / 数组合并成字符串 / 数组合并;
    • Rename - 字段重命名;
    • Update / Replace - 字段内容更新替换;
    • Remove_field - 字段删除;

    Queue

    Queue.png
    In Memory Queue

    进程 Crash,机器宕机都会引起数据的丢失。

    Persistent Queue

    进程 Crash,机器宕机也不会丢失数据;数据保证会被消费,可以替代 Kafka 等消息队列缓冲区的作用。


    Codec Plugins - Single Line | 举几个栗子

    • sudo bin/logstash -e "input{stdin{codec=>line}}output{stdout{codec=> rubydebug}}"
    • sudo bin/logstash -e "input{stdin{codec=>json}}output{stdout{codec=> rubydebug}}"
    • sudo bin/logstash -e "input{stdin{codec=>line}}output{stdout{codec=> dots}}"

    Codec Plugin - Multiline | 举个栗子

    运行 multiline-exception.conf

    sudo bin/logstash -f multiline-exception.conf

    multiline-exception.conf 中的内容
    • 以字母开头的输入会被匹配到;
    • 匹配到的内容属于上一个 Event,需要再输入一个以字母开头的文本,Logstash 才会有输出;
    input {
      stdin {
        codec => multiline {
          pattern => "^\s"
          what => "previous"
        }
      }
    }
    
    
    filter {}
    
    output {
      stdout { codec => rubydebug }
    }
    
    输入一段 Java 的堆栈异常作为多行数据
    Exception in thread "main" java.lang.NullPointerException
            at com.example.myproject.Book.getTitle(Book.java:16)
            at com.example.myproject.Author.getBookTitles(Author.java:25)
            at com.example.myproject.Bootstrap.main(Bootstrap.java:14)
    
    再输入以字母开头的文本

    hello word

    输出上一段匹配到的 Java 的堆栈信息;


    拿个 logstash.conf 出来分析一下

    • 没有用到 sincedb,所以 sincedb_path => "/dev/null"
    input {
      file {
        path => "/home/lixinlei/data/movielens/ml-latest-small/movies.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    }
    
    • movies.csv 大概长这样:
    movieId,title,genres
    1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
    2,Jumanji (1995),Adventure|Children|Fantasy
    3,Grumpier Old Men (1995),Comedy|Romance
    4,Waiting to Exhale (1995),Comedy|Drama|Romance
    5,Father of the Bride Part II (1995),Comedy
    6,Heat (1995),Action|Crime|Thriller
    7,Sabrina (1995),Comedy|Romance
    ...
    
    • 每行的数据用 , 分割,分割出来的内容,分别归属到列 id,content,genre 下;
    • 列 genre 下的数据用 | 分割;
    • 列 content 下的数据用 ( 分割,分割出来的第 1 个字符串存到新的列下,新的列叫 title;分割出来的第 2 个字符串存到新的列下,新的列叫 year;
    • 把列 year 的数据类型转为 integer;
    • 去掉列 title 中的空格;
    filter {
      csv {
        separator => ","
        columns => ["id","content","genre"]
      }
    
      mutate {
        split => { "genre" => "|" }
        remove_field => ["path", "host","@timestamp","message"]
      }
    
      mutate {
        split => ["content", "("]
        add_field => { "title" => "%{[content][0]}"}
        add_field => { "year" => "%{[content][1]}"}
      }
    
      mutate {
        convert => {
          "year" => "integer"
        }
        strip => ["title"]
        remove_field => ["path", "host","@timestamp","message","content"]
      }
    
    }
    
    • 把读取到的,转换过的信息写入 ElasticSearch 中;
    output {
       elasticsearch {
         hosts => "http://localhost:9200"
         index => "movies"
         document_id => "%{id}"
       }
      stdout {}
    }
    

    相关文章

      网友评论

          本文标题:ElasticSearch | Logstash | 入门 |

          本文链接:https://www.haomeiwen.com/subject/lbtezhtx.html