Grok: Logstash's power tool

Author: 康小为6840 | Published 2019-09-18 18:33

    1. Grok expressions

    Grok is the most important plugin in Logstash. Inside a grok filter you can use predefined pattern names directly, and grok also lets you write pattern definitions of your own into files. The official set of predefined grok patterns is at: https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

    For the most direct guide to grok syntax, see "ELK应用之Logstash" and the official grok filter documentation:
    https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html; it is not repeated here.

    Below is the simplest example, taken from the official pattern file, yet enough to show how it works:

    USERNAME [a-zA-Z0-9._-]+
    USER %{USERNAME}
    

    The first column is the name of the grok pattern, which can be used directly; the second column is an ordinary regular expression.
    The first line defines a grok pattern from a plain regular expression; the second line defines another grok pattern by referencing the one just defined, using the %{...} syntax. (Put simply: a name plus an expression, and patterns can be nested.)

    The basic syntax for using a grok pattern is as follows:

    %{SYNTAX:SEMANTIC}
    

    Tip: SYNTAX is the name of a predefined pattern; SEMANTIC is the name of the field the matched text should be stored in (pick whatever name you like, as long as you can tell your fields apart).
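    For example, the official grok documentation parses an HTTP-style log line such as 55.3.244.1 GET /index.php 15824 0.043 with the filter below; client, method, request, bytes and duration are simply the field names chosen for that example:

    filter {
        grok {
            # each %{SYNTAX:SEMANTIC} pair names a predefined pattern and the field to store its match in
            match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
        }
    }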

    Appendix:

    USERNAME [a-zA-Z0-9._-]+
    USER %{USERNAME}
    EMAILLOCALPART [a-zA-Z][a-zA-Z0-9_.+-=:]+
    EMAILADDRESS %{EMAILLOCALPART}@%{HOSTNAME}
    INT (?:[+-]?(?:[0-9]+))
    BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
    NUMBER (?:%{BASE10NUM})
    BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
    BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b
    
    POSINT \b(?:[1-9][0-9]*)\b
    NONNEGINT \b(?:[0-9]+)\b
    WORD \b\w+\b
    NOTSPACE \S+
    SPACE \s*
    DATA .*?
    GREEDYDATA .*
    QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
    UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
    # URN, allowing use of RFC 2141 section 2.3 reserved characters
    URN urn:[0-9A-Za-z][0-9A-Za-z-]{0,31}:(?:%[0-9a-fA-F]{2}|[0-9A-Za-z()+,.:=@;$_!*'/?#-])+
    
    # Networking
    MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
    CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
    WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
    COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
    IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
    IPV4 (?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])
    IP (?:%{IPV6}|%{IPV4})
    HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
    IPORHOST (?:%{IP}|%{HOSTNAME})
    HOSTPORT %{IPORHOST}:%{POSINT}
    
    # paths
    PATH (?:%{UNIXPATH}|%{WINPATH})
    UNIXPATH (/([\w_%!$@:.,+~-]+|\\.)*)+
    TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
    WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
    URIPROTO [A-Za-z]([A-Za-z0-9+\-.]+)+
    URIHOST %{IPORHOST}(?::%{POSINT:port})?
    # uripath comes loosely from RFC1738, but mostly from what Firefox
    # doesn't turn into %XX
    URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%&_\-]*)+
    #URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
    URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]*
    URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
    URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
    
    # Months: January, Feb, 3, 03, 12, December
    MONTH \b(?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\b
    MONTHNUM (?:0?[1-9]|1[0-2])
    MONTHNUM2 (?:0[1-9]|1[0-2])
    MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
    
    # Days: Monday, Tue, Thu, etc...
    DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
    
    # Years?
    YEAR (?>\d\d){1,2}
    HOUR (?:2[0123]|[01]?[0-9])
    MINUTE (?:[0-5][0-9])
    # '60' is a leap second in most time standards and thus is valid.
    SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
    TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
    # datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
    DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
    DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
    ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
    ISO8601_SECOND (?:%{SECOND}|60)
    TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
    DATE %{DATE_US}|%{DATE_EU}
    DATESTAMP %{DATE}[- ]%{TIME}
    TZ (?:[APMCE][SD]T|UTC)
    DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
    DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
    DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
    DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}
    
    # Syslog Dates: Month Day HH:MM:SS
    SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
    PROG [\x21-\x5a\x5c\x5e-\x7e]+
    SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
    SYSLOGHOST %{IPORHOST}
    SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
    HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
    
    # Shortcuts
    QS %{QUOTEDSTRING}
    
    # Log formats
    SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
    
    # Log Levels
    LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
    

    2. Extracting specific substrings

    Item                                   Comment
    %{USER:user}                           match with the USER pattern and store the result in user
    \[[^\]]+\]                             starts with [ and ends with ], filled with one or more characters that are not ]
    %{NUMBER:id:int}                       match with the NUMBER pattern, convert to integer, store the result in id
    \n                                     match a newline
    %{NUMBER:query_time:float}             match with the NUMBER pattern, convert to float, store the result in query_time
    (?:use\s+%{USER:usedatabase};\s*\n)?   may or may not be present; if present: "use", whitespace, a USER match stored in usedatabase, then ";", optional whitespace and a newline. Note: the group is present or absent as a whole
    \b                                     a word boundary; zero-width, it only marks a position
    .*                                     match as much as possible
    (?<query>(?<action>\w+)\b.*)           the whole match is stored in query; the leading word (one or more word characters) is stored in action
    (?:\n#\s+Time)?                        may or may not be present; if present: a newline, then "#", one or more whitespace characters, then "Time"
    .*$                                    match anything up to the end
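    Taken together, the items above read like the pieces of a slow-query-style log. Purely as an illustration, here is a hedged sketch of how some of them might be combined in a single grok filter; the assumed line layout ("<user>[<host>] <id>", a query time, an optional "use <db>;" line, then the statement) is invented for this sketch and is not a definitive log format:

    grok {
        match => {
            # hypothetical layout: "<user>[<host>] <id>\n<query_time>\nuse <db>;\n<statement>"
            "message" => "%{USER:user}\[[^\]]+\] %{NUMBER:id:int}\n%{NUMBER:query_time:float}\n(?:use\s+%{USER:usedatabase};\s*\n)?(?<query>(?<action>\w+)\b.*)"
        }
    }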
    grok regex: (?<temMsg>(.*)(?=Report)/?)   captures the text before "Report" as the value of the temMsg field
    grok regex: (?<temMsg>(?=Report)(.*)/?)   captures the text from "Report" onward (the lookahead is zero-width, so "Report" itself is included) as the value of the temMsg field
    grok {
        match => {
            # capture the text before "Report" as the value of the temMsg field
            "message" => "(?<temMsg>(.*)(?=Report)/?)"
        }
    }
    This extracts a specific slice of the log; the log line must contain the keyword "Report", and you can substitute whatever keyword your logs actually use.
    (Note: (?=Report) is a zero-width lookahead, so in (?<temMsg>(.*)(?=Report)/?) the captured text stops before "Report" and does not include it. To capture the text after "Report" without "Report" itself, use the lookbehind (?<=Report), e.g. (?<temMsg>(?<=Report).*); the form (?=Report)(.*) keeps "Report" in the capture. The same distinction applies to the examples below.)
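    A quick hypothetical example of the difference (the input line is made up):

    # hypothetical input:  "task-42 finished Report duration=35ms"
    # (?<temMsg>(.*)(?=Report))   -> temMsg = "task-42 finished "
    # (?<temMsg>(?=Report).*)     -> temMsg = "Report duration=35ms"
    # (?<temMsg>(?<=Report).*)    -> temMsg = " duration=35ms"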
    
    grok regex: (?<temMsg>(?<=report).*?(?=msg))          captures the text between report and msg, excluding both report and msg themselves
    grok regex: (?<temMsg>(report).*?(?=msg))             captures text that includes report but not msg
    grok regex: (?<temMsg>(?<=report).*?(msg))            captures text that excludes report but includes msg
    grok regex: (?<temMsg>(report).*?(msg|request))       captures from report to msg or request, including both ends
    grok regex: (?<temMsg>(report).*?(?=(msg|request)))   captures from report (inclusive) up to, but not including, msg or request
    grok {
        match => {
            # capture the text after "report" and before "msg" as the value of the temMsg field
            "message" => "(?<temMsg>(?<=report).*?(?=msg))"
        }
    }
    This extracts a specific slice of the log; the log line must contain the keywords report, msg and request. Just substitute your own keywords into the expression and it is ready to use.
    (Note: if the expression raises an error, the parentheses around a single literal string can be dropped, e.g. (report).*?(?=msg) can be written as report.*?(?=msg).)
    
    grok regex: (?<MYELF>([\s\S]{500}))
    grok {
        match => {
            # take the first 500 characters of the log as the value of MYELF
            "message" => "(?<MYELF>([\s\S]{500}))"
        }
    }
    This takes the first 500 characters of every log line; you can wrap it in an if () conditional to limit which events it applies to, depending on your project, as sketched below.
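    A minimal sketch of such a conditional, assuming a hypothetical type value of "verbose" marking the events you want to truncate:

    filter {
        # hypothetical condition; adapt the field and value to your own events
        if [type] == "verbose" {
            grok {
                match => { "message" => "(?<MYELF>([\s\S]{500}))" }
            }
        }
    }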
    
    grok regex: %{LOGLEVEL:level}
    grok {
        # patterns_dir points to the directory where you keep files with your own pattern definitions
        # patterns_dir => "/usr/local/nlp/logstash-6.0.1/config/patterns"
        match => [
            "message", "%{LOGLEVEL:level}"
        ]
    }
    This one is straightforward; a small patterns_dir sketch follows for completeness.
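    A hedged sketch of what a custom pattern file and the matching patterns_dir usage could look like; the MYAPP_TRACEID pattern name and the "trace=" layout are made up for this example:

    # hypothetical file /usr/local/nlp/logstash-6.0.1/config/patterns/myapp containing the line:
    #   MYAPP_TRACEID [A-Fa-f0-9]{16}
    grok {
        patterns_dir => ["/usr/local/nlp/logstash-6.0.1/config/patterns"]
        match => { "message" => "%{LOGLEVEL:level} .*trace=%{MYAPP_TRACEID:traceId}" }
    }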
    
    Building on the above, this checks the level field and drops any event whose level is DEBUG:
    if [level] == "DEBUG" {
        drop { }
    }
    This is much the same; the =~ operator performs a regex match against the field, so any event whose message contains "ASPECT" is dropped:
    if [message] =~ "ASPECT" {
        drop { }
    }
    
    This renames the temMsg field to message for every event in which temMsg was set:
    mutate {
        # rename the field temMsg to message
        rename => { "temMsg" => "message" }
    }
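    Putting this section's pieces together, here is one hedged sketch of a possible ordering (the "Report" keyword, the DEBUG drop and the ASPECT drop come from the examples above; combining them this way is just an illustration):

    filter {
        grok {
            match => { "message" => "%{LOGLEVEL:level}" }
        }
        if [level] == "DEBUG" or [message] =~ "ASPECT" {
            drop { }
        }
        grok {
            match => { "message" => "(?<temMsg>(.*)(?=Report)/?)" }
        }
        mutate {
            rename => { "temMsg" => "message" }
        }
    }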
    

    3. Splitting with filters

    filter {
        if [type] == "simple" {
            mutate {
                # split message on "|"
                split => ["message", "|"]
                add_field => {
                    "requestId"      => "%{[message][0]}"
                    "timeCost"       => "%{[message][1]}"
                    "responseStatus" => "%{[message][2]}"
                    "channelCode"    => "%{[message][3]}"
                    "transCode"      => "%{[message][4]}"
                }
            }
            mutate {
                # convert the timeCost field to integer
                convert => ["timeCost", "integer"]
            }
        } else if [type] == "detail" {
            grok {
                match => {
                    # split the values after TJParam in message into new ES fields
                    "message" => ".*TJParam %{PROG:requestId} %{PROG:channelCode} %{PROG:transCode}"
                }
            }
            grok {
                match => {
                    # capture the text before TJParam as the value of the temMsg field
                    "message" => "(?<temMsg>(.*)(?=TJParam)/?)"
                }
                # drop the original message field
                remove_field => ["message"]
            }
            mutate {
                # rename the field temMsg to message
                rename => { "temMsg" => "message" }
            }
        }
    }
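    As a hypothetical illustration of the "simple" branch, a pipe-delimited line like the one below (values made up) would be split into these five fields:

    # hypothetical input:
    #   REQ20190918001|35|SUCCESS|WEB|T1001
    # resulting fields:
    #   requestId      => "REQ20190918001"
    #   timeCost       => 35   (converted to integer by the mutate convert)
    #   responseStatus => "SUCCESS"
    #   channelCode    => "WEB"
    #   transCode      => "T1001"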
    
