ELK+Beats

Author: 尛尛大尹 | Published 2017-12-14 11:14

    1. Search engine basics:

        Indexing component: acquire data --> build documents --> analyze documents --> index documents (inverted index; see the small illustration after this list)
        Search component: user search interface --> build the query (turn the user's input into a processable query object) --> run the query --> render the results
        
        Indexing component: Lucene
        Search components: Solr, Elasticsearch
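
    For intuition, an inverted index flips the document-to-term relationship: instead of listing the terms inside each document, it lists the documents that contain each term, so looking up a term is a single index read. A tiny made-up example:

        doc1: "quick brown fox"          quick --> [doc1, doc3]
        doc2: "lazy brown dog"     =>    brown --> [doc1, doc2]
        doc3: "quick lazy fox"           fox   --> [doc1, doc3]
                                         lazy  --> [doc2, doc3]
                                         dog   --> [doc2]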
    

    Elasticsearch:

    Elasticsearch is a distributed, RESTful search and analytics engine capable
    of addressing a growing number of use cases. As the heart of the Elastic
    Stack, it centrally stores your data so you can discover the expected and
    uncover the unexpected.
    

    Logstash:

    Logstash is an open-source, server-side data processing pipeline that
    ingests data from a multitude of sources simultaneously, transforms it,
    and then sends it to your favorite "stash" (ours is Elasticsearch,
    naturally).
    

    Beats:

    Lightweight shippers that collect and forward data, consuming fewer resources than Logstash.
    There are six Beats:
        Filebeat: collects log file data (a minimal configuration sketch follows this list).
        Metricbeat: collects metrics, mainly for monitoring the performance of systems and software.
        Packetbeat: captures network packets and analyzes protocols to monitor request/response style communication between systems; it can gather information that conventional approaches cannot.
        Winlogbeat: collects Windows event logs.
        Auditbeat: collects Linux audit framework data and monitors file integrity.
        Heartbeat: probes connectivity between systems, e.g. over ICMP, TCP, or HTTP.
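
    Since 10.10.10.3 in the host plan below runs Beats, here is a minimal Filebeat sketch; it assumes Filebeat 6.x shipping an Apache log to the Logstash beats input used in Example 8 (port 5044), and the paths and addresses are placeholders:

        # /etc/filebeat/filebeat.yml
        filebeat.prospectors:              # renamed to filebeat.inputs in 6.3+
        - type: log
          paths:
            - /var/log/httpd/access_log    # file(s) to tail
        output.logstash:
          hosts: ["10.10.10.2:5044"]       # the Logstash beats input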
    

    Kibana:

    Visualizes the search results and data stored in Elasticsearch.
    

    2. Installation

    Hosts run CentOS 7 with ELK 6.x packages; the ELK and Beats versions should match.
    Hosts:

    All three machines get Elasticsearch, forming a cluster:
    10.10.10.1  Elasticsearch (master), Kibana
    10.10.10.2  Elasticsearch (data-node1), Logstash
    10.10.10.3  Elasticsearch (data-node2), Beats
    

    3. Elasticsearch program environment:

    Installation:

    rpm -ivh elasticsearch-x.x.x.rpm
    
    Configuration files:
        /etc/elasticsearch/elasticsearch.yml    main configuration file
        /etc/elasticsearch/jvm.options          JVM options
        /etc/elasticsearch/log4j2.properties    logging configuration
    Program files:
        /usr/share/elasticsearch/bin/elasticsearch
        /usr/share/elasticsearch/bin/elasticsearch-keystore
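
    elasticsearch-keystore manages Elasticsearch's secure settings store, keeping secrets out of the plain-text elasticsearch.yml; a quick sketch of the common subcommands (some.secure.setting is a made-up name):

        /usr/share/elasticsearch/bin/elasticsearch-keystore create   # create the keystore
        /usr/share/elasticsearch/bin/elasticsearch-keystore list     # list stored setting names
        /usr/share/elasticsearch/bin/elasticsearch-keystore add some.secure.setting   # prompts for the value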
    

    Ports:

    REST/search service: 9200/tcp
    Cluster transport:   9300/tcp
    

    Edit the configuration file /etc/elasticsearch/elasticsearch.yml:

    cluster.name: myelk   # cluster name; must be identical on every node
    node.name: master-node   # this node's name
    path.data: /data/els/data   # data directory
    path.logs: /data/els/logs   # log directory
    network.host: 0.0.0.0   # listen address; 0.0.0.0 listens on all interfaces
    http.port: 9200   # HTTP port exposed to clients
    discovery.zen.ping.unicast.hosts: ["node1", "node2", "node3"]   # seed list of hosts probed at startup to discover the cluster
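
    Given the host plan above (10.10.10.1 as master, the other two as data nodes), the per-node role settings would typically differ as well; a sketch of the additional lines (not part of the original configuration):

        # on 10.10.10.1 (master)
        node.master: true
        node.data: false
        # on 10.10.10.2 and 10.10.10.3 (data nodes)
        node.master: false
        node.data: true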
    

    Start Elasticsearch:

    systemctl start elasticsearch
    

    Check the process and listening ports:

    ps aux | grep elasticsearch
    netstat -lntp
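
    Once 9200/tcp is listening, a quick smoke test confirms the node answers; the response is a short JSON document carrying the node name, cluster name, and version:

    curl http://localhost:9200/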
    

    RESTful API:

    curl -X<VERB> '<PROTOCOL>://<HOST>:<PORT>/<PATH>?<QUERY_STRING>' -d '<BODY>'
        <BODY>: request body, in JSON;
        <VERB>: GET, POST, PUT, DELETE
        <PATH>: /index_name/type/Document_ID/
                special PATHs: /_cat, /_search, /_cluster

                /_search: search all indices and types;
                /INDEX_NAME/_search: search a single named index;
                /INDEX1,INDEX2/_search: search several named indices;
                /s*/_search: search all indices whose names begin with s;
                /INDEX_NAME/TYPE_NAME/_search: search one type within one index;
    
    
    curl -XGET 'http://10.1.0.67:9200/_cluster/health?pretty=true'
    curl -XGET 'http://10.1.0.67:9200/_cluster/stats?pretty=true'   
    curl -XGET 'http://10.1.0.67:9200/_cat/nodes?pretty'
    curl -XGET 'http://10.1.0.67:9200/_cat/health?pretty'
    curl -XGET 'http://10.1.0.67:9200/_cat/indices?v'   
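
    The same API also creates, fetches, and deletes documents; a sketch against a made-up students index (Elasticsearch 6 requires the Content-Type header on requests carrying a body):

    curl -XPUT 'http://10.1.0.67:9200/students/class1/1?pretty' \
         -H 'Content-Type: application/json' \
         -d '{"name": "tom", "age": 18}'
    curl -XGET 'http://10.1.0.67:9200/students/class1/1?pretty'
    curl -XDELETE 'http://10.1.0.67:9200/students/class1/1?pretty'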
    

    4. Logstash

    Install Logstash the same way as above.
    Logstash configuration:

    The main configuration file is logstash.yml:
    path.data: /var/lib/logstash
    http.host: "10.10.10.2"
    http.port: 9600
    path.logs: /var/log/logstash
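
    The http.host / http.port settings expose Logstash's monitoring API rather than a data port; once Logstash is running, it is a handy way to verify the process is up:

    curl 'http://10.10.10.2:9600/?pretty'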
    

    Pipeline configurations for specific logs live under /etc/logstash/conf.d and must end in .conf:

            input {     # inputs
                ...
            }
            
            filter {    # filters
                ...
            }
            
            output {    # outputs
                ...
            }
            
    

    Simple example configurations:

    Example 1:
           input {
                stdin {}
           }
           output {
                stdout {
                    codec => rubydebug   # pretty-print events to the console
                }
           }
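
    To try it, check the syntax and then run the pipeline in the foreground; each line typed on stdin becomes an event printed in rubydebug format (the file name is a placeholder):

    /usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/test.conf
    /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf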
    

    Example 2: read events from a file, pass them through the grok filter plugin, and write them to standard output:

                input {
                    file {
                        path => ["/var/log/httpd/access_log"]
                        start_position => "beginning"   # read the file from the beginning
                    }
                }

                filter {
                    grok {
                        match => {
                            "message" => "%{COMBINEDAPACHELOG}"
                        }
                        remove_field => "message"
                    }
                }

                output {
                    stdout {
                        codec => rubydebug
                    }
                }
    

    Example 3: the date filter plugin, which parses the timestamp grok extracted into the event's @timestamp:

              filter {
                  grok {
                      match => { "message" => "%{HTTPD_COMBINEDLOG}" }
                      remove_field => "message"
                  }
                  date {
                      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
                      remove_field => "timestamp"
                  }
              }
    

    Example 4: the mutate filter plugin, here renaming the agent field to user_agent:

              filter {
                  grok {
                      match => { "message" => "%{HTTPD_COMBINEDLOG}" }
                  }
                  date {
                      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
                  }
                  mutate {
                      rename => { "agent" => "user_agent" }
                  }
              }
    

    Example 5: the geoip filter plugin, which looks up the client IP in a MaxMind database and adds location fields:

           filter {
                grok {
                    match => { "message" => "%{HTTPD_COMBINEDLOG}" }
                }
                date {
                    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
                }
                mutate {
                    rename => { "agent" => "user_agent" }
                }
                geoip {
                    source => "clientip"
                    target => "geoip"
                    database => "/etc/logstash/maxmind/GeoLite2-City.mmdb"
                }
           }
    

    Example 6: using Redis as a broker

    (1) Read data from Redis:
                    input {
                        redis {
                            batch_count => 1
                            data_type => "list"
                            key => "logstash-list"
                            host => "192.168.0.2"
                            port => 6379
                            threads => 5
                        }
                    }

    (2) Write data to Redis:
                    output {
                        redis {
                            data_type => "channel"
                            key => "logstash-%{+yyyy.MM.dd}"
                        }
                    }
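
    Note that the output in (2) omits host (it defaults to 127.0.0.1) and publishes to a channel, while the input in (1) pops a list; to actually chain the two stages through the Redis instance from (1), the writing side would instead look like this sketch:

                    output {
                        redis {
                            host => "192.168.0.2"
                            port => 6379
                            data_type => "list"      # match the reader's data_type
                            key => "logstash-list"   # match the reader's key
                        }
                    }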
    

    Example 7: write data to the Elasticsearch cluster

                output {
                    elasticsearch {
                        hosts => ["http://node1:9200/","http://node2:9200/","http://node3:9200/"]
                        user => "ec18487808b6908009d3"
                        password => "efcec6a1e0"
                        index => "logstash-%{+YYYY.MM.dd}"
                        document_type => "apache_logs"
                    }
                }
    

    Example 8: a full pipeline, receiving from Beats with geoip enabled

            input {
                beats {
                    port => 5044
                }
            }

            filter {
                grok {
                    match => { "message" => "%{COMBINEDAPACHELOG}" }
                    remove_field => "message"
                }
                geoip {
                    source => "clientip"
                    target => "geoip"
                    database => "/etc/logstash/GeoLite2-City.mmdb"
                }
            }

            output {
                elasticsearch {
                    hosts => ["http://172.16.0.67:9200","http://172.16.0.68:9200","http://172.16.0.69:9200"]
                    index => "logstash-%{+YYYY.MM.dd}"
                    action => "index"
                    document_type => "apache_logs"
                }
            }
    

    grok:

            %{SYNTAX:SEMANTIC}
                SYNTAX: the name of a predefined pattern;
                SEMANTIC: the key under which the matched text is stored;

                1.2.3.4 GET /logo.jpg 203 0.12
                %{IP:clientip} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

                { clientip: 1.2.3.4, method: GET, request: /logo.jpg, bytes: 203, duration: 0.12 }
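
                This mapping can be tried end to end with a stdin pipeline; pasting the sample line prints the named fields in rubydebug format:

                input { stdin {} }
                filter {
                    grok {
                        match => { "message" => "%{IP:clientip} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
                    }
                }
                output { stdout { codec => rubydebug } }

                Two longer real-world patterns follow: one for a proxied nginx log carrying upstream fields, and, wrapped in a full filter below, one matching quoted referer, user-agent, and X-Forwarded-For fields.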
                
                
                %{IPORHOST:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)" %{HOST:domain} %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent} "(%{WORD:x_forword}|-)" (%{URIHOST:upstream_host}|-) %{NUMBER:upstream_response} (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{BASE16FLOAT:upstream_response_time}) > (%{BASE16FLOAT:request_time})
                
                 "message" => "%{IPORHOST:clientip} \[%{HTTPDATE:time}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"(?<http_referer>\S+)\" \"(?<http_user_agent>\S+)\" \"(?<http_x_forwarded_for>\S+)\""
                 
                 filter {
                    grok {
                        match => {
                            "message" => "%{IPORHOST:clientip} \[%{HTTPDATE:time}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"(?<http_referer>\S+)\" \"(?<http_user_agent>\S+)\" \"(?<http_x_forwarded_for>\S+)\""
                        }
                        remove_field => "message"
                    }
                }
                
                Field reference syntax: a nested field such as nginx.remote.ip is written
                with brackets in Logstash configuration, i.e. [nginx][remote][ip]; the
                filter below uses this to build a nested nginx.access object.
                
                
                filter {
                    grok {
                        match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
                        remove_field => "message"
                    }
                    date {
                        match => [ "[nginx][access][time]", "dd/MMM/yyyy:HH:mm:ss Z" ]
                        remove_field => "[nginx][access][time]"
                    }
                    useragent {
                        source => "[nginx][access][agent]"
                        target => "[nginx][access][user_agent]"
                        remove_field => "[nginx][access][agent]"
                    }
                    geoip {
                        source => "[nginx][access][remote_ip]"
                        target => "geoip"
                        database => "/etc/logstash/GeoLite2-City.mmdb"
                    }
                }
                
                output {
                    elasticsearch {
                        hosts => ["node1:9200","node2:9200","node3:9200"]
                        index => "logstash-ngxaccesslog-%{+YYYY.MM.dd}"
                    }
                }
                
                Notes:
                    1. The index name must begin with "logstash-" for geoip.location to be
                       typed as geo_point automatically: the default index template shipped
                       with the elasticsearch output plugin only applies to indices matching
                       logstash-*.
                    2. The geoip filter's target must be "geoip", so that its fields match
                       that template's geoip mapping.

        Besides structuring logs with the grok filter plugin, you can also configure the
        service itself to emit JSON directly; see the nginx log_format example in section 5.
                
                
        Example: structuring the nginx access log with grok
            filter {
                    grok {
                            match => { "message" => "%{HTTPD_COMBINEDLOG} \"%{DATA:realclient}\"" }
                            remove_field => "message"
                    }
                    date {
                            match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
                            remove_field => "timestamp"
                    }
            }
                
        Example: structuring the tomcat access log with grok
            filter {
                    grok {
                            match => { "message" => "%{HTTPD_COMMONLOG}" }
                            remove_field => "message"
                    }
                    date {
                            match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
                            remove_field => "timestamp"
                    }
            }

    5. Emitting nginx logs as JSON:

            log_format   json  '{"@timestamp":"$time_iso8601",'
                        '"@source":"$server_addr",'
                        '"@nginx_fields":{'
                            '"client":"$remote_addr",'
                            '"size":$body_bytes_sent,'
                            '"responsetime":"$request_time",'
                            '"upstreamtime":"$upstream_response_time",'
                            '"upstreamaddr":"$upstream_addr",'
                            '"request_method":"$request_method",'
                            '"domain":"$host",'
                            '"url":"$uri",'
                            '"http_user_agent":"$http_user_agent",'
                            '"status":$status,'
                            '"x_forwarded_for":"$http_x_forwarded_for"'
                        '}'
                    '}';
    
            access_log  logs/access.log  json;              
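
    With nginx already emitting JSON, Logstash can skip grok entirely and decode each line with the json codec; a minimal sketch (the log path is a placeholder):

            input {
                file {
                    path => ["/usr/local/nginx/logs/access.log"]
                    codec => "json"   # parse each line as a JSON event
                }
            }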
    
    Conditionals
        Sometimes you only want to filter or output an event under certain conditions. For that, you can use a conditional.
    
        Conditionals in Logstash look and act the same way they do in programming languages. Conditionals support if, else if and else statements and can be nested.
        
        The conditional syntax is:
    
            if EXPRESSION {
            ...
            } else if EXPRESSION {
            ...
            } else {
            ...
            }    
            
            What’s an expression? Comparison tests, boolean logic, and so on!
    
            You can use the following comparison operators:
    
            equality: ==, !=, <, >, <=, >=
            regexp: =~, !~ (checks a pattern on the right against a string value on the left)
            inclusion: in, not in
            
            The supported boolean operators are:
    
                and, or, nand, xor
            
            The supported unary operators are:
    
                !
            Expressions can be long and complex. Expressions can contain other expressions, you can negate expressions with !, and you can group them with parentheses (...).
            
            filter {
                if [type] == 'tomcat-accesslog' {
                    grok {}
                }

                if [type] == 'httpd-accesslog' {
                    grok {}
                }
            }
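
    The remaining operators combine the same way; a hypothetical sketch that tags 5xx responses with a regexp match and drops health-check requests with in (response and request are fields produced by the Apache grok patterns above):

            filter {
                if [response] =~ /^5\d\d$/ {
                    mutate { add_tag => ["server_error"] }
                }
                if [request] in ["/healthz", "/ping"] {
                    drop {}
                }
            }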
    
