美文网首页程序员
Kafka(2)SpringBoot2整合Kafka

Kafka(2)SpringBoot2整合Kafka

作者: 正义的杰克船长 | 来源:发表于2020-08-18 00:48 被阅读0次

    1 前期准备

    • 安装并启动Zookeeper服务。
    • 安装并启动Kafka服务(可参考前一篇文章安装Kafka运行环境)。
    • 本文采用的开发工具为IDEA,版本为Spring-Boot-2.3.0.RELEASE、Kafka-clients-2.5.0、JDK8。
    • Spring-kafka是在kafka-clients基础上开发封装的项目,所以选择版本需要注意兼容对应,以下是Spring官方给出的版本兼容表。
    版本兼容表

    2 创建项目

    • 使用项目构建工具Maven,创建项目名为spring-kafka的父项目,包括两个子模块,一个生产端模块spring-kafka-producer,一个消费端模块spring-kafka-consumer。项目整体目录结构如下图所示。
    项目整体目录结构

    3 添加项目依赖

    项目依赖pom.xml文件如下:

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.0.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <!-- kafka 依赖-->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.5.0</version>
            </dependency>
            <!-- spring boot 依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- json 工具包 -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.68</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    

    4 Kafka消息生产端

    • 新建生产端spring配置文件application.yml。
    server:
      port: 8081
    
    spring:
      kafka:
        producer:
          # 生产客户端id,默认值为""
          client-id: 1
          # 连接的broker地址,如有多个用逗号隔开
          bootstrap-servers: localhost:9092
          # key序列化类,可以自定义序列化(broker端接受的消息必须以字节数组的形式)
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          # value序列化类,可以自定义序列化
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          # 重试次数(提高可靠性,会影响同步性能,需要等待上一条消息发送完成后才发送下一条)
          retries: 1
    
    • 新建生产端启动类KafkaProducerApplication.java。
    package com.johnny.september.kafka.producer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * 启动类
     * @author Johnny Lu
     */
    @SpringBootApplication
    public class KafkaProducerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaProducerApplication.class, args);
        }
    }
    
    • 新建消息实体类Message.java。
    package com.johnny.september.kafka.producer.model;
    
    import java.io.Serializable;
    
    /**
     * 消息实体
     * @author Johnny Lu
     */
    public class Message implements Serializable {
    
        private static final long serialVersionUID = -118L;
    
        /** 内容 */
        private String content;
    
        public Message() {
        }
    
        public Message(String content) {
            this.content = content;
        }
    
        public String getContent() {
            return content;
        }
    
        public void setContent(String content) {
            this.content = content;
        }
    
    }
    
    • 新建Kafka配置类KafkaConfig.java。
    package com.johnny.september.kafka.producer.config;
    
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.TopicBuilder;
    
    /**
     * Kafka配置类
     * @author Johnny Lu
     */
    @Configuration
    public class KafkaConfig {
    
        /**
         * 创建topic,指定主题名称,分区数量,副本数量
         *
         * @return
         */
        @Bean
        public NewTopic topicTest() {
            return TopicBuilder.name("topic_test_1").partitions(3).replicas(1).build();
        }
    }
    
    • 新建消息生产控制器类MessageController.java。
    package com.johnny.september.kafka.producer.controller;
    
    import com.alibaba.fastjson.JSON;
    import com.johnny.september.kafka.producer.model.Message;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 控制器
     * @author Johnny Lu
     */
    @RestController
    public class MessageController {
        private static final Logger logger = LoggerFactory.getLogger(MessageController.class);
    
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
    
        /**
         * 发送消息
         *
         * @param content
         */
        @RequestMapping(path = "/send/{content}")
        public void sendMessage(@PathVariable String content) {
            kafkaTemplate.send("topic_test_1", JSON.toJSONString(new Message(content)));
        }
    
        /**
         * 发送消息,且阻塞等待broker的响应,直到消息发送成功,设置超时时间,超时异常处理
         *
         * @param content
         * @return
         */
        @RequestMapping(path = "/sendWaitResult/{content}")
        public String sendMessageWaitResult(@PathVariable String content) {
            String result = "发送成功";
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate
                    .send("topic_test_1", JSON.toJSONString(new Message(content)));
            try {
                future.get(3000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                result = "发送失败";
            } catch (ExecutionException e) {
                e.printStackTrace();
                result = "执行失败";
            } catch (TimeoutException e) {
                e.printStackTrace();
                result = "发送超时";
            }
            logger.info("发送消息:{}, 结果:{}", content, result);
            return result;
        }
    }
    

    5 Kafka消息消费端

    • 新建消费端的spring配置文件application.yml。
    server:
      port: 8082
    
    spring:
      kafka:
        consumer:
          # 消费客户端id,默认值为""
          client-id: 1
          # 连接的broker地址,如有多个用逗号隔开
          bootstrap-servers: localhost:9092
          # key反序列化类
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # value反序列化类
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
    • 新建消费端启动类KafkaConsumerApplication.java。
    package com.johnny.september.kafka.consumer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * 启动类
     * @author Johnny Lu
     */
    @SpringBootApplication
    public class KafkaConsumerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaConsumerApplication.class, args);
        }
    }
    
    • 新建消息监听类MessageListener.java。
    package com.johnny.september.kafka.consumer.listener;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 消息监听
     *
     * @author Johnny Lu
     */
    @Component
    public class MessageListener {
    
        private final Logger logger = LoggerFactory.getLogger(MessageListener.class);
    
        /**
         * 监听消息,接受消息后处理业务逻辑
         * 消费组:messageGroup
         *
         * @param message
         */
        @KafkaListener(id = "messageGroup", topics = "topic_test_1")
        public void listen(String message) {
            logger.info("接受消息: " + message);
        }
    }
    

    6 启动项目验证

    • Run KafkaProducerApplication.java类的main方法。
    • Run KafkaConsumerApplication.java类的main方法。
    • 打开浏览器,执行发送消息请求http://localhost:8081/send/kafka,然后在消费端的控制台会打印接收到的消息,如下图所示。
      消费端控制台
      能发送消息和完整的接收消息了,表示整合初步OK了。

    7 自定义拦截器

    Kafka共有两种拦截器:生产者拦截器和消费者拦截器。

    生产者拦截器

    • 生产者拦截器可以用来在消息发送前做一些准备工作,比如日志打印、不符合条件的消息过滤等,也可以在回调逻辑前做一些数据统计等工作。
    • 自定义拦截器主要实现org.apache.kafka.clients.producer.ProducerInterceptor接口。这里新建一个自定义拦截器CustomProducerInterceptor.java,具体代码如下。
    package com.johnny.september.kafka.producer.config;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Map;
    
    /**
     * 自定义生产端拦截器
     * @author Johnny Lu
     */
    public class CustomProducerInterceptor implements ProducerInterceptor {
    
        private static final Logger logger = LoggerFactory.getLogger(CustomProducerInterceptor.class);
    
        /**
         * 发送前做一些处理
         *
         * @param record
         * @return
         */
        @Override
        public ProducerRecord onSend(ProducerRecord record) {
            logger.info("发送消息 :{}", record.toString());
            return record;
        }
    
        /**
         * 这个方法在应答前或消息发送失败时被调用
         *
         * @param metadata
         * @param exception
         */
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        }
    
        /**
         * 关闭这个拦截器时被调用
         */
        @Override
        public void close() {
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
        }
    }
    
    • 自定义拦截器后,需要进行配置 interceptor.classes才能生效。支持多个拦截器,用逗号隔开,多个拦截器会形成拦截链,按配置的顺序一一调用。在application.yml文件增加如下配置。
    spring:
      kafka:
        producer:
          properties:
            interceptor.classes: com.johnny.september.kafka.producer.config.CustomProducerInterceptor
    

    消费者拦截器

    • 消费者拦截器主要实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口。这里新建一个自定义拦截器CustomConsumerInterceptor.java,具体代码如下。
    package com.johnny.september.kafka.consumer.config;
    
    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    
    import java.util.Map;
    
    /**
     * 自定义消费端拦截器
     * @author Johnny Lu
     */
    public class CustomConsumerInterceptor implements ConsumerInterceptor {
    
        /**
         * 这个方法,在拉取到消息调用
         * @param records
         * @return
         */
        @Override
        public ConsumerRecords onConsume(ConsumerRecords records) {
            return records;
        }
    
        /**
         * 这个方法,在提交请求响应成功时被调用
         * @param offsets
         */
        @Override
        public void onCommit(Map offsets) {
        }
    
        @Override
        public void close() {
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    
    • 自定义拦截器后,也需要进行配置 interceptor.classes才能生效。在application.yml文件增加如下配置。
    spring:
      kafka:
        consumer:
          properties:
            # 支持多个拦截器,用逗号隔开,多个形成拦截链,按顺序一一调用
            interceptor.classes: com.johnny.september.kafka.consumer.config.CustomConsumerInterceptor
    

    8 自定义分区器

    • 生产者发送消息可能需要经过生产者拦截器、序列化器、分区器一系列过程后才会发往broker。拦截器一般是非必须的,序列化器是必须的。如果消息发送参数指定了分区partition字段,就不要分区器。如果消息发送参数没有指定partition字段,那么需要分区器为消息分配发往的分区。
    • 默认分区器org.apache.kafka.clients.producer.internals.DefaultPartitioner。
    • 自定义分区器主要实现org.apache.kafka.clients.producer.Partitioner接口。这里新建一个自定义拦截器CustomPartitioner.java,具体代码如下。
    package com.johnny.september.kafka.producer.config;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;
    
    import java.util.Map;
    import java.util.concurrent.ThreadLocalRandom;
    
    /**
     * 自定义分区器
     * @author Johnny Lu
     */
    public class CustomPartitioner implements Partitioner {
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                Cluster cluster) {
            Integer partitionNums = cluster.partitionCountForTopic(topic);
            if (keyBytes == null) {
                // 随机分区
                return Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNums;
            } else {
                // 保持和 DefaultPartitioner 一样采用murmur2算法分区
                return Utils.toPositive(Utils.murmur2(keyBytes)) % partitionNums;
            }
        }
    
        @Override
        public void close() {
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
        }
    }
    
    
    • 自定义分区器后,需要进行配置partitioner.class才能生效。在生产端application.yml文件增加如下配置。
    spring:
      kafka:
        producer:
          properties:
            partitioner.class: com.johnny.september.kafka.producer.config.CustomPartitioner
    
    

    重新启动项目进行验证。

    9 结语

    到这里,Spring Boot2整合Kafka简易版完成了。以后会继续记录Kafka其他功能及用法。

    相关文章

      网友评论

        本文标题:Kafka(2)SpringBoot2整合Kafka

        本文链接:https://www.haomeiwen.com/subject/isscjktx.html