ELK +kafka作为日志和简单数据处理
1.0 使用背景介绍
使用背景:让服务每天产生大量的日志文件数据对公司业务做一部分贡献,基于日志数据做简单的数据加工用于替 代现在通过SQL视图方式的统计,基于错误日志的简单报警,和接口性能统计等
Kafka Stream 做简单的流式处理尝试,ELK的版本6.6.6, kafka 版本 2.1.0
使用kafka可以灵活扩展结束其他数据处理框架
2.0 流程示例图

3.0 Filebeat
用于收集各个服务器上的日志,filebeat是Beat家族的一部分,用户读取日志文件,可以直接发送到Elasticsearch,Logstash或者其他消息队列,filebeat是一个轻量级的日志代理,占用服务资源很少,新版本有"压敏传感器"可以根据流量自动调节传送大小,详见说明参考官方文档
3.1 下载安装
地址 :[https://www.elastic.co/downloads/beats/filebeat]选择版本和安装包的方式,个人选择使用压缩包的方式,下载完后解压可以找到名为filebeat.yml的配置文件
filebeat.prospectors:
- type: log
paths:
- /data/wwwlogs/*.log
tail_files: true
ignore_older: 2h
fields:
logSource: nginx-test
fields_under_root: true
processors:
- drop_fields:
fields: ["host","beat","prospector","log","input","offset"]
output.file:
enabled: false
path: "/tmp/filebeat"
filename: filebeat
output.kafka:
enabled: true
hosts: ["host:port"]
version: 2.0.0
topic: 'filebeat_nginx_test'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
#------log--------------------------
logging.level: info
logging.to_files: true
logging.to_syslog: false
logging.files:
path: /home/filebeat/logfilebeat/
keepfiles: 4
name: mybeat.log
4.0 Logstash 配置文件,具体配置参考 https://www.elastic.co/guide/index.html 中 input-kafka介绍
input{
kafka{
bootstrap_servers => ["10.0.20.116:9092"]
client_id => "kafaka-test"
group_id => "kafaka-test"
auto_offset_reset => "latest"
consumer_threads => 5
decorate_events => true
topics => ["filebeat_nginx_test_01"]
type => "nginx"
codec => "json"
}
}
filter {
mutate {
add_field => {
"kafka" => "%{[@metadata][kafka]}"
}
}
}
output {
elasticsearch {
hosts => "host:port"
index => "yourindex-%{+YYYY.MM}"
}
kafka {
bootstrap_servers => "host:port"
topic_id => "nginx-access-log"
compression_type => "snappy"
}
}
网友评论