因为是在开发测试环境搭建,为了调试的方便,使用的是filebeat->logstash->es的形式,没有使用kafka。
首先为保证收集到的日志可方便进行格式化,在springboot工程中配置日志的pattern(使用logback),我设置的为:[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] [%-5level] [%logger{50}] - [%msg] - # '%ex'%n 。 然后针对输出的全量日志编写filebeat的配置文件,如下:
filebeat.inputs:
- type: log
enabled: true
paths:
- /home/workspace/logs/luke-order/luke-order-info.log
multiline:
pattern: '^\['
negate: true
match: after
max_lines: 500
timeout: 5s
output:
logstash:
# enabled: true
hosts: ["127.0.0.1:5044"]
multiline作用是针对异常堆栈信息,将多行匹配为一个message,我的日志都是以“[”打头。输出到5044是logstash的接受端口。然后是logstash的配置,如下:
input {
beats {
port => "5044"
}
}
#filter {
# dissect {
# mapping => {
# "message" => "[%{date}] [%{thread}] [%{level}] [%{class}] - [%{msg}] - # (\'\'|%{exception})"
# }
# }
#}
filter {
grok {
match => [
"message", "\[%{DATA:eventDate}\] \[%{NOTSPACE:thread}\] \[%{DATA:level}\] \[%{NOTSPACE:class}\] - \[%{DATA:msg}\] - # (\'\'|%{QUOTEDSTRING:exception})"
]
}
date {
match => ["eventDate","yyyy-MM-dd HH:mm:ss.SSS"]
timezone => "Asia/Shanghai"
target => "eventDate"
}
mutate{
remove_field => ["host","log","@version","ecs","agent","input","flags"]
gsub =>["level"," ",""]
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "luke-order-log-%{+YYYY-MM-dd}"
manage_template => false
template_name => "luke-order-log-template"
}
stdout { codec => rubydebug }
}
关于日志的格式化,本来想使用dissect ,因为其是使用分割的方式来获取各字段,性能优于grok,但是发现针对异常堆栈,它无法做到完美的匹配,使用好多方式都不行,最终放弃还是使用grok。格式化完成后,针对我日志中获取的日期字段eventDate进行处理,使用logstash的date插件,这一步必须要做,因为最终数据展示在kibana时,会为作为x坐标的日期字段加上8小时(时区的问题),所以我只能在filter内处理是对我要使用的日期字段进行时区的处理。之所以没使用logstash自己的@timestamp字段,是因为它是processTime,而我要使用的是eventTime。然后输出到es,在此之前,在es中要建立索引模板,不然我自定义的日期字段什么的最终都是text类型,就没法作为kinana的discover的时间线坐标字段了。然后就是output中要禁用掉logstash的默认索引模板,启用自定义的即可(注意:es的索引名我是按天创建,即在索引名前缀后加上%{+YYYY-MM-dd},而这个依赖于@timestamp字段,之前我将@timestamp字段删了,索引名就没有后面的日期了),最后启动filebeat(./filebeat -e -c log-filebeat-logstash.yml,启动logstash(./bin/logstash -f script/filebeat-logstash-es.conf --config.reload.automatic)。另外es的索引模板如下:
PUT /_template/luke-order-log-template
{
"index_patterns": "luke-order-log-*",
"order": 1,
"settings": {
"number_of_shards": 2
},
"mappings": {
"properties": {
"class": {
"type": "text"
},
"@timestamp":{
"type": "date"
},
"eventDate": {
"type": "date"
},
"exception": {
"type": "text"
},
"level": {
"type": "keyword"
},
"message": {
"type": "text"
},
"msg": {
"type": "text"
},
"thread": {
"type": "text"
}
}
}
}
最终es中存入数据,在kibana创建Index patterns,以我自定义的eventDate字段为维度,然后在discover模块中进行查看,使用kql之类的进行查询。
网友评论