前言
上一篇说了,如何在一台机器上搭建整个环境,接下来总结一下,自己开发的应用层面以及一些具体的配置改动
业务日志
业务日志内容分为两个部分
- 业务方自己输出的业务日志,由于业务日志内容要统一规则比较难,只能统一日志级别的内容,比如日志级别,日志类等,
- 统一基础框架输出的框架日志(accesslog),这个比较好统一规则,由框架来进行统一控制,并且内容比较重要,包括请求的url,参数,方法,耗时,返回码,http头等等,所以这一类日志用json格式进行打印
日志输出目录
日志输出目录可以通过框架进行控制,根据业务名到不同的目录和日志文件,然后通过filebeat来进行收集。filebeat收集之后,filebeat会添加一部分filebeat的数据,然后把我们业务的日志作为message字段,以json的格式输出到kafka同一个topic中,这一部分配置在上面的文章已经提到。
logstash配置
logstash从kafka中收到了filebeat传输的日志,输入和输出基本固定,主要就是配置filter层的内容。由于filebeat开始传输的就是json格式,所以logstash要解析一次json
json {
source => "message"
}
然后message就是业务真正输出的内容,这个时候日志的格式是由logback来进行确定的,所以可以通过grok来进行匹配
logback.xml格式内容
<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%t] [%X{X-B3-TraceId}] %-5level %logger{50} - %msg%n</pattern>
所以对应的grok可以写成
grok {
match => [
"message", "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{DATA:threadName}\] \[%{WORD:traceId}\] %{LOGLEVEL:logLevel}\s*%{DATA:className} \- %{GREEDYDATA:message}"
]
overwrite => ["message"]
}
由于traceid不一定会存在,因为当业务没有开启sleuth的时候,是没有traceid的,上面的解析会失败,所以加了一个补充配置
#没有traceId的情况
if "_grokparsefailure" in [tags] {
grok {
match => [
"message", "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{DATA:threadName}\] \[\] %{LOGLEVEL:logLevel}\s*%{DATA:className} \- %{GREEDYDATA:message}"
]
remove_tag => ["_grokparsefailure"]
overwrite => ["message"]
}
}
然后,上面说了, 业务分为两个部分,具体的业务输出的不能确定内容,但是框架输出的是json格式的数据,所以可以根据className
进行判断来进行解析,如果是框架日志,则加入access的前缀。
#如果是框架的log日志
if "JsonLogBuilder" in [className] {
json {
source => "message"
target => "access"
}
}
最后再给日期进行赋值
#日期赋值
date {
match => [ "timestamp" , "YYYY-MM-dd HH:mm:ss.SSS" ]
target => "@timestamp"
}
这样就把原始的日志进行了转换,存储到了es里面。
附上具体的grok的pattern的规则地址:
https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns
kibana配置和使用
然后打开kibana,通过指定index,就可以看到具体的日志内容了,然后具体的内容根据指定的格式被做成了es的field,这样就可以根据具体的field进行搜索、聚合、统计了。或者根据这些字段自己制作图形,最后展示在dashboard上。
另外如果在kibana上进行搜索,可以使用lucence的搜索语法
常用的有
search message
field:(field1 and field2)
+ - = && || > < ! ( ) { } [ ] ^ " ~ * ? : \ / 需要用“\”进行转译
网友评论