一、Kafka和ELK
-
Kafka
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 -
ELK
ELK是ElasticSearch+Logstash+Kibana的简称。
1、ElasticSearch
Elasticsearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java语言开发的,并作为Apache许可条款下的开放源码发布,是一种流行的企业级搜索引擎。Elasticsearch也具有数据存储能力。
2、Logstash
Logstash是一个具有实时流处理能力的开源数据收集引擎。它可以动态聚合来自不同来源的数据,并将数据过滤解析后存储到目标数据库,通常是Elasticsearch。这种格式化的数据便于下游产品进行数据分析和可视化。
basic_logstash_pipeline.png
其中input
和output
是必须的,filter
是可选的。
3、Kibana
Kibana 是一个免费且开放的用户界面,能够让您对 Elasticsearch 数据进行可视化,并让您在 Elastic Stack 中进行导航。您可以进行各种操作,从跟踪查询负载,到理解请求如何流经您的整个应用,都能轻松完成。
二、流程图
流程图.png三、准备工作
-
一台ELK服务器
我这边直接使用工作电脑作为ELK服务器,操作系统Windows(当然也可以是Linux)。 -
一台Kafka服务器
设置IP地址为192.168.0.107
,操作系统Ubuntu Server。
在各服务器上安装好各种软件,具体安装可以参考官方教程。
四、启动服务
Kafka
1、启动zookeeper
我这边是单服务,所以不修改配置直接使用默认配置:
qian@qian:~/kafka/kafka_2.13-2.8.0$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
2、启动kafka
首先配置server.properties
,添加如下内容:
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://192.168.0.107:9092
advertised.listeners=PLAINTEXT://192.168.0.107:9092
这个配置可以让局域网内的其它服务器访问kafka服务。
然后启动kafka服务:
qian@qian:~/kafka/kafka_2.13-2.8.0$ ./bin/kafka-server-start.sh ./config/server.properties
ElasticSearch
由于我安装在Windows上,所以直接双击elasticsearch.bat
启动,服务访问地址为http://localhost:9200/
,如果想在浏览器中访问需要安装一个elasticsearch-head
插件。
Logstash
在启动Logstash服务前,首先简单认识一下grok(官方教程),它是一个可以解析任何文本并且将文本结构化的一个工具。它可以高效处理syslog logs,apache logs,webserver logs,mysql logs。简单来说它就是一个内置了很多日志元素正则表达式的工具。比如,日志中有一条xxxxxx 192.168.0.1 xxxx
这样的信息,我们要把IP地址提取出来,然后可以这样写正则表达式:
(?<clientIP>(?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9]))
总之表达式很长,而grok
可以这样写:
%{IP:clientIP}
其中IP
就是对上面正则表达式的引用,clientIP
就是捕获到的ip地址。Logstash内置了很多像IP
这样的正则表达式模板,具体请查看https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。
现在开始启动Logstash,首先编写一份logstash-elk.conf
配置文件,内容如下:
input {
kafka {
id => "spring_kafka_elk"
bootstrap_servers => "192.168.0.107:9092"
topics => ["kafka-elk-logg"]
auto_offset_reset => "latest"
}
}
filter {
grok {
patterns_dir => ["./patterns"]
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{GREEDYDATA:thread}\] %{LOGLEVEL:level}\s+%{JAVACLASS:class} - %{JAVALOGMESSAGE:logmessage}" }
}
date {
match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss.SSS" ]
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts =>["localhost:9200"]
}
}
input
和output
比较明显,就是日志的来源地和输出地,filter
包含了grok
和date
这两个元素。
-
grok
patterns_dir
表示自定义的匹配模板文件,不过这里并没有使用到。
match
就是我们需要如何解析日志,最终会解析成timestamp
、thread
、level
、class
、logmessage
。比如有这样一条日志2021-05-02 15:01:01 [main] DEBUG com.test.Hello - hello word
,那么解析后结果如下:
timestamp = 2021-05-02 15:01:01
thread = main
level = DEBUG
class=com.test.Hello
logmessage=hello word
-
date
主要是告诉系统日志中的时间格式是yyyy-MM-dd HH:mm:ss.SSS
。
在CMD中启动Logstash服务:
logstash-7.12.1\bin>logstash.bat -f E:\dev-tool\elasticsearch\logstash-7.12.1\config\logstash-elk.conf
Kibana
由于ELK服务都装在了windows上,所以直接使用默认配置,双击kibana.bat
启动,服务访问地址为http://localhost:5601/
。
注意:以上各个路径需要改成自己的安装目录
四、Spring Boot配置
pom.xml
...
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<version>2.8.0</version>
</dependency>
...
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
<Kafka name="Kafka" topic="kafka-elk-logg">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
<Property name="bootstrap.servers">192.168.0.107:9092</Property>
</Kafka>
</Appenders>
<Loggers>
<Logger name="org.springframework" level="debug" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="com.drw.start" level="debug" additivity="false">
<AppenderRef ref="Kafka"/>
</Logger>
<Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
通过配置我们将部分log4j2的日志写入到了Kafka。
LogController
package com.drw.start.kafka.elk.controller;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
public class LogController {
private static Logger log = LogManager.getLogger(LogController.class);
@RequestMapping("log")
public String logtest() {
log.debug("Spring Boot集成Kafka和ELK");
return "sucess";
}
}
启动后直接在浏览器中访问http://localhost:8080/log
,然后出现了错误,但是不要紧,这个只是找不到相关页面而已,我们最主要的是将日志写入到log4j2。
五、查看Kibana
kibana效果图.png我们之前配置的logstash成功将日志进行解析,kibana将这些信息作为列表项展示了出来
六、参考资源
http://kafka.apache.org/
https://www.elastic.co/guide/index.html
https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
网友评论