美文网首页kafka
搭建日志收集流程总结(二)

搭建日志收集流程总结(二)

作者: LOC_Thomas | 来源:发表于2017-10-02 18:22 被阅读84次

    前言

    上一篇说了,如何在一台机器上搭建整个环境,接下来总结一下,自己开发的应用层面以及一些具体的配置改动

    业务日志

    业务日志内容分为两个部分

    1. 业务方自己输出的业务日志,由于业务日志内容要统一规则比较难,只能统一日志级别的内容,比如日志级别,日志类等,
    2. 统一基础框架输出的框架日志(accesslog),这个比较好统一规则,由框架来进行统一控制,并且内容比较重要,包括请求的url,参数,方法,耗时,返回码,http头等等,所以这一类日志用json格式进行打印

    日志输出目录

    日志输出目录可以通过框架进行控制,根据业务名到不同的目录和日志文件,然后通过filebeat来进行收集。filebeat收集之后,filebeat会添加一部分filebeat的数据,然后把我们业务的日志作为message字段,以json的格式输出到kafka同一个topic中,这一部分配置在上面的文章已经提到。

    logstash配置

    logstash从kafka中收到了filebeat传输的日志,输入和输出基本固定,主要就是配置filter层的内容。由于filebeat开始传输的就是json格式,所以logstash要解析一次json

    json {
        source => "message"
      }
    

    然后message就是业务真正输出的内容,这个时候日志的格式是由logback来进行确定的,所以可以通过grok来进行匹配
    logback.xml格式内容

    <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%t] [%X{X-B3-TraceId}] %-5level %logger{50} - %msg%n</pattern>
    

    所以对应的grok可以写成

    grok {
        match => [
          "message", "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{DATA:threadName}\] \[%{WORD:traceId}\] %{LOGLEVEL:logLevel}\s*%{DATA:className} \- %{GREEDYDATA:message}"
        ]
        overwrite => ["message"]
      }
    

    由于traceid不一定会存在,因为当业务没有开启sleuth的时候,是没有traceid的,上面的解析会失败,所以加了一个补充配置

      #没有traceId的情况
      if "_grokparsefailure" in [tags] {
        grok {
          match => [
            "message", "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{DATA:threadName}\] \[\] %{LOGLEVEL:logLevel}\s*%{DATA:className} \- %{GREEDYDATA:message}"
          ]
          remove_tag => ["_grokparsefailure"]
          overwrite => ["message"]
        }
      }
    

    然后,上面说了, 业务分为两个部分,具体的业务输出的不能确定内容,但是框架输出的是json格式的数据,所以可以根据className进行判断来进行解析,如果是框架日志,则加入access的前缀。

      #如果是框架的log日志
      if "JsonLogBuilder" in [className] {
        json {
          source => "message"
          target => "access"
        }
      }
    

    最后再给日期进行赋值

    #日期赋值
      date {
        match => [ "timestamp" , "YYYY-MM-dd HH:mm:ss.SSS" ]
        target => "@timestamp"
      }
    

    这样就把原始的日志进行了转换,存储到了es里面。

    附上具体的grok的pattern的规则地址:
    https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns

    kibana配置和使用

    然后打开kibana,通过指定index,就可以看到具体的日志内容了,然后具体的内容根据指定的格式被做成了es的field,这样就可以根据具体的field进行搜索、聚合、统计了。或者根据这些字段自己制作图形,最后展示在dashboard上。

    另外如果在kibana上进行搜索,可以使用lucence的搜索语法
    常用的有
    search message
    field:(field1 and field2)
    + - = && || > < ! ( ) { } [ ] ^ " ~ * ? : \ / 需要用“\”进行转译

    相关文章

      网友评论

        本文标题:搭建日志收集流程总结(二)

        本文链接:https://www.haomeiwen.com/subject/rwstyxtx.html