背景
日志主要包括系统日志、应用程序日志和安全日志。系统运维和开发人员可以通过日志了解服务器软硬件信息、检查配置过程中的错误及错误发生的原因。经常分析日志可以了解服务器的负荷,性能安全性,从而及时采取措施纠正错误。
通常,日志被分散的储存不同的设备上。如果你管理数十上百台服务器,你还在使用依次登录每台机器的传统方法查阅日志。这样是不是感觉很繁琐和效率低下。当务之急我们使用集中化的日志管理,例如:开源的syslog,将所有服务器上的日志收集汇总。
集中化管理日志后,日志的统计和检索又成为一件比较麻烦的事情,一般我们使用grep、awk和wc等Linux命令能实现检索和统计,但是对于要求更高的查询、排序和统计等要求和庞大的机器数量依然使用这样的方法难免有点力不从心。
开源实时日志分析ELK平台能够完美的解决我们上述的问题,ELK由ElasticSearch、Logstash和Kiabana三个开源工具组成。官方网站:https://www.elastic.co/products
ELK Stack 是软件集合 Elasticsearch、Logstash、Kibana 的简称,由这三个软件及其相关的组件可以打造大规模日志实时处理系统。
Elasticsearch 是一个基于 Lucene 的、支持全文索引的分布式存储和索引引擎,主要负责将日志索引并存储起来,方便业务方检索查询。
Logstash 是一个日志收集、过滤、转发的中间件,主要负责将各条业务线的各类日志统一收集、过滤后,转发给 Elasticsearch 进行下一步处理。
Kibana 是一个可视化工具,主要负责查询 Elasticsearch 的数据并以可视化的方式展现给业务方,比如各类饼图、直方图、区域图等。
所谓“大规模”,指的是 ELK Stack 组成的系统以一种水平扩展的方式支持每天收集、过滤、索引和存储 TB 规模以上的各类日志。
架构

上图是 ELK Stack 实际应用中典型的一种架构,其中 Log4j 文件在具体的业务机器上,通过实时的方式获取增量的日志,并转发到 Kafka 消息系统暂存。
Kafka 以高吞吐量的特征,作为一个消息系统的角色,接收从 filebeat 收集转发过来的日志,通常以集群的形式提供服务。
然后,Logstash 从 Kafka 中获取日志,并通过 Input-Filter-Output 三个阶段的处理,更改或过滤日志,最终输出我们感兴趣的数据。通常,根据 Kafka 集群上分区(Partition)的数量,1:1 确定 Logstash 实例的数量,组成 Consumer Group 进行日志消费。
最后,Elasticsearch 存储并索引 Logstash 转发过来的数据,并通过 Kibana 查询和可视化展示,达到实时分析日志的目的。
Elasticsearch/Kibana 还可以通过安装 x-pack 插件实现扩展功能,比如监控 Elasticsearch 集群状态、数据访问授权等。
实现
1. log4j.properties文件配置
log4j.rootLogger=INFO,console,kafka
#log4j.logger.com.demo.kafka=DEBUG,kafka
# appender kafka
log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
log4j.appender.kafka.topic=elk_log_topic
# multiple brokers are separated by comma ",".
log4j.appender.kafka.brokerList=10.108.4.203:9092,10.108.4.204:9092,10.108.4.205:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
#log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
log4j.appender.kafka.layout.ConversionPattern=[%d] [%p] [%t] %m%n
# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d] [%p] [%t] %m%n
#log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
2. Kafka
创建Topic:
[root@hadoop1 fc]# kafka-topics --create --zookeeper 10.108.4.203:2181,10.108.4.204:2181,10.108.4.205:2181 --partitions 3 --replication-factor 2 --topic elk_log_topic
3. Logstash部署
添加配置文件logstash.conf:
input {
kafka {
codec => "line"
topics_pattern => "elk_.*"
bootstrap_servers => "10.108.4.203:9092,10.108.4.204:9092,10.108.4.205:9092"
auto_offset_reset => "latest"
group_id => "logstash_g1"
}
}
filter {
}
output {
elasticsearch {
hosts => ["10.108.4.203:9200","10.108.4.204:9200","10.108.4.205:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
}
}
配置文件主要分为三大部分: Input / Filter / Output,对应收集、过滤、转发三个阶段。显然,Input 阶段只需要指定 Kafka 集群相关信息即可,Output 阶段只需要指定 Elasticsearch 服务器相关的信息即可,比较复杂的是 Filter 过滤阶段。
启动:
[root@hadoop3 logstash-6.2.4]# bin/logstash -f config/logstash.conf
4. ES集群部署
Elasticsearch6.x集群部署
ES集群web可视化集群管理Cerebro
5. 配置并启动 Kibana 服务
server.host: "10.108.4.203"
server.name: "bi-Kibana"
elasticsearch.url: "http://hadoop3:9200/"
kibana.index: ".kibana"
启动 Kibana 进程:
nohup ./bin/kibana &
然后,就可以在浏览器访问 Kibana 了:

网友评论