- Introduction
In a distributed system, each service is usually deployed as a cluster to keep it available. To manage the logs of such a system centrally and make them easy to analyze, we introduce the ELK stack for log collection and analysis. The common approach is to have Logstash tail the log files, but that makes it hard to split each log entry precisely, and different business scenarios require slicing logs along different dimensions. We therefore introduce logstash-logback-encoder and send logs to Logstash over TCP.
logstash-logback-encoder project page: GitHub
- Add the logstash-logback-encoder dependency
<!-- logstash-logback-encoder -->
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>5.3</version>
</dependency>
- Configure the Logstash appender in the logback configuration file
<!-- Appender that ships log events to Logstash -->
<appender name="LOGSTASHEVENT" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
    <!-- Logstash listen address; declare one <destination> element per Logstash
         instance (the round-robin strategy below needs two or more) -->
    <destination>${logback.address}</destination>
    <!-- load-balancing strategy: round robin -->
    <connectionStrategy>
        <roundRobin>
            <connectionTTL>5 minutes</connectionTTL>
        </roundRobin>
    </connectionStrategy>
    <!-- keep alive -->
    <keepAliveDuration>5 minutes</keepAliveDuration>
    <!-- delay before reconnecting after a failed connection -->
    <reconnectionDelay>1 second</reconnectionDelay>
    <!-- write buffer -->
    <writeBufferSize>16384</writeBufferSize>
    <!-- listener for connection and send events -->
    <listener class="connect.service.config.LogBackEventListener"/>
    <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder">
        <!--<timeZone>UTC</timeZone>-->
        <!-- timestamp format -->
        <timestampPattern>yyyy-MM-dd'T'HH:mm:ss.SSS</timestampPattern>
        <!-- extra fields added to every event -->
        <customFields>{"extraField":"value"}</customFields>
        <!-- uncomment to drop the logger-context fields -->
        <!--<includeContext>false</includeContext>-->
    </encoder>
</appender>
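If XML is not an option, the same setup can be assembled in code. The following is a minimal sketch of a programmatic equivalent, assuming logstash-logback-encoder 5.3 and logback 1.2; the destination 127.0.0.1:4560 stands in for ${logback.address} and is not part of the original configuration, and the custom listener and connection strategy are omitted for brevity.

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.core.util.Duration;
import net.logstash.logback.appender.LogstashTcpSocketAppender;
import net.logstash.logback.encoder.LogstashEncoder;
import org.slf4j.LoggerFactory;

public class LogstashAppenderSetup {
    public static void main(String[] args) {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();

        // Mirrors the <encoder> block above
        LogstashEncoder encoder = new LogstashEncoder();
        encoder.setTimestampPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
        encoder.setCustomFields("{\"extraField\":\"value\"}");
        encoder.setContext(context);
        encoder.start();

        // Mirrors the <appender> block above
        LogstashTcpSocketAppender appender = new LogstashTcpSocketAppender();
        appender.setName("LOGSTASHEVENT");
        appender.addDestination("127.0.0.1:4560"); // placeholder for ${logback.address}
        appender.setKeepAliveDuration(Duration.valueOf("5 minutes"));
        appender.setReconnectionDelay(Duration.valueOf("1 second"));
        appender.setWriteBufferSize(16384);
        appender.setEncoder(encoder);
        appender.setContext(context);
        appender.start();

        // Attach the appender to the root logger
        Logger root = context.getLogger(Logger.ROOT_LOGGER_NAME);
        root.addAppender(appender);
        root.info("appender wired programmatically");
    }
}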
- Logstash configuration
input {
    tcp {
        port => 4560
        codec => json_lines
    }
}
filter {
    mutate {
        remove_field => ["level_value", "index_name", "port"]
    }
}
output {
    elasticsearch {
        hosts => ["http://120.26.233.25:9200"]
        index => "%{[app_name]}"
        # path to the index template file
        template => "/etc/logstash/app.json"
        # name under which the template is registered
        template_name => "templateName*"
        template_overwrite => true
    }
    stdout { codec => rubydebug }
}
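Note that index => "%{[app_name]}" names the Elasticsearch index after an app_name field, so every event must carry one. The simplest way is to add it to the encoder's customFields (e.g. {"app_name":"order-service"}); alternatively it can be attached per event with a Logstash marker. A minimal sketch, where the service name order-service is an illustrative value:

import net.logstash.logback.marker.Markers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AppNameExample {
    private static final Logger logger = LoggerFactory.getLogger(AppNameExample.class);

    public static void main(String[] args) {
        // Adds "app_name":"order-service" to this one JSON event; Logstash then
        // resolves %{[app_name]} and writes the event to that index.
        logger.info(Markers.append("app_name", "order-service"), "service started");
    }
}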
Template configuration file (/etc/logstash/app.json):
{
    "index_patterns": "templateName*",
    "order": 0,
    "settings": {
        "index.number_of_shards": 5,
        "number_of_replicas": 0,
        "index": {
            "refresh_interval": "5s"
        }
    },
    "mappings": {
        "_default_": {
            "dynamic_templates": [{
                "message_field": {
                    "match": "message",
                    "match_mapping_type": "string",
                    "mapping": {
                        "type": "text"
                    }
                }
            }, {
                "string_fields": {
                    "match": "*",
                    "match_mapping_type": "string",
                    "mapping": {
                        "type": "keyword"
                    }
                }
            }],
            "properties": {
                "@timestamp": {
                    "type": "date"
                },
                "@version": {
                    "type": "long"
                },
                "geoip": {
                    "dynamic": true,
                    "properties": {
                        "ip": {
                            "type": "ip"
                        },
                        "location": {
                            "type": "geo_point"
                        },
                        "latitude": {
                            "type": "float"
                        },
                        "longitude": {
                            "type": "float"
                        }
                    }
                }
            }
        }
    }
}
- Usage in code
import static net.logstash.logback.argument.StructuredArguments.*;

// Log message: "log message value"; JSON field: "name":"value"
logger.info("log message {}", value("name", "value"));
// Log message: "log message name=value"; JSON field: "name":"value"
logger.info("log message {}", keyValue("name", "value"));
// More usage patterns are documented in the GitHub project
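Putting it together, the sketch below is a self-contained example of both helpers plus MDC, whose entries LogstashEncoder also includes as JSON fields by default; the field names traceId and orderId are illustrative only:

import static net.logstash.logback.argument.StructuredArguments.keyValue;
import static net.logstash.logback.argument.StructuredArguments.value;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class StructuredLoggingDemo {
    private static final Logger logger = LoggerFactory.getLogger(StructuredLoggingDemo.class);

    public static void main(String[] args) {
        // MDC entries are copied into every JSON event while they are set
        MDC.put("traceId", "abc-123");
        try {
            // value(): only the value appears in the message text
            logger.info("order created {}", value("orderId", "1001"));
            // keyValue(): the message text shows orderId=1001
            logger.info("order paid {}", keyValue("orderId", "1001"));
        } finally {
            MDC.remove("traceId");
        }
    }
}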