美文网首页
Logstash测试案例

Logstash测试案例

作者: Java及SpringBoot | 来源:发表于2020-03-06 15:41 被阅读0次

    个人专题目录


    1. Logstash学习

    1.1 Logstash基本语法组成

    什么是Logstash

    logstash是一个数据抽取工具,将数据从一个地方转移到另一个地方。如hadoop生态圈的sqoop等。下载地址:https://www.elastic.co/cn/downloads/logstash

    logstash之所以功能强大和流行,还与其丰富的过滤器插件是分不开的,过滤器提供的并不单单是过滤的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的事件到后续流程中。
    Logstash配置文件有如下三部分组成,其中input、output部分是必须配置,filter部分是可选配置,而filter就是过滤器插件,可以在这部分实现各种日志过滤功能。

    配置文件:

    input {
        #输入插件
    }
    filter {
        #过滤匹配插件
    }
    output {
        #输出插件
    }
    

    启动操作:

    logstash.bat -e 'input{stdin{}} output{stdout{}}'
    

    为了好维护,将配置写入文件,启动

    logstash.bat -f ../config/test1.conf
    

    1.2 Logstash输入插件(input)

    https://www.elastic.co/guide/en/logstash/current/input-plugins.html

    1、标准输入(Stdin)

    input{
        stdin{
           
        }
    }
    output {
        stdout{
            codec=>rubydebug    
        }
    }
    

    2、读取文件(File)

    logstash使用一个名为filewatch的ruby gem库来监听文件变化,并通过一个叫.sincedb的数据库文件来记录被监听的日志文件的读取进度(时间戳),这个sincedb数据文件的默认路径在 <path.data>/plugins/inputs/file下面,文件名类似于.sincedb_123456,而<path.data>表示logstash插件存储目录,默认是LOGSTASH_HOME/data。

    input {
        file {
            path => ["/var/*/*"]
            start_position => "beginning"
        }
    }
    output {
        stdout{
            codec=>rubydebug    
        }
    }
    

    默认情况下,logstash会从文件的结束位置开始读取数据,也就是说logstash进程会以类似tail -f命令的形式逐行获取数据。

    3、读取TCP网络数据

    input {
      tcp {
        port => "1234"
      }
    }
    
    filter {
      grok {
        match => { "message" => "%{SYSLOGLINE}" }
      }
    }
    
    output {
        stdout{
            codec=>rubydebug
        }
    }
    

    1.3 Logstash过滤器插件(Filter)

    https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

    Grok 正则捕获

    grok是一个十分强大的logstash filter插件,他可以通过正则解析任意文本,将非结构化日志数据弄成结构化和方便查询的结构。他是目前logstash 中解析非结构化日志数据最好的方式。

    Grok 的语法规则是:

    %{语法: 语义}
    

    例如输入的内容为:

    172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
    

    %{IP:clientip}匹配模式将获得的结果为:clientip: 172.16.213.132
    %{HTTPDATE:timestamp}匹配模式将获得的结果为:timestamp: 07/Feb/2018:16:24:19 +0800
    而%{QS:referrer}匹配模式将获得的结果为:referrer: "GET / HTTP/1.1"

    下面是一个组合匹配模式,它可以获取上面输入的所有内容:

    %{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}
    

    通过上面这个组合匹配模式,我们将输入的内容分成了五个部分,即五个字段,将输入内容分割为不同的数据字段,这对于日后解析和查询日志数据非常有用,这正是使用grok的目的。

    例子:

    input{
        stdin{}
    }
    filter{
        grok{
            match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
        }
    }
    output{
        stdout{
            codec => "rubydebug"
        }
    }
    

    输入内容:

    172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
    

    时间处理(Date)

    date插件是对于排序事件和回填旧数据尤其重要,它可以用来转换日志记录中的时间字段,变成LogStash::Timestamp对象,然后转存到@timestamp字段里,这在之前已经做过简单的介绍。
    下面是date插件的一个配置示例(这里仅仅列出filter部分):

    filter {
        grok {
            match => ["message", "%{HTTPDATE:timestamp}"]
        }
        date {
            match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
        }
    }
    

    数据修改(Mutate)

    (1)正则表达式替换匹配字段

    gsub可以通过正则表达式替换字段中匹配到的值,只对字符串字段有效,下面是一个关于mutate插件中gsub的示例(仅列出filter部分):

    filter {
        mutate {
            gsub => ["filed_name_1", "/" , "_"]
        }
    }
    

    这个示例表示将filed_name_1字段中所有"/"字符替换为"_"。

    (2)分隔符分割字符串为数组

    split可以通过指定的分隔符分割字段中的字符串为数组,下面是一个关于mutate插件中split的示例(仅列出filter部分):

    filter {
        mutate {
            split => ["filed_name_2", "|"]
        }
    }
    

    这个示例表示将filed_name_2字段以"|"为区间分隔为数组。

    (3)重命名字段

    rename可以实现重命名某个字段的功能,下面是一个关于mutate插件中rename的示例(仅列出filter部分):

    filter {
        mutate {
            rename => { "old_field" => "new_field" }
        }
    }
    

    这个示例表示将字段old_field重命名为new_field。

    (4)删除字段

    remove_field可以实现删除某个字段的功能,下面是一个关于mutate插件中remove_field的示例(仅列出filter部分):

    filter {
        mutate {
            remove_field  =>  ["timestamp"]
        }
    }
    

    这个示例表示将字段timestamp删除。

    (5)GeoIP 地址查询归类

    filter {
        geoip {
            source => "ip_field"
        }
    }
    

    综合例子:

    input {
        stdin {}
    }
    filter {
        grok {
            match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
            remove_field => [ "message" ]
       }
    date {
            match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
        }
    mutate {
              convert => [ "response","float" ]
               rename => { "response" => "response_new" }   
               gsub => ["referrer","\"",""]          
               split => ["clientip", "."]
            }
    }
    output {
        stdout {
            codec => "rubydebug"
        }
    

    1.4 Logstash输出插件(output)

    https://www.elastic.co/guide/en/logstash/current/output-plugins.html

    output是Logstash的最后阶段,一个事件可以经过多个输出,而一旦所有输出处理完成,整个事件就执行完成。 一些常用的输出包括:

    • file: 表示将日志数据写入磁盘上的文件。
    • elasticsearch:表示将日志数据发送给Elasticsearch。Elasticsearch可以高效方便和易于查询的保存数据。

    1、输出到标准输出(stdout)

    output {
        stdout {
            codec => rubydebug
        }
    }
    

    2、保存为文件(file)

    output {
        file {
            path => "/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
        }
    }
    

    3、输出到elasticsearch

    output {
        elasticsearch {
            host => ["192.168.1.1:9200","172.16.213.77:9200"]
            index => "logstash-%{+YYYY.MM.dd}"       
        }
    }
    
    • host:是一个数组类型的值,后面跟的值是elasticsearch节点的地址与端口,默认端口是9200。可添加多个地址。
    • index:写入elasticsearch的索引的名称,这里可以使用变量。Logstash提供了%{+YYYY.MM.dd}这种写法。在语法解析的时候,看到以+ 号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串。这种以天为单位分割的写法,可以很容易的删除老的数据或者搜索指定时间范围内的数据。此外,注意索引名中不能有大写字母。
    • manage_template:用来设置是否开启logstash自动管理模板功能,如果设置为false将关闭自动管理模板功能。如果我们自定义了模板,那么应该设置为false。
    • template_name:这个配置项用来设置在Elasticsearch中模板的名称。

    1.5 综合案例

    input {
        file {
            path => ["D:/ES/logstash-7.3.0/nginx.log"]        
            start_position => "beginning"
        }
    }
    
    filter {
        grok {
            match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
            remove_field => [ "message" ]
       }
        date {
            match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
        }
        mutate {
               rename => { "response" => "response_new" }
               convert => [ "response","float" ]
               gsub => ["referrer","\"",""]
               remove_field => ["timestamp"]
               split => ["clientip", "."]
            }
    }
    
    output {
        stdout {
            codec => "rubydebug"
        }
    
    elasticsearch {
        host => ["localhost:9200"]
        index => "logstash-%{+YYYY.MM.dd}"       
    }
    
    }
    

    相关文章

      网友评论

          本文标题:Logstash测试案例

          本文链接:https://www.haomeiwen.com/subject/yvgirhtx.html