美文网首页
Spring Boot集成Kafka + ELK

Spring Boot集成Kafka + ELK

作者: 随风摇摆水杉树 | 来源:发表于2021-05-02 16:03 被阅读0次

一、Kafka和ELK

  • Kafka
    Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
  • ELK
    ELK是ElasticSearch+Logstash+Kibana的简称。
    1、ElasticSearch
    Elasticsearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java语言开发的,并作为Apache许可条款下的开放源码发布,是一种流行的企业级搜索引擎。Elasticsearch也具有数据存储能力。
    2、Logstash
    Logstash是一个具有实时流处理能力的开源数据收集引擎。它可以动态聚合来自不同来源的数据,并将数据过滤解析后存储到目标数据库,通常是Elasticsearch。这种格式化的数据便于下游产品进行数据分析和可视化。
    basic_logstash_pipeline.png
    其中inputoutput是必须的,filter是可选的。
    3、Kibana
    Kibana 是一个免费且开放的用户界面,能够让您对 Elasticsearch 数据进行可视化,并让您在 Elastic Stack 中进行导航。您可以进行各种操作,从跟踪查询负载,到理解请求如何流经您的整个应用,都能轻松完成。

二、流程图

流程图.png

三、准备工作

  • 一台ELK服务器
    我这边直接使用工作电脑作为ELK服务器,操作系统Windows(当然也可以是Linux)。
  • 一台Kafka服务器
    设置IP地址为192.168.0.107,操作系统Ubuntu Server。

在各服务器上安装好各种软件,具体安装可以参考官方教程。

四、启动服务

Kafka
1、启动zookeeper
我这边是单服务,所以不修改配置直接使用默认配置:

qian@qian:~/kafka/kafka_2.13-2.8.0$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties

2、启动kafka
首先配置server.properties,添加如下内容:

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://192.168.0.107:9092

advertised.listeners=PLAINTEXT://192.168.0.107:9092这个配置可以让局域网内的其它服务器访问kafka服务。
然后启动kafka服务:

qian@qian:~/kafka/kafka_2.13-2.8.0$ ./bin/kafka-server-start.sh ./config/server.properties

ElasticSearch
由于我安装在Windows上,所以直接双击elasticsearch.bat启动,服务访问地址为http://localhost:9200/,如果想在浏览器中访问需要安装一个elasticsearch-head插件。
Logstash
在启动Logstash服务前,首先简单认识一下grok(官方教程),它是一个可以解析任何文本并且将文本结构化的一个工具。它可以高效处理syslog logs,apache logs,webserver logs,mysql logs。简单来说它就是一个内置了很多日志元素正则表达式的工具。比如,日志中有一条xxxxxx 192.168.0.1 xxxx这样的信息,我们要把IP地址提取出来,然后可以这样写正则表达式:

(?<clientIP>(?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9]))

总之表达式很长,而grok可以这样写:

%{IP:clientIP}

其中IP就是对上面正则表达式的引用,clientIP就是捕获到的ip地址。Logstash内置了很多像IP这样的正则表达式模板,具体请查看https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
现在开始启动Logstash,首先编写一份logstash-elk.conf配置文件,内容如下:

input {
  kafka {
    id => "spring_kafka_elk"
    bootstrap_servers => "192.168.0.107:9092"
    topics => ["kafka-elk-logg"]
    auto_offset_reset => "latest" 
  }
}
filter {

    grok {
      patterns_dir => ["./patterns"]
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{GREEDYDATA:thread}\] %{LOGLEVEL:level}\s+%{JAVACLASS:class} - %{JAVALOGMESSAGE:logmessage}" }
    }

    date {
      match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss.SSS" ]
    }
    
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts =>["localhost:9200"]
  }
}

inputoutput比较明显,就是日志的来源地和输出地,filter包含了grokdate这两个元素。

  • grok
    patterns_dir表示自定义的匹配模板文件,不过这里并没有使用到。
    match就是我们需要如何解析日志,最终会解析成timestampthreadlevelclasslogmessage。比如有这样一条日志2021-05-02 15:01:01 [main] DEBUG com.test.Hello - hello word,那么解析后结果如下:
    timestamp = 2021-05-02 15:01:01
    thread = main
    level = DEBUG
    class=com.test.Hello
    logmessage=hello word
  • date
    主要是告诉系统日志中的时间格式是yyyy-MM-dd HH:mm:ss.SSS

在CMD中启动Logstash服务:

logstash-7.12.1\bin>logstash.bat -f E:\dev-tool\elasticsearch\logstash-7.12.1\config\logstash-elk.conf

Kibana
由于ELK服务都装在了windows上,所以直接使用默认配置,双击kibana.bat启动,服务访问地址为http://localhost:5601/

注意:以上各个路径需要改成自己的安装目录

四、Spring Boot配置

pom.xml

...
 <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-log4j-appender</artifactId>
      <version>2.8.0</version>
    </dependency>
...

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
    </Console>
    <Kafka name="Kafka" topic="kafka-elk-logg">
      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
      <Property name="bootstrap.servers">192.168.0.107:9092</Property>
    </Kafka>
  </Appenders>
  <Loggers>
    <Logger name="org.springframework" level="debug" additivity="false">
      <AppenderRef ref="Console"/>
    </Logger>
    <Logger name="com.drw.start" level="debug" additivity="false">
      <AppenderRef ref="Kafka"/>
    </Logger>
    <Root level="INFO">
      <AppenderRef ref="Console"/>
    </Root>
  </Loggers>
</Configuration>

通过配置我们将部分log4j2的日志写入到了Kafka。
LogController

package com.drw.start.kafka.elk.controller;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
public class LogController {

    private static Logger log = LogManager.getLogger(LogController.class);

    @RequestMapping("log")
    public String logtest() {
        log.debug("Spring Boot集成Kafka和ELK");
        return "sucess";
    }
}

启动后直接在浏览器中访问http://localhost:8080/log,然后出现了错误,但是不要紧,这个只是找不到相关页面而已,我们最主要的是将日志写入到log4j2。

五、查看Kibana

kibana效果图.png

我们之前配置的logstash成功将日志进行解析,kibana将这些信息作为列表项展示了出来

六、参考资源

http://kafka.apache.org/
https://www.elastic.co/guide/index.html
https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

相关文章

网友评论

      本文标题:Spring Boot集成Kafka + ELK

      本文链接:https://www.haomeiwen.com/subject/dtetdltx.html