美文网首页
ELK日志分析系统

ELK日志分析系统

作者: ccodle | 来源:发表于2021-05-19 17:15 被阅读0次

    最近在学习java相关的知识,简单记录一下构建ELK日志分析系统的过程。

    环境介绍

    • Ubuntu 20.04
    • Kafka 2.8.0
    • FileBeat 7.12.1
    • Logstash 7.12.1
    • ElasticSearch 7.12.1
    • Kibana 7.12.1
    • web.jar 用于产生所要记录的日志,基于SpringBoot构建
      其中:Kafka ,FileBeat ,Logstash 用于实现日志数据的抓取,过滤,ElasticSearch和Kibana 用于实现数据的展示。每个组件的此处不详细展开,主要我也不咋明白来着~~
    整体架构图(图片来源于慕课网)
    • image.png
    • image.png
    组件安装简介
    Kafka安装与配置

    1.官网下载想要的版本,此处使用的是Linux,所以下载Linux版本

    # 解压缩
    tar -zxvf kafka_2.13-2.8.0.tgz
    

    2.配置Kafka

    • config/server.properties
    cd config/
    vim server.properties
    ## 需要修改的项
    listeners=PLAINTEXT://127.0.0.1:9092
    advertised.listeners=PLAINTEXT://127.0.0.1:9092
    log.dirs=/usr/elk/kafka_2.13-2.8.0/data # 需要先创建data目录,用于实现数据和日志文件的分离
    ## 需要修改的项
    
    1. 启动
    # 先启动zookeeper
    bin/zookeeper-server-start.sh config/zookeeper.properties &
    # 启动kafka
    bin/kafka-server-start.sh config/server.properties &
    
    1. 创建topic
    bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic app-log-collector --partitions 1  --replication-factor 1
    bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic error-log-collector --partitions 1  --replication-factor 1  
    
    1. 查看topic信息
    bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
    kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic app-log-collector --describe
    

    注意:上述命令的 bin 不是必须的,主要依据执行命令时所处的文件夹路径而定。

    FileBeat安装配置

    1.下载并解压文件
    2.修改filebeat.yml配置文件,需要注意缩进问题

    filebeat.inputs:
           
    - input_type: log 
      
      paths:
        ##app-服务名称.log,为什么写死,防止发生轮转抓取历史数据
        - /usr/elk/logs/app-collector.log ##定义写入ES时的type值
        ##此处的log文件位置需要注意,是web.jar产生的文件的名称和所在的位置
      document_type: "app-log"
      multiline:
        #pattern:'^\s*(\d{4}l\d{2})\-(\d{2}l[a-zA-Z]{3})\-(\d{2}l\d{4})
        pattern: '^\['  # 指定匹配的表达式
        negate: true        # 是否匹配到
        match: after        #合并到上一行的末尾
        timeout: 2s     #如果在规定时间没有新的日志事件就不等待后面的日志
        max_lines: 2000 
      fields:
        logbiz: collector
        logtopic: app-log-collector ##按服务划分用作kafka topic evn:dev
        evn: dev
    
    - input_type: log
      paths:
        - /usr/elk/logs/error-collector.log
        ##此处的log文件位置需要注意,是web.jar产生的文件的名称和所在的位置
      document_type: "error-log"
      multiline: 
      #pattern:1\s*(\d{4}l\d{2})\-(\d{2}l[a-zA-Z]{3})\-(\d{2}l\d{4})'#指定匹配的表达式(匹
        pattern: '^\['  #指定匹配的表达式(匹配以”(开头的字符串)
        negate: true    #是否匹配到
        match: after        #合并到上一行的末尾
        max_lines: 2000 #最大的行数
        timeout: 2s #如果在规定时间没有新的日志事件就不等待后面的
      fields:
        logbiz: collector
        logtopic: error-log-collector   ##按服务划分用作kafka topic
        evn: dev
    
    output.kafka: 
      enabled: true
      hosts: ["127.0.0.1:9092"] 
      topic: '%{[fields.logtopic]}'
      partition.hash: 
        reachable_only: true 
        compression: gzip 
        max_message_bytes: 1000000
        required_acks: 1
    logging.to_files: true  
    
    

    3.启动FileBeat

    ./filebeat &
    
    Logstash安装配置

    1.下载并解压文件
    2.修改config下的配置文件

    - config下配置文件说明:
    - logstash配置文件:/config/logstash.yml
    - JVM参数文件:/config/jvm.options
    - 日志格式配置文件:log4j2.properties
    - 制作Linux服务参数:/config/startup.options
    

    修改logstash.yml,增加workers工作线程数 可以有效的提升logstash性能

    pipeline.workers: 16
    

    3.logstash根目录下创建script文件夹,创建logstash-script.conf文件

    mkdir script
    cd script
    vim logstash-script.conf
    

    logstash-script.conf 文件内容

    ##multiline插件也可以用于其他类似的堆栈式信息,比如linux的内核日志。
    input{
      kafka{
        ##app-log-服务名称
        topics_pattern => "app-log-.*"
        bootstrap_servers => "127.0.0.1:9092"
        codec => json
        consumer_threads => 4   ##增加consumer的并行消费线程数decorate events =>true
        decorate_events => true
        #auto_offset_rest =>"latest"
        group_id => "app-log-group"
      }
    
      kafka{
        ##error-log-服务名称
        topics_pattern => "error-log-.*"
        bootstrap_servers => "127.0.0.1:9092"
        codec => json 
        consumer_threads => 4
        decorate_events => true
        #auto_offset_rest =>"latest"
        group_id => "error-log-group"
      }
    }
    filter{
      #时区转换
      ruby{
        code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
      }
        if "app-log" in [fields][logtopic]{
        grok{
        ##表达式
        match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
            }
        }
            
        if "error-log" in[fields][logtopic]{
        grok{
          ##表达式
          match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
        }
        }
    }
    ##测试输出到控制台:
    output{
     stdout { codec => rubydebug }
    }
    ##elasticsearch:
    output{
      if "app-log" in [fields][logtopic]{
      ##es插件
      elasticsearch{
        #es服务地址
        hosts => ["127.0.0.1:9200"]
        #用户名密码
        #user => "elastic"
        #password => "123456"
        ## 索引名,+号开头的,就会自动认为后面是时间格式:
        ## javalog-app-service-2019.01.23
        index => "app-log-%{[fields][logbiz]}-%{index_time}"
        # 是否嗅探集群ip:一般设置true;http://10.130.159.166:9200/_nodes/http?pretty
        # 通过嗅探机制进行es集群负载均衡发日志消息
        sniffing => true
        #1ogstash默认自带一个mapping模板,进行模板覆盖
        template_overwrite => true
      }
      }
      if "error-log" in [fields][logtopic]{
      elasticsearch {
        hosts => ["127.0.0.1:9200"]
        #user => "elastic"
        #password => "123456"
        index => "error-log-%{[fields][logbiz]}-%{index_time}"
        sniffing=>true 
        template_overwrite => true
      }
      }
    }
    

    4.启动Logstash

    bin/logstash -f /script/logstash-script.conf &
    
    ElasticSearch安装配置

    1.下载并解压文件
    2.修改 config/elasticsearch.yml 配置文件

    cluster.name: ccodle
    node.name: es-node-1    ## es-node-2  es-node-3 不同节点名称不同
    path.data: /usr/elk/elasticsearch/data  ## es数据存放位置
    path.logs: /usr/elk/elasticsearch/logs  ## es日志存放位置
    # bootstrap.memory_lock: true     ## 锁内存,强制占用(类似oracle的锁内存)保证es启动正常
    network.host: 0.0.0.0     ## 绑定所有ip
    discovery.seed_hosts: ["127.0.0.1"]
    cluster.initial_master_nodes: ["node-1"]
    # 用于elasticsearch-head插件的访问
    http.cors.enabled: true #表示是否支持跨域,默认为false
    http.cors.allow-origin: "*" #当设置允许跨域,默认为*,表示支持所有域名
    
    1. 用非 root 用户启动elasticsearch,启动可能会报线程限制的错误,需要修改 /etc/security/limits.conf
    # 文件末尾追加
    * soft nofile 65536
    * hard nofile 131072
    * soft nproc 4096
    * hard nproc 4096
    
    # 启动命令
    bin/elasticsearch &
    

    4.记得安装ik分词器,此处不再给出步骤

    Kibana安装配置

    1.下载并解压文件
    2.修改 config/kibana.yml 配置文件

    server.host: "0.0.0.0"
    server.name: "127.0.0.1"
    elasticsearch.hosts: ["http://127.0.0.1:9200"]
    

    3.启动Kibana

    bin/kibana &
    
    启动web.jar
    java -jar web.jar
    

    没有意外的话,登录Kibana的界面(localhost:5601)就能看到相应的日志数据了。

    相关文章

      网友评论

          本文标题:ELK日志分析系统

          本文链接:https://www.haomeiwen.com/subject/csbpjltx.html