美文网首页
SpringBoot整合kafka

SpringBoot整合kafka

作者: huan1993 | 来源:发表于2021-01-31 19:36 被阅读0次

    一、背景

    此处简单记录一下 SpringBootKafka 的整合。

    二、实现步骤

    1、引入jar包

    <dependency>
       <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    

    2、编写生产者和消费者的配置

    3、生产者配置

    spring.application.name=kafka-springboot
    # 配置 kafka 服务器的地址,多个以逗号隔开
    spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
    # 生产者配置
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.acks=1
    spring.kafka.producer.retries=0
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    

    4、消费者配置

    # 消费者配置
    # 关闭自动提交 ack
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.auto-commit-interval=100
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.max-poll-records=500
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    # 配置监听手动提交 ack ,消费一条数据完后,立即提交
    spring.kafka.listener.ack-mode=manual_immediate
    # 经测试也是批量提交的ack , 当消费完 spring.kafka.consumer.max-poll-records 这么多的数据时候,提交
    #spring.kafka.listener.ack-mode=manual
    spring.kafka.listener.poll-timeout=500S
    

    5、消费者手动提交 ack

    1、spring.kafka.consumer.enable-auto-commit 修改成 false
    2、spring.kafka.listener.ack-mode 修改成
                |- manual: 表示手动提交,但是测试下来发现是批量提交
                |- manual_immediate: 表示手动提交,当调用 Acknowledgment#acknowledge之后立马提交。

    3、编写生产者代码

    @Component
    public class KafkaProducer implements CommandLineRunner {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @Override
        public void run(String... args) {
            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() ->
                    {
                        kafkaTemplate.send(KafkaConstant.TOPIC, String.valueOf(System.currentTimeMillis()))
                                .addCallback(new SuccessCallback<SendResult<String, String>>() {
                                    @Override
                                    public void onSuccess(SendResult<String, String> result) {
                                        if (null != result.getRecordMetadata()) {
                                            System.out.println("消费发送成功 offset:" + result.getRecordMetadata().offset());
                                            return;
                                        }
                                        System.out.println("消息发送成功");
                                    }
                                }, new FailureCallback() {
                                    @Override
                                    public void onFailure(Throwable throwable) {
                                        System.out.println("消费发送失败:" + throwable.getMessage());
                                    }
                                });
                    },
                    0, 1, TimeUnit.SECONDS);
        }
    }
    
    

    1、消费的发送使用KafkaTemplate
    2、根据发送的结果知道,消息发送成功还是失败。

    4、编写消费者代码

    @Component
    public class KafkaConsumer {
    
        @KafkaListener(topics = KafkaConstant.TOPIC, groupId = "kafka-springboot-001")
        public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {
            System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "接收到kafka消息,partition:" + record.partition() + ",offset:" + record.offset() + "value:" + record.value());
            TimeUnit.SECONDS.sleep(1);
            ack.acknowledge();
        }
    }
    

    KafkaListener:
          topic: 表示需要监听的队列名称
          groupId: 表示消费者组的id

    三、运行结果

    运行结果

    四、参考文档

    1、https://docs.spring.io/spring-boot/docs/2.4.2/reference/htmlsingle/#boot-features-kafka

    五、代码路径

    https://gitee.com/huan1993/rabbitmq/tree/master/kafka-springboot/src/main/java/com/huan/study/kafka

    相关文章

      网友评论

          本文标题:SpringBoot整合kafka

          本文链接:https://www.haomeiwen.com/subject/gzwrtltx.html