最近在学习java相关的知识,简单记录一下构建ELK日志分析系统的过程。
环境介绍
- Ubuntu 20.04
- Kafka 2.8.0
- FileBeat 7.12.1
- Logstash 7.12.1
- ElasticSearch 7.12.1
- Kibana 7.12.1
- web.jar 用于产生所要记录的日志,基于SpringBoot构建
其中:Kafka ,FileBeat ,Logstash 用于实现日志数据的抓取,过滤,ElasticSearch和Kibana 用于实现数据的展示。每个组件的此处不详细展开,主要我也不咋明白来着~~
整体架构图(图片来源于慕课网)
- image.png
- image.png
组件安装简介
Kafka安装与配置
1.官网下载想要的版本,此处使用的是Linux,所以下载Linux版本
# 解压缩
tar -zxvf kafka_2.13-2.8.0.tgz
2.配置Kafka
- config/server.properties
cd config/
vim server.properties
## 需要修改的项
listeners=PLAINTEXT://127.0.0.1:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/usr/elk/kafka_2.13-2.8.0/data # 需要先创建data目录,用于实现数据和日志文件的分离
## 需要修改的项
- 启动
# 先启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动kafka
bin/kafka-server-start.sh config/server.properties &
- 创建topic
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1
- 查看topic信息
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic app-log-collector --describe
注意:上述命令的 bin 不是必须的,主要依据执行命令时所处的文件夹路径而定。
FileBeat安装配置
1.下载并解压文件
2.修改filebeat.yml配置文件,需要注意缩进问题
filebeat.inputs:
- input_type: log
paths:
##app-服务名称.log,为什么写死,防止发生轮转抓取历史数据
- /usr/elk/logs/app-collector.log ##定义写入ES时的type值
##此处的log文件位置需要注意,是web.jar产生的文件的名称和所在的位置
document_type: "app-log"
multiline:
#pattern:'^\s*(\d{4}l\d{2})\-(\d{2}l[a-zA-Z]{3})\-(\d{2}l\d{4})
pattern: '^\[' # 指定匹配的表达式
negate: true # 是否匹配到
match: after #合并到上一行的末尾
timeout: 2s #如果在规定时间没有新的日志事件就不等待后面的日志
max_lines: 2000
fields:
logbiz: collector
logtopic: app-log-collector ##按服务划分用作kafka topic evn:dev
evn: dev
- input_type: log
paths:
- /usr/elk/logs/error-collector.log
##此处的log文件位置需要注意,是web.jar产生的文件的名称和所在的位置
document_type: "error-log"
multiline:
#pattern:1\s*(\d{4}l\d{2})\-(\d{2}l[a-zA-Z]{3})\-(\d{2}l\d{4})'#指定匹配的表达式(匹
pattern: '^\[' #指定匹配的表达式(匹配以”(开头的字符串)
negate: true #是否匹配到
match: after #合并到上一行的末尾
max_lines: 2000 #最大的行数
timeout: 2s #如果在规定时间没有新的日志事件就不等待后面的
fields:
logbiz: collector
logtopic: error-log-collector ##按服务划分用作kafka topic
evn: dev
output.kafka:
enabled: true
hosts: ["127.0.0.1:9092"]
topic: '%{[fields.logtopic]}'
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes: 1000000
required_acks: 1
logging.to_files: true
3.启动FileBeat
./filebeat &
Logstash安装配置
1.下载并解压文件
2.修改config下的配置文件
- config下配置文件说明:
- logstash配置文件:/config/logstash.yml
- JVM参数文件:/config/jvm.options
- 日志格式配置文件:log4j2.properties
- 制作Linux服务参数:/config/startup.options
修改logstash.yml,增加workers工作线程数 可以有效的提升logstash性能
pipeline.workers: 16
3.logstash根目录下创建script文件夹,创建logstash-script.conf文件
mkdir script
cd script
vim logstash-script.conf
logstash-script.conf 文件内容
##multiline插件也可以用于其他类似的堆栈式信息,比如linux的内核日志。
input{
kafka{
##app-log-服务名称
topics_pattern => "app-log-.*"
bootstrap_servers => "127.0.0.1:9092"
codec => json
consumer_threads => 4 ##增加consumer的并行消费线程数decorate events =>true
decorate_events => true
#auto_offset_rest =>"latest"
group_id => "app-log-group"
}
kafka{
##error-log-服务名称
topics_pattern => "error-log-.*"
bootstrap_servers => "127.0.0.1:9092"
codec => json
consumer_threads => 4
decorate_events => true
#auto_offset_rest =>"latest"
group_id => "error-log-group"
}
}
filter{
#时区转换
ruby{
code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
}
if "app-log" in [fields][logtopic]{
grok{
##表达式
match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
if "error-log" in[fields][logtopic]{
grok{
##表达式
match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
}
##测试输出到控制台:
output{
stdout { codec => rubydebug }
}
##elasticsearch:
output{
if "app-log" in [fields][logtopic]{
##es插件
elasticsearch{
#es服务地址
hosts => ["127.0.0.1:9200"]
#用户名密码
#user => "elastic"
#password => "123456"
## 索引名,+号开头的,就会自动认为后面是时间格式:
## javalog-app-service-2019.01.23
index => "app-log-%{[fields][logbiz]}-%{index_time}"
# 是否嗅探集群ip:一般设置true;http://10.130.159.166:9200/_nodes/http?pretty
# 通过嗅探机制进行es集群负载均衡发日志消息
sniffing => true
#1ogstash默认自带一个mapping模板,进行模板覆盖
template_overwrite => true
}
}
if "error-log" in [fields][logtopic]{
elasticsearch {
hosts => ["127.0.0.1:9200"]
#user => "elastic"
#password => "123456"
index => "error-log-%{[fields][logbiz]}-%{index_time}"
sniffing=>true
template_overwrite => true
}
}
}
4.启动Logstash
bin/logstash -f /script/logstash-script.conf &
ElasticSearch安装配置
1.下载并解压文件
2.修改 config/elasticsearch.yml 配置文件
cluster.name: ccodle
node.name: es-node-1 ## es-node-2 es-node-3 不同节点名称不同
path.data: /usr/elk/elasticsearch/data ## es数据存放位置
path.logs: /usr/elk/elasticsearch/logs ## es日志存放位置
# bootstrap.memory_lock: true ## 锁内存,强制占用(类似oracle的锁内存)保证es启动正常
network.host: 0.0.0.0 ## 绑定所有ip
discovery.seed_hosts: ["127.0.0.1"]
cluster.initial_master_nodes: ["node-1"]
# 用于elasticsearch-head插件的访问
http.cors.enabled: true #表示是否支持跨域,默认为false
http.cors.allow-origin: "*" #当设置允许跨域,默认为*,表示支持所有域名
- 用非 root 用户启动elasticsearch,启动可能会报线程限制的错误,需要修改 /etc/security/limits.conf
# 文件末尾追加
* soft nofile 65536
* hard nofile 131072
* soft nproc 4096
* hard nproc 4096
# 启动命令
bin/elasticsearch &
4.记得安装ik分词器,此处不再给出步骤
Kibana安装配置
1.下载并解压文件
2.修改 config/kibana.yml 配置文件
server.host: "0.0.0.0"
server.name: "127.0.0.1"
elasticsearch.hosts: ["http://127.0.0.1:9200"]
3.启动Kibana
bin/kibana &
启动web.jar
java -jar web.jar
没有意外的话,登录Kibana的界面(localhost:5601)就能看到相应的日志数据了。
网友评论