美文网首页Dapeng-soa
dapeng日志的收集处理及查询应用

dapeng日志的收集处理及查询应用

作者: 洋洋_3720 | 来源:发表于2018-07-30 12:00 被阅读0次
    dapeng-soa.png

    背景

    随着互联网时代数据规模的爆发式增长,传统的单机系统在性能和可用性上已经无法胜任,分布式应用和服务化应用开始走进大家的视野,但是分布式的部署也会带来另外的问题,日志分散在各个应用服务节点中,出现问题不方便及时排查,尤其是服务化的应用中,分析问题时可能需要查看多个日志文件才能定位问题,如果相关项目不是一个团队维护时沟通成本更是直线上升,怎么将日志文件归集,怎么将日志文件呈现成了很多公司需要面对的问题,因此日志系统应运而生。

    dapeng日志系统的选型

    日志系统通常有三部分组成,采集器、解析器、存储器

    采集器通常部署在各个应用结点中,它监控本地文件的变化,对于新产生的日志变化,它实时收集并发送给对应的解析器,常见的采集器有flume、logstash、fluentd以及更轻量级的fluent-bit

    解析器通常和采集器结合在一起,也有一部分解析器是通过接收缓冲队列,将日志解析成json格式数据后,把数据发往存储器进行存储

    存储器用于存储对应的数据,提供相关的查询,常见的存储有hdfs、elasticsearch

    我们dapeng选取的是fluent-bit+fluentd+kafka+elasticsearch作为日志系统的方案,zookeeper、elasticsearch、kafka都采用集群模式,示例图中采用单结点fluent-bit收集各个docker容器中的日志文件发往fluentd,fluentd做为中转收集所有的日志发往kafak用于削峰填谷,削峰后的数据再经由fluentd发送给elasticsearch进行存储

    log-system.png

    为了支持fluent-bit<=>fluentd的高可用, 我们改动了fluent-bit的源码. 另外, 生产环境上, 上述结构图中的每一个环节都不能省, 以免数据量太大发生不可预料的错误.
    目前我们生产环境, 小规模应用的情况下, 每天大概产生1亿条日志记录.

    关于MDC的小插曲

    Logback中有一项功能很好使-MDC,映射诊断环境(Mapped Diagnostic Context)MDC本质上是使用的ThreadLocal。系统调用链可能很长,为了方便日志跟踪,统一打印标识。我们dapeng使用MDC来保存sessionTid,在一个完整的调用链中使sessionTid在各个服务中进行传递,将服务进行串联,方便问题定位,具体的logback如下

    <appender name="SIMPLEFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <prudent>false</prudent>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <fileNamePattern>${soa.base}/logs/simple-dapeng-container.%d{yyyy-MM-dd}.log</fileNamePattern>
                <maxHistory>30</maxHistory>
            </rollingPolicy>
            <encoder>
                <pattern>%d{MM-dd HH:mm:ss SSS} %t %p [%X{sessionTid}] - %m%n</pattern>
            </encoder>
        </appender>
    

    配置采集器

    参照dapeng基础镜像启动方式,这里启动了两个fluent-bit文件,一块用于收集日志信息,一块用于收集gc信息
    /opt/fluent-bit/fluent-bit -c /opt/fluent-bit/etc/fluent-bit.conf
    /opt/fluent-bit/fluent-bit -c /opt/fluent-bit/etc/fluent-bit-gcinfo.conf

    fluent-bit.conf对应的文件如下

    [SERVICE]
        Flush        5
        Daemon       On
        Log_Level    error
        Log_File     /fluent-bit/log/fluent-bit.log
        Parsers_File parse_dapeng.conf
    
    [INPUT]
        Name tail
        Path ${fluentBitLogPath}
        Exclude_Path  ${fluentBitLogPathExclude}
        Tag  ${fluentbitTag}
        Multiline  on
        Buffer_Chunk_Size 2m
        buffer_max_size  30m
        Mem_Buf_Limit  32m
        DB.Sync  Normal
        db_count 400
        Parser_Firstline dapeng_multiline
        db  /fluent-bit/db/logs.db
    
    [FILTER]
        Name record_modifier
        Match *
        Record hostname ${container_ip}
        Record tag ${serviceName}
    
    [OUTPUT]
        Name  Forward
        Match *
        Host  fluentd
        Port  24224
        HostStandby fluentdStandby
        PortStandby 24224
    
    

    fluent-bit-gcinfo.conf配置文件如下

    [SERVICE]
        Flush        5
        Daemon       On
        Log_Level    error
        Log_File     /fluent-bit/log/fluent-bit-gcinfo.log
    
    [INPUT]
        Name tail
        Path ${fluentBitGcInfoPath}
        Tag  ${fluentbitGcInfoTag}
        Buffer_Chunk_Size 2m
        buffer_max_size  30m
        Mem_Buf_Limit  32m
        DB.Sync  Normal
        db_count 400
        db  /fluent-bit/db/logs-gc.db
    
    [FILTER]
        Name record_modifier
        Match *
        Record hostname ${host_ip}
        Record tag ${serviceName}
    
    [OUTPUT]
        Name  Forward
        Match *
        Host  fluentd
        Port  24224
        HostStandby fluentdStandby
        PortStandby 24224
    

    record_modifer用于在解析出的json中增加hostname标识和tag标识方便日志检索
    chunk及buffer块的设置根据各系统日志的大小来进行设置
    HostStandby和PortStandby是我们dapeng基于原生fluent-bit进行改造添,当主fluentd挂掉后,日志事件会相应的发送给fluentdstandBy进行处理
    环境变量通过docker初始化将以下变量进行设置

    fluentBitLogPath=/dapeng-container/logs/*.log
    fluentBitLogPathExclude=/dapeng-container/logs/fluent*.log,/dapeng-container/logs/console.log,/dapeng-container/logs/gc*.log
    Tag=dapeng  
    fluentBitGcInfoPath=/dapeng-container/logs/gc*.log
    fluentbitGcInfoTag=gcInfoTag
    

    解析器的配置

    [PARSER]
        Name        dapeng_multiline
        Format      regex
        Regex       ${fluentbitParserRegex}
    

    解析器这块对应上面的logback配置,将日志消息处理成比较直观的JSON数据进行存储fluentbitParserRegex通过环境变量写入,正则解析可以参考https://regex101.com/ 写自己对应的正则解析

    fluentbitParserRegex=(?<logtime>^\d{2}-\d{2} \d{2}:\d{2}:\d{2} \d{3}) (?<threadPool>[^ ]+|Check idle connection Thread) (?<level>[^ ]+) \[(?<sessionTid>\w*)\] - (?<message>.*)
    
    
    

    转发器fluentd的配置(用于接收消息发送kafka)

    <system>
            log_level error
            flush_thread_count 8
            workers 8
    </system>
    <source>
      @type  forward
      port  24224
    </source>
    <source>
      @type monitor_agent
      port 24225
    </source>
    
    <match dapeng>
      @type kafka_buffered
      brokers 192.168.20.200:9092,192.168.20.135:9092,192.168.20.136:9092
      topic_key efk
     #zookeeper 192.168.20.200:2181
      buffer_type file
      buffer_path /tmp/buffer
      flush_interval 5s
      default_topic efk
      output_data_type json
      compression_codec gzip
      max_send_retries 3
      required_acks -1
      discard_kafka_delivery_failed true
    </match>
    
    
    <match gcInfoTag>
      @type kafka_buffered
      brokers 192.168.20.200:9092,192.168.20.135:9092,192.168.20.136:9092
      topic_key gcinfo
     #zookeeper 192.168.20.200:2181
      buffer_type file
      buffer_path /tmp/buffer2
      flush_interval 5s
      default_topic gcinfo
      output_data_type json
      compression_codec gzip
      max_send_retries 3
      required_acks -1
      discard_kafka_delivery_failed true
    </match>
    
    

    这里的配置文件中有两个match,分别对应fluent-bit发送过来的日志和gc信息,然后分别发送到不同的kafka-topic中,monitor_agent是fluentd的一个插件,可以及时获取fluentd响应用于fluentd的健康度检查

    [root@monitor-elk etc]# curl 192.168.20.200:24225/api/plugins.json
    {"plugins":[{"plugin_id":"object:3ff681f97a88","plugin_category":"input","type":"forward","config":{"@type":"forward","port":"24224"},"output_plugin":false,"retry_count":null},{"plugin_id":"object:3ff681c37078","plugin_category":"input","type":"monitor_agent","config":{"@type":"monitor_agent","port":"24225"},"output_plugin":false,"retry_count":null},{"plugin_id":"object:3ff681c19ca8","plugin_category":"output","type":"kafka_buffered","config":{"@type":"kafka_buffered","brokers":"192.168.20.200:9092","topic_key":"messages","buffer_type":"file","buffer_path":"/tmp/buffer","flush_interval":"60s","default_topic":"messages","output_data_type":"json","compression_codec":"gzip","max_send_retries":"3","required_acks":"-1","discard_kafka_delivery_failed":"true"},"output_plugin":true,"buffer_queue_length":0,"buffer_total_queued_size":1174144,"retry_count":6,"retry":{}}]}
    

    转发器fluentd的配置(用于接收kafka中的消息发送elasticsearch)

    <system>
            log_level error
            flush_thread_count 2
            workers 2
    </system>
    <source>
      @type kafka_group
      brokers 192.168.20.200:9092,192.168.20.135:9092,192.168.20.136:9092
      consumer_group efk-consumer
      topics efk
      format json
      start_from_beginning false
      max_wait_time 5
      max_bytes 1500000
    </source>
    <source>
      @type monitor_agent
      port 24225
    </source>
    <match>
        @type elasticsearch
        hosts 192.168.20.200:9200,192.168.20.135:9200,192.168.20.136:9200
        index_name dapeng_log_index
        type_name  dapeng_log
        content_type application/x-ndjson
        buffer_type file
        buffer_path /tmp/buffer_file
        buffer_chunk_limit 10m
        buffer_queue_limit 512
        flush_mode interval
        flush_interval 5s
        request_timeout 5s
        flush_thread_count 2
        reload_on_failure true
        resurrect_after 30s
        reconnect_on_error true
        with_transporter_log true
        logstash_format true
        logstash_prefix dapeng_log_index
        template_name dapeng_log_index
        template_file  /fluentd/etc/template.json
        num_threads 2
        utc_index  false
    </match>
    

    start_from_beginning默认为true,代表从消息队列中起始读取数据,当fluentd重启会造成日志消息冗余,因此这里配置false,如果需要恢复日志索引,可以配置成true让日志消息再消息一次(我们日志kafka消息保留的策略是保留1天,因此当出现故障时我们可以快速恢复1天内的日志)
    logstash_format 用于配置将日志索引按天数来存放

    鉴于在双11和一些大促活动中,我们对es模板进行了一些调整,对索引进行了些优化,没有使用fluentd自动生成es索引,默认保留7天内的索引和只打开3天内的索引,参考脚本如下:

    #!/bin/bash
    #
    # 索引关闭及删除
    
    # @date 2018年05月10日18:00:00
    # @description Copyright (c) 2015, github.com/dapeng-soa All Rights Reserved.
    
    
    date=`date -d "2 days ago" +%Y.%m.%d`
    date1=`date -d "6 days ago" +%Y.%m.%d`
    echo $date
    echo $date1
    #关闭索引
    curl -XPOST http://192.168.20.200:9200/dapeng_log_index-$date/_close
    #删除索引
    curl -XDELETE "http://192.168.20.200:9200/dapeng_log_index-$date1"
    #添加索引
    tomorrow=`date -d tomorrow +%Y.%m.%d`
    ipList=(192.168.20.135:9200 192.168.20.136:9200 192.168.20.200:9200)
    for i in ${ipList[@]};do
    curl -XPUT http://$i/dapeng_log_index-$tomorrow -d'
    {
      "mappings": {
        "_default_": {
                "_all": {
                    "enabled": "false"
                }
            },
        "dapeng_log": {
    
          "properties": {
            "logtime": {
              "type": "date",
              "format": "MM-dd HH:mm:ss SSS"
            },
            "threadPool": {
              "type": "keyword",
              "index": "not_analyzed",
              "norms": false,
              "index_options": "docs"
            },
            "level": {
              "type": "keyword",
              "index": "not_analyzed",
              "norms": false,
              "index_options": "docs"
            },
            "tag": {
              "type": "keyword",
              "index": "not_analyzed",
              "norms": false,
              "index_options": "docs"
            },
            "message": {
              "type": "keyword",
              "index": "not_analyzed",
              "ignore_above": 2048,
              "norms": false,
              "index_options": "docs"
            },
            "hostname": {
              "type": "keyword",
              "index": "not_analyzed",
              "norms": false,
              "index_options": "docs"
            },
            "sessionTid": {
              "type": "keyword",
              "index": "not_analyzed",
              "norms": false,
              "index_options": "docs"
            },
            "log": {
              "type": "keyword",
              "index": "not_analyzed",
              "norms": false,
              "index_options": "docs"
            }
          }
        }
      },
      "settings": {
        "index": {
          "max_result_window": "100000000",
          "number_of_shards": "3",
          "number_of_replicas": "1",
          "codec": "best_compression",
          "translog": {
            "sync_interval": "60s",
            "durability": "async",
            "flush_threshold_size": "1024mb"
          },
          "merge":{
        "policy":{
           "max_merged_segment": "2gb"
        }
          },
          "refresh_interval": "10s"
    
        }
      },
      "warmers": {}
    }'
    response=`curl -s "http://$i/_cat/indices?v" |grep open | grep dapeng_log_index-$tomorrow |wc -l`
    if [ "$response" == 1 ];then
        break
    else
            continue
    fi
    done;
    

    这里有几个es优化的选项

    "index.codec":"best_compression" #通过配置来把lucene适用的压缩算法替换成 DEFLATE,提高数据压缩率。
    "translog": {
            "sync_interval": "60s",
            "durability": "async",
            "flush_threshold_size": "1024mb"
          }  
    #通过上述配置调整 translog 持久化策略为异步周期性执行,并适当调整translog的刷盘周期
    "refresh_interval": "10s" #refresh的时间间隔,减少segment的merge操作
     "_all": { "enabled": "false"} #日志这块对字段都进行了定义,并且都通all字段查询的情况较少,因此关闭all字段
    

    gc这块的信息可以通过直接消费kafka中的数据来定制化自己的gc信息告警图

    fluentd插件镜像配置

    FROM fluent/fluentd:v1.2
    #增加es插件
    RUN  fluent-gem install fluent-plugin-elasticsearch
    #增加kafka插伯
    RUN  fluent-gem install fluent-plugin-kafka
    CMD exec fluentd -c /fluentd/etc/${FLUENTD_CONF} -p /fluentd/plugins $FLUENTD_OPT
    
    

    日志查询

    查询服务调用关系

    通过sessionTid来查询服务间的调用关系,这里sessionTid正是上面MDC中设置的,在服务的调用中通过InvocationContext(dapeng上下文)进行传递

    服务调用关系.png

    查询堆栈异常

    堆栈异常.png

    按天进行错误分组

    GET dapeng_log_index-2018.07.25/_search
    {
      "size": 0,
      "query": {
        "bool": {
          "must": [
            {
              "term": {
                "level": "ERROR"
              }
            }
          ],
          "filter": {
            "script": {
              "script": {
                "source": "doc['message'].values.length==0"
              }
            }
          }
        }
      },
      "aggs": {
        
        "group_by_tag": {
         
          "terms": {
            "field": "tag",
            "size": 100
          }
        }
      }
    }
    

    坑及优化

    fluent-bit报Invalid indentation level
    fluent-bit对配置文件的要求比较高,请保持配置用空格对齐,不要使用tab键

    fluent-bit高内存占用
    根据官方文档描述,在某些环境中,通常会发现被摄取的日志或数据比将其刷新到某些目的地的速度要快。 常见的情况是从大日志文件读取并通过网络将日志分派到后端,这需要一些时间来响应,这样会产生背压,导致服务中的高内存消耗。为了避免背压,Fluent Bit在引擎中实现了一种限制数据量的机制,通过配置参数Mem_Buf_Limit完成的。

    我们这里通过配置Mem_Buf_Limit来优化,另外fluent-bit默认使用Glibc来管理分配内存,这里我们使用jmalloc,这是一种替代内存分配器,它具有更好的策略来减少其他碎片以获得更好的性能

    image

    fluentd隔天写入索引
    写入es中的日志会比当前时间提前8个小时,例如0-8点的日志会写入到昨天的索引中,这里我们配置utc-index为false即可

    elasticsearch长期报GC
    由于业务高峰日志量导致瞬时写入较大,es会长时间报gc,影响数据的写入,这里我们引入kafka作消息缓冲,另外我们弃用elasticsearch默认的垃圾回收器,使用G1回收器

    jdb2高io使用
    最开始,我们在网站上检索关于jdb2高iowait的解决方案,给出的方案都是ext4的bug,差一点我就信了,linux的bug也能遇到,但是转过来一想这bug也好多年了,内核早就修复了,应该不是这方面的问题,我们使用top查看cpu的使用情况,比较空闲,但是wait比较高

    image

    使用iotop来查看磁盘的io使用情况,基本都是fluent-bit产生的

    image

    接下来我们使用 blktrace来收集更进一步的详细信息

    image

    最后我们使用wc来统计43这一秒内fluent-bit产生的IO请求数(Q表示即将生成IO请求)

    image

    问题元凶找到了,fluent-bit读取的日志文件后会在写出的时候更新文件位置索引,将索引保存在sqllite中,根据上面的统计,每秒钟产生的IO操作在101次(由于有4个fluent-bit)正是由于fluent-bit频繁的更新sqlite中的文件索引,造成文件合并引起的高iowait,因此需要对sqlite的写入次数加限制,这里我们基于fluent-bit改造了两种 方案,第一种,每次都只从尾部读取文件,这样就省掉了文件索引的保存达到减少磁盘IO,第二种,增加db_count参数用于对chunk块计数,当发送chunk块计数达到配置的参数时保存文件的位置索引,我们dapeng对这两块都进行了个性化改造实现,改造后的效果对比图如下

    image

    基于日志系统的衍生扩展

    目前我们基于现有的日志系统,做了生产故障实时告警系统,直接钉钉推送给相关的业务系统负责人,具体方案有两种,一种是根据索引去过滤近30分钟的日志异常推送,另外一种是从kafak中提取消息后过滤推送,第一种是假实时,错误有所延迟,第二种是完全实时,第一种方案和第二种方案我们在生产环境中都有推广,目前第二种我们也正在开源进行中,敬请期待

    image

    总结

    到这一步,我们的日志系统已经搭建成功了,当服务器扩容时,由于fluent-bit是集成在dapeng容器中,只需要在环境变量中简单配置serviceName和hostname以及fluentdhost即可,日志消息就会写入到es存储中。

    日志系统是一个非常重要的功能组成部分,我们可以使用日志系统来进行错误编排,系统优化,根据这些信息调整系统的行为,提高系统的可用性。(想了解更多?请关注dapeng开源)

    相关文章

      网友评论

        本文标题:dapeng日志的收集处理及查询应用

        本文链接:https://www.haomeiwen.com/subject/paoxvftx.html