美文网首页
elk日志收集系统搭建

elk日志收集系统搭建

作者: 香港记者mo | 来源:发表于2021-08-23 16:10 被阅读0次

    因为是在开发测试环境搭建,为了调试的方便,使用的是filebeat->logstash->es的形式,没有使用kafka。

    首先为保证收集到的日志可方便进行格式化,在springboot工程中配置日志的pattern(使用logback),我设置的为:[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] [%-5level] [%logger{50}] - [%msg] - # '%ex'%n  。 然后针对输出的全量日志编写filebeat的配置文件,如下:

    filebeat.inputs:

    - type: log

      enabled: true

      paths:

        -  /home/workspace/logs/luke-order/luke-order-info.log

      multiline: 

        pattern: '^\[' 

        negate: true

        match: after

        max_lines: 500

        timeout: 5s

    output:

      logstash:

        # enabled: true

        hosts: ["127.0.0.1:5044"]

    multiline作用是针对异常堆栈信息,将多行匹配为一个message,我的日志都是以“[”打头。输出到5044是logstash的接受端口。然后是logstash的配置,如下:

    input {

            beats {

                    port => "5044"

            }

    }

    #filter {

    #    dissect {

    #        mapping => {

    #            "message" => "[%{date}] [%{thread}] [%{level}] [%{class}] - [%{msg}] - # (\'\'|%{exception})"

    #        }

    #    }

    #}

    filter {

        grok {

            match => [

                "message", "\[%{DATA:eventDate}\] \[%{NOTSPACE:thread}\] \[%{DATA:level}\] \[%{NOTSPACE:class}\] - \[%{DATA:msg}\] - # (\'\'|%{QUOTEDSTRING:exception})"

            ]

        }

        date {

      match => ["eventDate","yyyy-MM-dd HH:mm:ss.SSS"]

      timezone => "Asia/Shanghai"

      target => "eventDate"

        }

        mutate{

            remove_field => ["host","log","@version","ecs","agent","input","flags"]

    gsub =>["level"," ",""]

        }

    }

    output {

    elasticsearch {

    hosts => ["127.0.0.1:9200"]

    index => "luke-order-log-%{+YYYY-MM-dd}"

    manage_template => false

          template_name => "luke-order-log-template"

    }

            stdout { codec => rubydebug }

    }

    关于日志的格式化,本来想使用dissect ,因为其是使用分割的方式来获取各字段,性能优于grok,但是发现针对异常堆栈,它无法做到完美的匹配,使用好多方式都不行,最终放弃还是使用grok。格式化完成后,针对我日志中获取的日期字段eventDate进行处理,使用logstash的date插件,这一步必须要做,因为最终数据展示在kibana时,会为作为x坐标的日期字段加上8小时(时区的问题),所以我只能在filter内处理是对我要使用的日期字段进行时区的处理。之所以没使用logstash自己的@timestamp字段,是因为它是processTime,而我要使用的是eventTime。然后输出到es,在此之前,在es中要建立索引模板,不然我自定义的日期字段什么的最终都是text类型,就没法作为kinana的discover的时间线坐标字段了。然后就是output中要禁用掉logstash的默认索引模板,启用自定义的即可(注意:es的索引名我是按天创建,即在索引名前缀后加上%{+YYYY-MM-dd},而这个依赖于@timestamp字段,之前我将@timestamp字段删了,索引名就没有后面的日期了),最后启动filebeat(./filebeat -e -c log-filebeat-logstash.yml,启动logstash(./bin/logstash -f script/filebeat-logstash-es.conf --config.reload.automatic)。另外es的索引模板如下:

    PUT /_template/luke-order-log-template

    {

      "index_patterns": "luke-order-log-*",

      "order": 1,

      "settings": {

        "number_of_shards": 2

      },

      "mappings": {

        "properties": {

          "class": {

            "type": "text"

          },

          "@timestamp":{

            "type": "date"

          },

          "eventDate": {

            "type": "date"

          },

          "exception": {

            "type": "text"

          },

          "level": {

            "type": "keyword"

          },

          "message": {

            "type": "text"

          },

          "msg": {

            "type": "text"

          },

          "thread": {

            "type": "text"

          }

        }

      }

    }

    最终es中存入数据,在kibana创建Index patterns,以我自定义的eventDate字段为维度,然后在discover模块中进行查看,使用kql之类的进行查询。

    相关文章

      网友评论

          本文标题:elk日志收集系统搭建

          本文链接:https://www.haomeiwen.com/subject/yrrliltx.html