
Building an ELK Log Center

Author: cnwinds | Published 2020-03-24 21:28

    Overview

    Architecture diagram (image omitted)

    Once your services run in containers, logs become fragmented across hosts and containers. With ELK you can collect all the logs in one place, which makes analyzing them far easier.
    The following walks through setting up ELK. It is a minimal installation; extend it as your needs grow.

    Configuring the rsyslog service on Linux

    Edit the file:

    vi /etc/rsyslog.conf
    

    Add the following line. It forwards all logs to port 4560, the port that the logstash service (started below) will listen on. The double @@ forwards over TCP; a single @ would use UDP.

    *.* @@localhost:4560
    

    Restart the rsyslog service so the configuration takes effect:

    service rsyslog restart
    
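    Nothing is listening on port 4560 yet; once logstash is up (see "Starting ELK" below), you can verify the forwarding end to end by sending a test message through syslog. This is just a sanity check, not part of the original setup:

    logger -t elk-test "hello from rsyslog"   # should reach elasticsearch via logstash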

    Starting ELK

    1. Create the data directory
    mkdir -p /home/elk/esdata
    
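    Two host-side preparations are commonly needed before running elasticsearch in a container; both are stated here as assumptions about a typical Linux host rather than steps from the original guide:

    sysctl -w vm.max_map_count=262144    # elasticsearch's bootstrap check usually requires this
    chown -R 101 /home/elk/esdata        # make the data directory writable for the container user ("101" in the compose file below)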
    2. Create the file /home/elk/elk.yaml
    version: '2'
    services:
      kibana:
        image: kibana:7.6.2
        network_mode: host
        environment:
          ELASTICSEARCH_HOSTS: http://127.0.0.1:9200
      elastic:
        image: elasticsearch:7.6.2
        network_mode: host    # ports are exposed directly on the host; a ports: mapping would be ignored here
        user: "101"
        volumes:
        - /home/elk/es1.yml:/usr/share/elasticsearch/config/elasticsearch.yml
        - /home/elk/esdata:/usr/share/elasticsearch/data
        - /usr/share/zoneinfo/Asia/Shanghai:/etc/localtime
    
    3. Create the file /home/elk/es1.yml. Set network.publish_host to your own host's address:
    cluster.name: elasticsearch-cluster
    node.name: node1
    network.bind_host: 0.0.0.0
    network.publish_host: 101.71.88.66
    http.port: 9200
    #cluster.initial_master_nodes: ["node-1"]
    discovery.type: single-node
    #transport.tcp.port: 9300
    #http.cors.enabled: true
    #http.cors.allow-origin: "*"
    #node.master: true
    #node.data: true
    #discovery.zen.ping.unicast.hosts: ["192.168.9.219:9300","192.168.9.219:9301","192.168.9.219:9302"]
    #discovery.zen.minimum_master_nodes: 2
    xpack.license.self_generated.type: basic
    xpack.security.enabled: false
    xpack.security.transport.ssl.enabled: false
    
    4. Create the file /home/elk/logstash.yaml
    version: '2'
    services:
      logstash:
        image: logstash:7.6.2    # pin to the same version as the rest of the stack
        network_mode: host
        volumes:
        - /home/elk/logstash.conf:/etc/logstash.conf
        - /usr/share/zoneinfo/Asia/Shanghai:/etc/localtime
        command: -f /etc/logstash.conf
    
    5. Create the file /home/elk/logstash.conf
    input {
      syslog {
        type => "rsyslog"
        port => 4560
      }
    }
    
    output {
      elasticsearch {
        hosts => [ "127.0.0.1:9200" ]
      }
    }
    
    6. Start ELK
    cd /home/elk
    docker-compose -f elk.yaml up -d
    docker-compose -f logstash.yaml up -d
    
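    At this point everything should be up. A few quick sanity checks (a sketch, assuming the defaults used above):

    curl 'http://127.0.0.1:9200/_cluster/health?pretty'       # elasticsearch answers
    curl -sI http://127.0.0.1:5601 | head -n 1                # kibana answers on 5601
    docker-compose -f logstash.yaml logs --tail=20 logstash   # logstash started cleanly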

    Accessing Kibana

    Open the address below, replacing 127.0.0.1 with your machine's public IP. ELK is now fully set up.

    http://127.0.0.1:5601
    
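    Before any logs show up in Kibana's Discover view you still need an index pattern (logstash's elasticsearch output writes to logstash-* indices by default), created in Kibana under Management -> Index Patterns. To confirm the index exists first:

    curl 'http://127.0.0.1:9200/_cat/indices?v'   # expect a logstash-* entry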

    The timezone problem when Kibana displays data

    When ELK records system logs, timestamps are generally stored in elasticsearch in the local timezone (e.g. Beijing, UTC+8).
    Querying elasticsearch directly works fine, but when Kibana displays the logs it applies one more formatting step to date fields, as documented:
    dateFormat:tz (Default: Browser)
    Which timezone should be used. "Browser" will use the timezone detected by your browser.
    By default Kibana takes the current timezone from the browser and adds that offset to the log time for display; in UTC+8, for example, eight hours are added before the time is shown.
    But the log time in elasticsearch has already been adjusted for the timezone, so the default behavior is only correct for logs recorded in UTC; for logs recorded in local time it is wrong.
    To fix this, set dateFormat:tz to the UTC timezone under Kibana's Management -> Advanced Settings.
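    An alternative fix on the ingestion side: the logstash date filter takes a timezone option that declares which zone the source timestamps are in, so they are converted to true UTC before being indexed. A sketch, assuming logs stamped in Beijing time with the format shown in the appendix:

    date {
        match    => ["raw_datetime", "YYYY-MM-dd HH:mm:ss,SSS"]
        timezone => "Asia/Shanghai"   # interpret source times as UTC+8; stored as UTC
    }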

    Appendix

    logstash

    How logstash works

    A logstash pipeline contains two required elements, input and output, and one optional element, filter.
    Events are read from the input source, optionally parsed and transformed by filters, and written by the output to the target store (elasticsearch or another destination).
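    The smallest pipeline that illustrates this structure reads lines from stdin and prints structured events to stdout (the filter section is optional and omitted here):

    input  { stdin { } }
    output { stdout { codec => rubydebug } }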

    input

    file — reads data from files

    file{
        path => ['/var/log/nginx/access.log']  # path(s) of the file(s) to read
        type => 'nginx_access_log'
        start_position => "beginning"
    }
    # path accepts globs: /var/log/*.log, /var/log/**/*.log; a bare /var/log means /var/log/*.log
    # type is a common option, used later to select filters
    # start_position: where logstash starts reading the file, beginning or end
    There are further common options such as discover_interval, exclude, sincedb_path, and sincedb_write_interval; see the official docs.
    

    syslog — reads syslog messages from the network as events

    syslog{
        port =>"514" 
        type => "syslog"
    }
    # port: the port to listen on (listens on TCP and UDP 514 simultaneously)
    
    # Reading from syslog requires configuring rsyslog first:
    # cat /etc/rsyslog.conf   add one line:
    *.* @172.17.128.200:514   # ship logs to this address and port; logstash listens there and reads new entries
    # service rsyslog restart   # restart the log service
    

    beats — receives events from Elastic Beats

    beats {
        port => 5044   # port to listen on
    }
    # there are further options such as host
    
    # Reading from a beat requires configuring the beat first so it ships to logstash.
    # vim /etc/filebeat/filebeat.yml 
    ..........
    output.logstash:
      hosts: ["localhost:5044"]
    

    kafka — reads data from kafka topics as events

    kafka{
        bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
        topics => ["access_log"]
        group_id => "logstash-file"
        codec => "json"
    }
    
    kafka{
        bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
        topics => ["weixin_log","user_log"]  
        codec => "json"
    }
    
    # bootstrap_servers: list of kafka broker URLs used to establish the initial connection to the cluster
    # topics: list of kafka topics to subscribe to
    # group_id: identifier of the consumer group, default logstash. Messages on a topic are distributed among all logstash instances sharing the same group_id
    # codec: common option; the codec used to decode the incoming data
    

    filter plugins — perform intermediate processing on events

    grok — parses text and gives it structure: turns unstructured log data into structured, queryable fields via regular expressions

    grok {
        match => {"message"=>"^%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}$"}
    }
    This matches nginx log lines such as:
    # 203.202.254.16 - - [22/Jun/2018:16:12:54 +0800] "GET / HTTP/1.1" 200 3700 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/601.7.7 (KHTML, like Gecko) Version/9.1.2 Safari/601.7.7"
    #220.181.18.96 - - [13/Jun/2015:21:14:28 +0000] "GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    
    • Note that grok can take multiple match rules; if an earlier one fails to match, the later ones are tried in turn. For example:
    grok {
        match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
        match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URI:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
    }
    
    grok syntax: %{SYNTAX:SEMANTIC}, i.e. %{predefined_pattern:custom_field_name}
    
    The official grok patterns, ready to use: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
    
    grok debug tool: http://grokdebug.herokuapp.com
    

    Regular expression debugging tool: https://www.debuggex.com/

    Regular expression tutorial: https://www.jb51.net/tools/zhengze.html

    Custom patterns: (?<field_name>the pattern)
    
    For example, to match 2018/06/27 14:00:54:
    
        (?<datetime>\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d)
    
    Result: "datetime": "2018/06/27 14:00:54"
    
    • date — date parsing: parses the date in a field and stores it in @timestamp
    # sample input: [2018-07-04 17:43:35,503]
    grok{
        match => {"message"=>"\[%{DATA:raw_datetime}\]"}
    }
    date{
        match => ["raw_datetime","YYYY-MM-dd HH:mm:ss,SSS"]
        remove_field =>["raw_datetime"]
    }
    
    # stores raw_datetime in @timestamp, then removes raw_datetime
    
    # 24/Jul/2018:18:15:05 +0800
    date {
        match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
    }
    
    # 1565931281 (unix timestamp)
    date{
        match => ["sql_timestamp","UNIX"]
        locale => "en"
    }
    
    • mutate — operates on fields: rename, remove, replace, and modify fields.

    • convert — type conversion. Supported types: integer, float, integer_eu, float_eu, string, and boolean

    filter{
        mutate{
    #       convert => ["response","integer","bytes","float"]  # array form converts several fields at once
            convert => {"message"=>"integer"}
        }
    }
    # test ------->
    {
              "host" => "localhost",
           "message" => 123,    # unquoted: integer type
        "@timestamp" => 2018-06-26T02:51:08.651Z,
          "@version" => "1"
    }
    
    • split — splits a string into an array on a separator
    mutate{
        split => {"message"=>","}
    }
    #---------->
    aaa,bbb
    {
        "@timestamp" => 2018-06-26T02:40:19.678Z,
          "@version" => "1",
              "host" => "localhost",
           "message" => [
            [0] "aaa",
            [1] "bbb"
        ]}
    192,128,1,100
    {
            "host" => "localhost",
         "message" => [
          [0] "192",
          [1] "128",
          [2] "1",
          [3] "100"
     ],
      "@timestamp" => 2018-06-26T02:45:17.877Z,
        "@version" => "1"
    }
    
    • merge — merges two fields: an array with a string, or a string with a string
    filter{
        mutate{
            add_field => {"field1"=>"value1"}
        }
        mutate{ 
              split => {"message"=>"."}   # split the message field on "."
        }
        mutate{
            merge => {"message"=>"field1"}   # merge field1 into the message field
        }
    }
    #--------------->
    abc
    {
           "message" => [
            [0] "abc,"
            [1] "value1"
        ],
        "@timestamp" => 2018-06-26T03:38:57.114Z,
            "field1" => "value1",
          "@version" => "1",
              "host" => "localhost"
    }
    
    abc,.123
    {
           "message" => [
            [0] "abc,",
            [1] "123",
            [2] "value1"
        ],
        "@timestamp" => 2018-06-26T03:38:57.114Z,
            "field1" => "value1",
          "@version" => "1",
              "host" => "localhost"
    }
    
    • rename — renames a field
    filter{
        mutate{
            rename => {"message"=>"info"}
        }
    }
    #-------->
    {
        "@timestamp" => 2018-06-26T02:56:00.189Z,
              "info" => "123",
          "@version" => "1",
              "host" => "localhost"
    }
    
    • remove_field — removes fields
    mutate {
        remove_field => ["message","datetime"]
    }
    
    • join — joins an array with a separator; if the field is not an array, nothing happens
    mutate{
            split => {"message"=>":"}
    }
    mutate{
            join => {"message"=>","}
    }
    ------>
    abc:123
    {
        "@timestamp" => 2018-06-26T03:55:41.426Z,
           "message" => "abc,123",
              "host" => "localhost",
          "@version" => "1"
    }
    aa:cc
    {
        "@timestamp" => 2018-06-26T03:55:47.501Z,
           "message" => "aa,cc",
              "host" => "localhost",
          "@version" => "1"
    }
    
    • gsub — replaces a field's value using a regex or string. Only effective on strings
    mutate{
        gsub => ["message","/","_"]   # replace / with _
    }
    
    ------>
    a/b/c/
    {
          "@version" => "1",
           "message" => "a_b_c_",
              "host" => "localhost",
        "@timestamp" => 2018-06-26T06:20:10.811Z
    }
    
    • update — updates a field. If the field does not exist, nothing happens
    
    mutate{
        add_field => {"field1"=>"value1"}
    }
    mutate{
        update => {"field1"=>"v1"}
        update => {"field2"=>"v2"}    # field2 does not exist, so nothing happens
    }
    ---------------->
    {
        "@timestamp" => 2018-06-26T06:26:28.870Z,
            "field1" => "v1",
              "host" => "localhost",
          "@version" => "1",
           "message" => "a"
    }
    
    • replace — updates a field; if the field does not exist, it is created
    
    mutate{
        add_field => {"field1"=>"value1"}
    }
    mutate{
        replace => {"field1"=>"v1"}
        replace => {"field2"=>"v2"}
    }
    ---------------------->
    {
           "message" => "1",
              "host" => "localhost",
        "@timestamp" => 2018-06-26T06:28:09.915Z,
            "field2" => "v2",        #field2不存在,则新建
          "@version" => "1",
            "field1" => "v1"
    }
    
    • geoip — adds geographical information about an IP address, using data from the Maxmind GeoLite2 database
    geoip {
        source => "clientip"
        database =>"/tmp/GeoLiteCity.dat"
    }
    
    • ruby — executes arbitrary Ruby code
    filter{
        urldecode{
            field => "message"
        }
        ruby {
            init => "@kname = ['url_path','url_arg']"
            code => " 
                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('?'))]) 
                event.append(new_event)"
        }
        if [url_arg]{
            kv{
                source => "url_arg"
                field_split => "&"
                target => "url_args"
                remove_field => ["url_arg","message"]
            }
        }
    }
    # ruby plugin
    # splits the message field on "?" into url_path and url_arg
    -------------------->
    www.test.com?test
    {
           "url_arg" => "test",
              "host" => "localhost",
          "url_path" => "www.test.com",
           "message" => "www.test.com?test",  
          "@version" => "1",
        "@timestamp" =>  2018-06-26T07:31:04.887Z
    }
    www.test.com?title=elk&content=学习elk
    {
          "url_args" => {
              "title" => "elk",
            "content" => "学习elk"
        },
              "host" => "localhost",
          "url_path" => "www.test.com",
          "@version" => "1",
        "@timestamp" =>  2018-06-26T07:33:54.507Z
    }
    
    • urldecode — decodes URL-encoded fields; useful for fixing garbled non-ASCII (e.g. Chinese) characters in URLs
        urldecode{
            field => "message"
        }
    
    # field: the field the urldecode filter should decode; default "message"
    # charset (optional): the character set used by the filter; default UTF-8
    
    • kv — splits a string into key/value pairs on a given separator
    kv{
        prefix => "url_"          # prefix added to every parsed key
        target => "url_args"      # put the parsed key/value pairs under this field
        source => "message"       # the field to parse
        field_split => "&"        # pair separator
        remove_field => "message"
    }
    -------------------------->
    a=1&b=2&c=3
    {
              "host" => "localhost",
          "url_args" => {
             "url_c" => "3",
             "url_a" => "1",
             "url_b" => "2"
        },
          "@version" => "1",
        "@timestamp" => 2018-06-26T07:07:24.557Z
    }
    
    • useragent — adds information about the user agent: family, operating system, version, device
    if [agent] != "-" {
      useragent {
        source => "agent"
        target => "ua"
        remove_field => "agent"
      }
    }
    # the if statement applies the plugin only when the agent field is not "-"
    # source is required: the field containing the user-agent string
    # target: store the useragent information under the ua field; if unset it is stored at the root of the event
    

    logstash comparison operators

    Comparison: ==, !=, <, >, <=, >=
    Regex: =~, !~ (checks a pattern on the right against a string value on the left)
    Inclusion: in, not in

    Supported boolean operators: and, or, nand, xor

    Supported unary operator: !
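    These operators appear in conditionals wrapping filters and outputs. A short sketch (the status and type fields here are hypothetical, assumed to be produced by an earlier grok):

    filter {
        if [status] >= 500 and [status] < 600 {
            mutate { add_tag => ["server_error"] }
        } else if [message] =~ /timeout/ {
            mutate { add_tag => ["timeout"] }
        }
        if [type] in ["nginx_access_log", "syslog"] {
            mutate { add_field => { "source_kind" => "web" } }
        }
    }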

    output plugins — send events to a specific destination.

    stdout — standard output: prints events to the screen

    output{
        stdout{
            codec => "rubydebug"
        }
    }
    

    file — writes events to a file

    file {
        path => "/data/logstash/%{host}/%{application}"
        codec => line { format => "%{message}" }
    }
    

    kafka — sends events to kafka

    kafka{
        bootstrap_servers => "localhost:9092"
        topic_id => "test_topic"  # required setting: the topic to produce messages to
    }
    

    elasticsearch — stores logs in ES

    elasticsearch {
        hosts => "localhost:9200"
        index => "nginx-access-log-%{+YYYY.MM.dd}"
    }
    # index: the index events are written to. Creating an index per day makes it easy to drop old data and to search logs by time
    

    A complete, real-world example.

    Collecting nginx logs

    input{
        file{
            path => "/var/log/nginx/access.log"
            start_position => "beginning"
            type => "nginx_access_log"
        }
    }
    filter{
        grok{
            match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"(?:-|%{DATA:referrer})\" \"%{DATA:user_agent}\" (?:%{IP:proxy}|-) %{DATA:upstream_addr} %{NUMBER:upstream_request_time:float} %{NUMBER:upstream_response_time:float}"}
            match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"%{DATA:referrer}\" \"%{DATA:user_agent}\" \"%{DATA:proxy}\""}
        }
        if [request] {
            urldecode {
                field => "request"
            }
           ruby {
               init => "@kname = ['url_path','url_arg']"
               code => "
                   new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])
                   event.append(new_event)"
           }
            if [url_arg] {
                ruby {
                   init => "@kname = ['key', 'value']"
                   code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"
                    }
            }
        }
        geoip{
            source => "clientip"
        }
        useragent{
            source => "user_agent"
            target => "ua"
            remove_field => "user_agent"
        }
        date {
            match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
            locale => "en"
        }
        mutate{
            remove_field => ["message","timestamp","request","url_arg"]
        }
    }
    output{
        elasticsearch {      
            hosts => "localhost:9200"
            index => "nginx-access-log-%{+YYYY.MM.dd}"   
        }
    }
    

