美文网首页ELK文集
Logstash采集日志信息输出至ElasticSearch集群

Logstash采集日志信息输出至ElasticSearch集群

作者: a6fc544968bb | 来源:发表于2018-02-08 14:22 被阅读29次

      程序运行时日志内容繁多,格式复杂,当系统运行时出现不符合预期的或错误的结果时,怎样才能从运行时日志信息中快速提取系统中的重点数据信息,并对出现的错误进行归类统计,实时了解系统的运行状态?

      通过Logstash监控系统运行时日志文件,对所需关注的重点信息字段进行过滤提取,将日志文件中过滤提取的字段数据输出到ElasticSearch搜索引擎进行结构化存储,并通过与Spring data elasticsearch和Spring boot框架的集成,实现通过Java程序在Java Web项目界面统计查询ElasticSearch中保存的数据信息。

    实现过程:

    一、部署ElasticSearch集群

      ElasticSearch版本更新较快,如果需要与Spring boot + Spring data ElasticSearch进行集成,需注意Spring boot 和 Spring data ElasticSearch框架是否对所选版本的ElasticSearch提供了很好的支持。

      目前Spring boot + Spring Data ElasticSearch + ElasticSearch的版本对应关系

    
    Spring Boot Version (x)    Spring Data Elasticsearch Version (y)    Elasticsearch Version (z)
    
    x <= 1.3.5                  y <= 1.3.4                              z <= 1.7.2
    
    x >= 1.4.x                  2.0.0 <=y <5.0.0                        2.0.0 <= z < 5.0.0
    
    

      部署ElasticSearch可直接在ELK官网下载对应版本的源码,并解压到对应目录下。

    1、官网下载ElasticSearch、Logstash源代码,网站链接:
    https://www.elastic.co/cn/downloads/elasticsearch

      按照版本间依赖关系注意版本的选择。测试部署环境为Ubuntu 16.04LTS,此处选择ElasticSearch 1.7.6 TAR版本。

    下载ElasticSearch

    2、将解压后的ElasticSearch源代码文件上传拷贝至/opt目录下。

    解压上传ElasticSearch源码

    3、配置ElasticSearch插件。
    1)执行

    vi /opt/ElasticSearch-1.7.6/config/elasticsearch.yml
    

    编辑配置文件。配置文件详解如下:

    
    cluster.name: elasticsearch
    
    配置es的集群名称,默认是elasticsearch,es会自动发现在同一网段下的es,如果在同一网段下有多个集群,就可以用这个属性来区分不同的集群。
    
    node.name: "Franz Kafka"
    
    节点名,默认随机指定一个name列表中名字,该列表在es的jar包中config文件夹里name.txt文件中,其中有很多作者添加的有趣名字。
    
    node.master: true
    
    指定该节点是否有资格被选举成为node,默认是true,es是默认集群中的第一台机器为master,如果这台机挂了就会重新选举master。
    
    node.data: true
    
    指定该节点是否存储索引数据,默认为true。
    
    index.number_of_shards: 5
    
    设置默认索引分片个数,默认为5片。
    
    index.number_of_replicas: 1
    
    设置默认索引副本个数,默认为1个副本。
    
    path.conf: /path/to/conf
    
    设置配置文件的存储路径,默认是es根目录下的config文件夹。
    
    path.data: /path/to/data
    
    设置索引数据的存储路径,默认是es根目录下的data文件夹,可以设置多个存储路径,用逗号隔开,例:
    
    path.data: /path/to/data1,/path/to/data2
    
    path.work: /path/to/work
    
    设置临时文件的存储路径,默认是es根目录下的work文件夹。
    
    path.logs: /path/to/logs
    
    设置日志文件的存储路径,默认是es根目录下的logs文件夹
    
    path.plugins: /path/to/plugins
    
    设置插件的存放路径,默认是es根目录下的plugins文件夹
    
    bootstrap.mlockall: true
    
    设置为true来锁住内存。因为当jvm开始swapping时es的效率 会降低,所以要保证它不swap,可以把ES_MIN_MEM和ES_MAX_MEM两个环境变量设置成同一个值,并且保证机器有足够的内存分配给es。 同时也要允许elasticsearch的进程可以锁住内存,linux下可以通过`ulimit -l unlimited`命令。
    
    network.bind_host: 192.168.0.1
    
    设置绑定的ip地址,可以是ipv4或ipv6的,默认为0.0.0.0。
    
    network.publish_host: 192.168.0.1
    
    设置其它节点和该节点交互的ip地址,如果不设置它会自动判断,值必须是个真实的ip地址。
    
    network.host: 192.168.0.1
    
    这个参数是用来同时设置bind_host和publish_host上面两个参数。
    
    transport.tcp.port: 9300
    
    设置节点间交互的tcp端口,默认是9300。
    
    transport.tcp.compress: true
    
    设置是否压缩tcp传输时的数据,默认为false,不压缩。
    
    http.port: 9200
    
    设置对外服务的http端口,默认为9200。
    
    http.max_content_length: 100mb
    
    设置内容的最大容量,默认100mb
    
    http.enabled: false
    
    是否使用http协议对外提供服务,默认为true,开启。
    
    gateway.type: local
    
    gateway的类型,默认为local即为本地文件系统,可以设置为本地文件系统,分布式文件系统,hadoop的HDFS,和amazon的s3服务器,其它文件系统的设置方法下次再详细说。
    
    gateway.recover_after_nodes: 1
    
    设置集群中N个节点启动时进行数据恢复,默认为1。
    
    gateway.recover_after_time: 5m
    
    设置初始化数据恢复进程的超时时间,默认是5分钟。
    
    gateway.expected_nodes: 2
    
    设置这个集群中节点的数量,默认为2,一旦这N个节点启动,就会立即进行数据恢复。
    
    cluster.routing.allocation.node_initial_primaries_recoveries: 4
    
    初始化数据恢复时,并发恢复线程的个数,默认为4。
    
    cluster.routing.allocation.node_concurrent_recoveries: 2
    
    添加删除节点或负载均衡时并发恢复线程的个数,默认为4。
    
    indices.recovery.max_size_per_sec: 0
    
    设置数据恢复时限制的带宽,如入100mb,默认为0,即无限制。
    
    indices.recovery.concurrent_streams: 5
    
    设置这个参数来限制从其它分片恢复数据时最大同时打开并发流的个数,默认为5。
    
    discovery.zen.minimum_master_nodes: 1
    
    设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点。默认为1,对于大的集群来说,可以设置大一点的值(2-4)
    
    discovery.zen.ping.timeout: 3s
    
    设置集群中自动发现其它节点时ping连接超时时间,默认为3秒,对于比较差的网络环境可以高点的值来防止自动发现时出错。
    
    discovery.zen.ping.multicast.enabled: false
    
    设置是否打开多播发现节点,默认是true。
    
    discovery.zen.ping.unicast.hosts: ["host1", "host2:port", "host3[portX-portY]"]
    
    设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点。
    
    下面是一些查询时的慢日志参数设置
    
    index.search.slowlog.level: TRACE
    
    index.search.slowlog.threshold.query.warn: 10s
    
    index.search.slowlog.threshold.query.info: 5s
    
    index.search.slowlog.threshold.query.debug: 2s
    
    index.search.slowlog.threshold.query.trace: 500ms
    
    index.search.slowlog.threshold.fetch.warn: 1s
    
    index.search.slowlog.threshold.fetch.info: 800ms
    
    index.search.slowlog.threshold.fetch.debug:500ms
    
    index.search.slowlog.threshold.fetch.trace: 200ms
    
    

    4、安装ElasticSearch-head插件,使用ElasticSearch视图界面管理ElasticSearch集群。

    1. 执行:
             cd /opt/ElasticSearch-1.7.6/bin,
            ./plugin -install mobz/ElasticSearch-head
    

    命令完成安装。

    2)进入ElasticSearch目录,执行

    bin/./elasticsearch
    

    启动ElasticSearch。

    启动ElasticSearch

    2)浏览器输入

    http://IP:9200/_plugin/head
    

    打开ElasticSearch视图管理界面,对ElasticSearch集群进行管理。

    ElasticSearch视图管理界面

    二、部署Logstash

    1、官网下载Logstash源代码,并考至/opt目录下。此处选择Logstash-2.4.1版本。

    2、编辑配置logstash.conf文件,配置日志采集、处理及输出所需插件。本文使用input插件监控指定目录下的日志文件,使用filter插件对日志内容进行处理,使用output插件将采集的日志信息输出到ElasticSearch集群进行管理。文件内容详解如下:

    
    input插件使用详解:
    
    An input plugin enables a specific source of events to be read by Logstash.
    
    path字段:指定所需监视日志文件的路径信息,可同时设置监视多个日志文件,详细使用方法描述如下:
    
    This is a required setting.
    
    Value type is [array](https://www.elastic.co/guide/en/logstash/2.4/configuration-file-structure.html#array)
    
    There is no default value for this setting.
    
    The path(s) to the file(s) to use as an input. You can use filename patterns here, such as /var/log/*.log. If you use a pattern like /var/log/**/*.log, a recursive search of /var/log will be done for all *.log files. Paths must be absolute and cannot be relative.
    
    You may also configure multiple paths. See an example on the [Logstash configuration page](https://www.elastic.co/guide/en/logstash/2.4/configuration-file-structure.html#array).
    
    type字段:指定信息文档类型,详细描述如下:
    
    Value type is [string](https://www.elastic.co/guide/en/logstash/2.4/configuration-file-structure.html#string)
    
    There is no default value for this setting.
    
    Add a type field to all events handled by this input.
    
    Types are used mainly for filter activation.
    
    The type is stored as part of the event itself, so you can also use the type to search for it in Kibana.
    
    If you try to set a type on an event that already has one (for example when you send an event from a shipper to an indexer) then a new input will not override the existing type. A type set at the shipper stays with that event for its life even when sent to another Logstash server.
    
    start_position字段:
    
    Value can be any of: beginning, end
    
    Default value is "end"
    
    Choose where Logstash starts initially reading files: at the beginning or at the end. The default behavior treats files like live streams and thus starts at the end. If you have old data you want to import, set this to *beginning*.
    
    This option only modifies "first contact" situations where a file is new and not seen before, i.e. files that don’t have a current position recorded in a sincedb file read by Logstash. If a file has already been seen before, this option has no effect and the position recorded in the sincedb file will be used.
    
    codec字段:指定文档解码字符集
    
    Value type is [codec](https://www.elastic.co/guide/en/logstash/2.4/configuration-file-structure.html#codec)
    
    Default value is "plain"
    
    The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.
    

    其他插件工具的作用及使用方法可查询官方文档进行获取,

    网站地址:https://www.elastic.co/guide/en/logstash/2.4/index.html

     input {
    
           file{
    
               path => "/mnt/DShare/hdfsputter_info.log"
    
               type => "error-log"
    
               start_position => "beginning"
    
                   codec => plain {
    
                       charset => "GBK"
    
                  }
    
               }
    
     }
    
     filter {
    
            grok {
    
              patterns_dir => "../logstash-2.4.1/grok-patterns"
    
              match => { "message" => "%{DROP:drop}"}
    
             }
    
             if [drop]{
    
                 drop{}
    
              }
    
    //多行处理插件,根据正则表达式中定义的字符进行匹配,并将符合匹配条件的行向前合并为一行
    
             multiline {
    
                     pattern => "Error\sparsing\sline"
    
                     negate => "true"
    
                     what => "previous"
    
             }
    
             multiline{
    
                     pattern => "HBasePutter.java:314"
    
                     negate => "true"
    
                     what => "previous"
    
             }
    
    //grok插件中通过在grok-patterns中定义的正则表达式匹配并提取日志中对应的字段内容
    
             grok {
    
                     patterns_dir => "../logstash-2.4.1/grok-patterns"
    
                     break_on_match => false
    
                     match => [
    
                                 "message" , ".*%{PASSTIME:passtime}.*",
    
                                 "message" , ".*%{CARPLATE:carplate}.*",
    
                                 "message" , ".*%{PLATECOLOR:platecolor}.*",
    
                                 "message" , ".*%{TGSID:tgsid}.*",
    
                                 "message" , ".*%{LOCATIONID:locationId}.*",
    
                                 "message" , ".*%{DRIVEWAY:driveway}.*",
    
                                 "message" , ".*%{DRIVEDIR:drivedir}.*",
    
                                 "message" , ".*%{CAPTUREDIR:capturedir}.*",
    
                                 "message" , ".*%{CARBRAND:carbrand}.*",
    
                                 "message" , ".*%{CARCOLOR:carcolor}.*"
    
                              ]
    
                     remove_field => "message"
    
             }
    
     #      date {
    
     #              match => ["passtime","yyyy-MM-dd HH:mm:ss"]
    
     #              target => "passtime"
    
     #      }
    
     }
    
    //output插件将提取的日志字段信息输出到指定的ElasticSearch集群进行存储。
    
     output {
    
             stdout{
    
                    codec => rubydebug
    
             }
    
             elasticsearch {
    
                 hosts => ["192.168.174.141"]
    
                 index => "error-log"
    
                 document_type => "error-log"
    
                 manage_template => true
    
                 template => "../logstash-2.4.1/errorlog-template.json"
    
                 template_name => "error-log"
    
                 template_overwrite => true
    
             }
    
     }
    

    3、编辑grok_patterns文件,grok_patterns文件中定义可以匹配所需字段信息的正则表达式,Logstash运行时根据grok_patterns文件中定义的正则表达式去匹配日志文件中的内容,提取所需字段信息。根据需求,定义grok_patterns文件中定义字段匹配内容如下:

     DROP     INFO|WARN|DEBUG
    
      IPADDR     (?<=\bIPADDR=)\w+\b.[0-9]{1,.[0-9]{1,}.[0-9]{1,}
    
       SENDTIME     (?<=\bSENDTIME=)\w+\b-[0-9]{2}-[0-9]{2}\s[0-9]{2}:[0-9]{2}:[0-9]{2}\s[a-z0-9A-Z]{0,}
    
       PASSTIME      (?<=\bPASSTIME=)\w+\b-[0-9]{2}-[0-9]{2}\s[0-9]{2}:[0-9]{2}:[0-9]{2}
    
       CARPLATE      (?<=\bCARPLATE=).[0-9A-Z]{6}\b
    
       PLATETYPE     [0-9]{2,}
    
       SPEED
    
       PLATECOLOR     (?<=Error\sparsing\sline\s:\s\[PLATECOLOR=)\w+\b
    
       LOCATIONID     (?<=Error\sparsing\sline\s:\s\[LOCATIONID=)\w+\b
    
     DRIVEWAY     (?<=\bDRIVEWAY=)\w+\b
    
     DRIVEDIR     (?<=\bDRIVEDIR=)\w+\b
    
     CAPTUREDIR     (?<=Error\sparsing\sline\s:\s\[CAPTUREDIR=).*?(?=])
    
     CARCOLOR     (?<=Error\sparsing\sline\s:\s\[CARCOLOR=).*?(?=])
    
     CARBRAND     (?<=Error\sparsing\sline\s:\s\[CARBRAND=)\w+\b
    
     TGSID     (?<=\bTGSID=)\w+\b
    
     PLATECOORD     [0-9]{3},[0-9]{3},[0-9]{3},[0-9]{3}
    
     CABCOORD
    
     IMGID1     (?<=\bIMGID1=)\w+\b
    
     IMGID2    (?<=\bIMGID2=)\w+\b
    
     IMGID3     (?<=\bIMGID3=)\w+\b
    

    4、启动Logstash(注意:启动Logstash前需启动ElasticSearch)。执行

    bin/./logstash -f logstash.conf
    

    启动Logstash。日志内容发生变化时,采集日志信息进行处理并输出。

    启动Logstash 采集日志信息进行处理并输出

    5、浏览器打开elasticsearch_head插件进入elasticsearch视图管理界面。

    进入elasticsearch视图管理界面 进入elasticsearch视图管理界面

    三、Java通过Spring Data Elasticsearch API访问ElasticSearch集群,查询获取所需数据

    1) build.gradle文件中添加ElasticSearch、Spring Data Elasticsearch、spring boot starter data elasticsearch依赖:

    
    compile group: 'org.elasticsearch', name: 'elasticsearch', version: '1.7.6'
    
    compile group: 'org.springframework.data', name: 'spring-data-elasticsearch', version: '1.3.6.RELEASE'
    
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-elasticsearch', version: '1.3.6.RELEASE'
    
    build.gradle文件中添加依赖

    2)创建实体类,并使用 @Document(indexName = "索引名", type = "类型名")进行注解。

    import org.springframework.data.annotation.Id;
    
    import org.springframework.data.elasticsearch.annotations.Document;
    
    /**
    
    * Created by Grandland on 2017/12/9.
    
    */
    
    @Document(indexName ="error-log", type ="error-log")
    
    public class ErrorInfo {
    
    }
    

    3)创建Repository接口,查询操作ElasticSearch中的文档信息。

    @Component
    
    @Repository
    
    public interface ErrorInfoRepository extends ElasticsearchRepository {
    
    //定义从ElasticSearch中查询数据接口方法
    
    }
    

    4)controller调用Repository中方法获取ElasticSearch中数据,根据业务需求进行处理封装,返回前台进行解析显示。(代码略)

    相关文章

      网友评论

        本文标题:Logstash采集日志信息输出至ElasticSearch集群

        本文链接:https://www.haomeiwen.com/subject/hudttftx.html