美文网首页
Spring Boot集成Kafka + ELK

Spring Boot集成Kafka + ELK

作者: 随风摇摆水杉树 | 来源:发表于2021-05-02 16:03 被阅读0次

    一、Kafka和ELK

    • Kafka
      Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
    • ELK
      ELK是ElasticSearch+Logstash+Kibana的简称。
      1、ElasticSearch
      Elasticsearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java语言开发的,并作为Apache许可条款下的开放源码发布,是一种流行的企业级搜索引擎。Elasticsearch也具有数据存储能力。
      2、Logstash
      Logstash是一个具有实时流处理能力的开源数据收集引擎。它可以动态聚合来自不同来源的数据,并将数据过滤解析后存储到目标数据库,通常是Elasticsearch。这种格式化的数据便于下游产品进行数据分析和可视化。
      basic_logstash_pipeline.png
      其中inputoutput是必须的,filter是可选的。
      3、Kibana
      Kibana 是一个免费且开放的用户界面,能够让您对 Elasticsearch 数据进行可视化,并让您在 Elastic Stack 中进行导航。您可以进行各种操作,从跟踪查询负载,到理解请求如何流经您的整个应用,都能轻松完成。

    二、流程图

    流程图.png

    三、准备工作

    • 一台ELK服务器
      我这边直接使用工作电脑作为ELK服务器,操作系统Windows(当然也可以是Linux)。
    • 一台Kafka服务器
      设置IP地址为192.168.0.107,操作系统Ubuntu Server。

    在各服务器上安装好各种软件,具体安装可以参考官方教程。

    四、启动服务

    Kafka
    1、启动zookeeper
    我这边是单服务,所以不修改配置直接使用默认配置:

    qian@qian:~/kafka/kafka_2.13-2.8.0$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
    

    2、启动kafka
    首先配置server.properties,添加如下内容:

    ############################# Socket Server Settings #############################
    
    # The address the socket server listens on. It will get the value returned from 
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    #listeners=PLAINTEXT://:9092
    
    # Hostname and port the broker will advertise to producers and consumers. If not set, 
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    advertised.listeners=PLAINTEXT://192.168.0.107:9092
    

    advertised.listeners=PLAINTEXT://192.168.0.107:9092这个配置可以让局域网内的其它服务器访问kafka服务。
    然后启动kafka服务:

    qian@qian:~/kafka/kafka_2.13-2.8.0$ ./bin/kafka-server-start.sh ./config/server.properties
    

    ElasticSearch
    由于我安装在Windows上,所以直接双击elasticsearch.bat启动,服务访问地址为http://localhost:9200/,如果想在浏览器中访问需要安装一个elasticsearch-head插件。
    Logstash
    在启动Logstash服务前,首先简单认识一下grok(官方教程),它是一个可以解析任何文本并且将文本结构化的一个工具。它可以高效处理syslog logs,apache logs,webserver logs,mysql logs。简单来说它就是一个内置了很多日志元素正则表达式的工具。比如,日志中有一条xxxxxx 192.168.0.1 xxxx这样的信息,我们要把IP地址提取出来,然后可以这样写正则表达式:

    (?<clientIP>(?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9]))
    

    总之表达式很长,而grok可以这样写:

    %{IP:clientIP}
    

    其中IP就是对上面正则表达式的引用,clientIP就是捕获到的ip地址。Logstash内置了很多像IP这样的正则表达式模板,具体请查看https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
    现在开始启动Logstash,首先编写一份logstash-elk.conf配置文件,内容如下:

    input {
      kafka {
        id => "spring_kafka_elk"
        bootstrap_servers => "192.168.0.107:9092"
        topics => ["kafka-elk-logg"]
        auto_offset_reset => "latest" 
      }
    }
    filter {
    
        grok {
          patterns_dir => ["./patterns"]
          match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{GREEDYDATA:thread}\] %{LOGLEVEL:level}\s+%{JAVACLASS:class} - %{JAVALOGMESSAGE:logmessage}" }
        }
    
        date {
          match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss.SSS" ]
        }
        
    }
    output {
      stdout { codec => rubydebug }
      elasticsearch {
        hosts =>["localhost:9200"]
      }
    }
    

    inputoutput比较明显,就是日志的来源地和输出地,filter包含了grokdate这两个元素。

    • grok
      patterns_dir表示自定义的匹配模板文件,不过这里并没有使用到。
      match就是我们需要如何解析日志,最终会解析成timestampthreadlevelclasslogmessage。比如有这样一条日志2021-05-02 15:01:01 [main] DEBUG com.test.Hello - hello word,那么解析后结果如下:
      timestamp = 2021-05-02 15:01:01
      thread = main
      level = DEBUG
      class=com.test.Hello
      logmessage=hello word
    • date
      主要是告诉系统日志中的时间格式是yyyy-MM-dd HH:mm:ss.SSS

    在CMD中启动Logstash服务:

    logstash-7.12.1\bin>logstash.bat -f E:\dev-tool\elasticsearch\logstash-7.12.1\config\logstash-elk.conf
    

    Kibana
    由于ELK服务都装在了windows上,所以直接使用默认配置,双击kibana.bat启动,服务访问地址为http://localhost:5601/

    注意:以上各个路径需要改成自己的安装目录

    四、Spring Boot配置

    pom.xml

    ...
     <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
          <exclusions>
            <exclusion>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-log4j-appender</artifactId>
          <version>2.8.0</version>
        </dependency>
    ...
    

    log4j2.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="WARN">
      <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
          <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <Kafka name="Kafka" topic="kafka-elk-logg">
          <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
          <Property name="bootstrap.servers">192.168.0.107:9092</Property>
        </Kafka>
      </Appenders>
      <Loggers>
        <Logger name="org.springframework" level="debug" additivity="false">
          <AppenderRef ref="Console"/>
        </Logger>
        <Logger name="com.drw.start" level="debug" additivity="false">
          <AppenderRef ref="Kafka"/>
        </Logger>
        <Root level="INFO">
          <AppenderRef ref="Console"/>
        </Root>
      </Loggers>
    </Configuration>
    

    通过配置我们将部分log4j2的日志写入到了Kafka。
    LogController

    package com.drw.start.kafka.elk.controller;
    
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    
    @Controller
    public class LogController {
    
        private static Logger log = LogManager.getLogger(LogController.class);
    
        @RequestMapping("log")
        public String logtest() {
            log.debug("Spring Boot集成Kafka和ELK");
            return "sucess";
        }
    }
    

    启动后直接在浏览器中访问http://localhost:8080/log,然后出现了错误,但是不要紧,这个只是找不到相关页面而已,我们最主要的是将日志写入到log4j2。

    五、查看Kibana

    kibana效果图.png

    我们之前配置的logstash成功将日志进行解析,kibana将这些信息作为列表项展示了出来

    六、参考资源

    http://kafka.apache.org/
    https://www.elastic.co/guide/index.html
    https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
    https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

    相关文章

      网友评论

          本文标题:Spring Boot集成Kafka + ELK

          本文链接:https://www.haomeiwen.com/subject/dtetdltx.html