Logstash
1. grok
(1) 语法
这东西就是正则表达式匹配,不过由于日志太大了,如果你这个正则从头写到尾估计写一半人就没了,所以人家预定义好了许多的正则表达式,那些%xxx本质上就是那些正则,一样的替代一下。
内置的那些预定义好的正则表达式,在github上的地址:
https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
这里直接粘过来省的去找
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
EMAILLOCALPART [a-zA-Z][a-zA-Z0-9_.+-=:]+
EMAILADDRESS %{EMAILLOCALPART}@%{HOSTNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b
POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
# URN, allowing use of RFC 2141 section 2.3 reserved characters
URN urn:[0-9A-Za-z][0-9A-Za-z-]{0,31}:(?:%[0-9a-fA-F]{2}|[0-9A-Za-z()+,.:=@;$_!*'/?#-])+
# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
IPORHOST (?:%{IP}|%{HOSTNAME})
HOSTPORT %{IPORHOST}:%{POSINT}
# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (/([\w_%!$@:.,+~-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]([A-Za-z0-9+\-.]+)+
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%&_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y|i)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[APMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}
# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG [\x21-\x5a\x5c\x5e-\x7e]+
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
# Shortcuts
QS %{QUOTEDSTRING}
# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
(2) gork提取信息
grok提取字段
不过grok这东西神烦,不能自定义提取,放两个案例。
成功案例:
\[%{TIMESTAMP_ISO8601:timestamp}\]\s+(GatewayMessage:\s+)(\|)%{DATA:url}(\|)%{DATA:methodvalue}(\|)%{DATA:queryParams}(\|)%{DATA:remote}(\|)%{DATA:statusCode}(\|)%{DATA:time}(\|)%{DATA:response}
魔改案例:
\[%{TIMESTAMP_ISO8601:timestamp}\]\s+(GatewayMessage:\s+)(\|)(.*?)(\|)%{DATA:methodvalue}(\|)%{DATA:queryParams}(\|)%{DATA:remote}(\|)%{DATA:statusCode}(\|)%{DATA:time}(\|)%{DATA:response}
魔改案例里面,把第一个DATA改成了.*?
,这样的确可以起到占位符的作用,直接把这一个跳过去,不过如果想把自定义的这些正则表达式匹配的值,输入到一个键值对里输出出来,那是不成的。
如果要输出出来,只能把自己定义的正则表达式命好名字,然后输入到一个文件里面,然后把这个文件告诉给Logstash,这样才能用自定义输出json键值对。
grok和json报错
最开始的写法是%{DATA:response},用一个DATA指代后面GatewayMessage那一部分,这个在Kibana调试的时候没任何问题,不过在docker里面一跑就是json解析出错。
改成%{GREEDYDATA:response}之后就没问题了。
DATA对应的是.*?
GREEDYDATA对应.*
总结就是:DATA出错的地方改成GREEDYDATA。
解析出来的数据:
这也就是filebeat的日志结构
{
#message是gateway输出的信息
"message" => "[2020-02-21 13:40:50] GatewayMessage: |http://39.100.144.125:7052/auth/test|GET|{}|/119.190.195.231:5388|200 OK|4ms|{\"code\":200,\"msg\":\"success\",\"result\":\"1\"} ",
#下面这一排是logstash解析出来的
"timestamp" => "2020-02-21 13:40:50",
"time" => "4ms",
"methodvalue" => "GET",
"queryParams" => "{}",
"url" => "http://39.100.144.125:7052/auth/test",
"remote" => "/119.190.195.231:5388",
"statusCode" => "200 OK",
"response" => "{\"code\":200,\"msg\":\"success\",\"result\":\"1\"} ",
#这下面是filebeat自带的
#这条日志的tag属性
"tags" => [
[0] "gateway",
[1] "beats_input_codec_json_applied"
],
#生产这条日志的filebeat信息
"agent" => {
"hostname" => "a4f546cbb1c0",
"version" => "7.6.0",
"type" => "filebeat",
"id" => "378e62e8-0934-4879-9173-b879b8870811",
"ephemeral_id" => "374a14bb-8a4f-4115-adf9-2d9eee6d3204"
},
#这条日志的文件来源
"log" => {
"offset" => 7449808,
"file" => {
"path" => "/var/log/gateway.log"
}
},
"input" => {
"type" => "log"
},
"@version" => "1",
"host" => {
"name" => "a4f546cbb1c0"
},
"ecs" => {
"version" => "1.4.0"
},
#时间戳
"@timestamp" => 2020-02-21T05:40:52.728Z
}
(3) grok正则表达式调试
在没写出gork正则的前提下,先别到docker里面去改配置文件调试,那样有够慢的,先在kibana下面调试一下,这个比较方便。
(4) 看docker日志
大的正则表达式已经写完了,剩下一点小语法修修改改的,就可以去docker里面改配置文件去调试了。
docker logs --tail="100" logstash
顺便记另一个,tail看文件
# 看末尾五行
tail -n 5 logstash.log
# -f实时刷新,F5
tail -f logstash.log
(5) logstash具体的filter
首先split可以保证这个不出错,因为有几个|这个都可以商量。
不过上场就用grok的话,如果|对不上,就解析错误了。
幸运的是logstash解析错误不报bug,只是那条日志的输出会带上logstash错误这样的标签,所以尽情grok吧。
2. 其他filter
最终的logstash配置
input {
beats {
port => 5044
}
}
filter {
grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\]\s+(GatewayMessage:\s+)(\|)%{DATA:url}(\|)%{DATA:methodvalue}(\|)%{DATA:queryParams}(\|)%{DATA:remote}(\|)%{DATA:statusCode}(\|)%{DATA:time}(\|)%{GREEDYDATA:response}" }
remove_field => ["message"]
}
#把java那段输出改成json
json {
source => "response"
target => "jsonresponse"
}
mutate {
remove_field =>["response"]
}
#地理信息顺便山一大堆
geoip {
source => "remote"
remove_field => ["[geoip][latitude]", "[geoip][longitude]",
"[geoip][region_code]", "[geoip][country_name]", "[geoip][continent_code]",
"[geoip][location]", "[geoip][country_code3]", "[geoip][ip]", "[geoip][country_code2]",
"[geoip][timezone]"]
}
}
output{
elasticsearch {
hosts=>["39.100.144.125:9200"]
index => "logstash-%{+YYYY.MM.dd}" #对日志进行索引归档
}
#stdout{codec => rubydebug}
}
输出到es的索引,是按照天来更改的,所以在es里面就每天建立一个索引,方便删除往期的数据,以及这样可以保证一个索引不会太大,能提高一点搜索效率。
3. 几个有用的博客
https://www.cnblogs.com/FengGeBlog/p/10305318.html
网友评论