美文网首页用ELK处理日志
ELK +kafka作为日志和简单数据处理

ELK +kafka作为日志和简单数据处理

作者: wuffy | 来源:发表于2018-11-09 15:32 被阅读0次

    ELK +kafka作为日志和简单数据处理


    1.0 使用背景介绍

    使用背景:让服务每天产生大量的日志文件数据对公司业务做一部分贡献,基于日志数据做简单的数据加工用于替 代现在通过SQL视图方式的统计,基于错误日志的简单报警,和接口性能统计等
    Kafka Stream 做简单的流式处理尝试,ELK的版本6.6.6, kafka 版本 2.1.0
    使用kafka可以灵活扩展结束其他数据处理框架

    2.0 流程示例图
    ELK技术栈.png

    3.0 Filebeat

    用于收集各个服务器上的日志,filebeat是Beat家族的一部分,用户读取日志文件,可以直接发送到Elasticsearch,Logstash或者其他消息队列,filebeat是一个轻量级的日志代理,占用服务资源很少,新版本有"压敏传感器"可以根据流量自动调节传送大小,详见说明参考官方文档

    3.1 下载安装

    地址 :[https://www.elastic.co/downloads/beats/filebeat]选择版本和安装包的方式,个人选择使用压缩包的方式,下载完后解压可以找到名为filebeat.yml的配置文件

    filebeat.prospectors:
    - type: log
      paths:
      - /data/wwwlogs/*.log
      tail_files: true
      ignore_older: 2h
      fields:
        logSource: nginx-test
      fields_under_root: true
    processors:
      - drop_fields:
          fields: ["host","beat","prospector","log","input","offset"]
    output.file:
      enabled: false
      path: "/tmp/filebeat"
      filename: filebeat
    output.kafka:
      enabled: true
      hosts: ["host:port"]
      version: 2.0.0
      topic: 'filebeat_nginx_test'
      partition.round_robin:
        reachable_only: false
      required_acks: 1
      compression: gzip
      max_message_bytes: 1000000
    #------log--------------------------
    logging.level: info
    logging.to_files: true
    logging.to_syslog: false
    logging.files:
      path: /home/filebeat/logfilebeat/
      keepfiles: 4
      name: mybeat.log
    

    4.0 Logstash 配置文件,具体配置参考 https://www.elastic.co/guide/index.html 中 input-kafka介绍

    input{
          kafka{
            bootstrap_servers => ["10.0.20.116:9092"]
            client_id => "kafaka-test"
            group_id => "kafaka-test"
            auto_offset_reset => "latest"
            consumer_threads => 5
            decorate_events => true
            topics => ["filebeat_nginx_test_01"]
            type => "nginx"
            codec => "json"
          }
    }
    
    filter {
              mutate {
                    add_field => {
                        "kafka" => "%{[@metadata][kafka]}"
                    }
                }
    }
    
    output {
         elasticsearch {
             hosts => "host:port"
             index => "yourindex-%{+YYYY.MM}"
         }
        kafka {
            bootstrap_servers => "host:port"
            topic_id => "nginx-access-log"
            compression_type => "snappy"
      }
    }
    
    

    相关文章

      网友评论

        本文标题:ELK +kafka作为日志和简单数据处理

        本文链接:https://www.haomeiwen.com/subject/pmmixqtx.html