image
解决标签tag问题
场景 将ELK结构切换至filebeat +KAFKA +ELK架构(这里以NGINX做简单实验)
需要解决的问题和遇见的难题
- 不同日志的标签分类实现
解决方案: 在filebeat配置中,增加自定义字段,并且在logstash 配置中进行if判定用于区分
- kafka多broker多host时的书写(filebeat 和 logstash 写法不一样,请参考样例)
- 数据格式的传输,即从filebeat 传入到 logstash 的格式(注意 logstash 中 指明了codec 为json,因为在实验过程中发现 logstash从kafka读取的日志是json格式的。)
filebeat 配置文件
filebeat.inputs:
- type: log
enabled: true
paths:
- /log-dir/access.log
fields:
log_type: access
- type: log
enabled: true
paths:
- /log-dir/error.log
fields:
log_type: error
output.kafka:
enabled: true
hosts: ["10.66.200.220:9092","10.66.201.120:9092","10.66.202.201:9092"]
topic: nginx_logs_app
logstash 配置文件
input {
kafka {
bootstrap_servers => ["10.66.200.220:9092,10.66.201.120:9092"] ## 多kafka broker写法
topics => ["nginx_logs_app"] ## 消费的topic
group_id => "idaas_log" ## 用于kafka区分消费者
codec => "json" ## 重要,定义读取过来的数据为json,则filter直接过滤
consumer_threads => 10 ## 这个数不能大于 kafka 的分区数
decorate_events => true ### 是否将kafka消息的元数据加入ES
}
}
filter {
if [fields][log_type] == "access" { ## 根据filebeat 添加的自定义字段来区分不同的日志
mutate{
split => ["message"," "]
add_field => {
"ip" => "%{[message][0]}"
}
add_field => {
"method" => "%{[message][5]}"
}
add_field => {
"statuscode" => "%{[message][8]}"
}
remove_field => [ "message" ]
}
}
}
output {
if [fields][log_type] == "access" {
elasticsearch {
hosts => "elasticsearchlog-lb.elasticsearch-log"
ssl => false
index => "filebeat_test"
}
} else if [fields][log_type] == "error" {
elasticsearch {
hosts => "elasticsearchlog-lb.elasticsearch-log"
ssl => false
index => "filebeat_error_test"
}
}
}
网友评论