美文网首页
spring-kafka自动提交及批量消费

spring-kafka自动提交及批量消费

作者: heichong | 来源:发表于2023-02-20 13:38 被阅读0次

    一、consumer简单示例

    @Component
    @Slf4j
    public class TestConsumer {
    
        public static final String TOPIC = "my-topic" ;
    
        /**
         * 对产品订购消息进行消费处理
         * @param record
         */
        @KafkaListener(groupId = "test", topics = TOPIC
                , properties = {
                ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + "=110000"
                ,ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=10"
        })
        public void test(ConsumerRecord<String, String> record) {
    
            log.info("start 开始处理消息,参数 record = {}", recordList);
            //todo something
            log.info(" end 处理消息完成");
        }
    }
    

    @KafkaListener.containerFactory

    @KafkaListener 可以指定containerFactory 属性,不指定的话,默认使用名为kafkaListenerContainerFactory的容器工厂,由KafkaAnnotationDrivenConfiguration类自动创建;如果指定的话,则需要自己创建,并指定bean name

    • 默认kafkaListenerContainerFactory
      默认清空下kafkaListenerContainerFactory.batchListener=null,非批量处理的意思。即@KafkaListener修饰的方法的参数是单个记录传递的,如public void test(ConsumerRecord<String, String> record),即使你修改为public void test(ConsumerRecord<String, String> recordList),list中也只有一条记录。
      要想批量处理,可以自定义containerFactory

    • 自定义 containerFactory
      先自定义KafkaListenerContainerFactory

    
        @Bean
        public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory<Integer,String > consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // 调用监听者时,传递批量数据
            factory.setBatchListener(true);
            // 设置每个@kafkaListener的线程数
            factory.setConcurrency(3);
            return factory;
        }
    

    修改消息监听代码

    @Component
    @Slf4j
    public class TestConsumer {
    
        public static final String TOPIC = "my-topic" ;
    
        /**
         * 对产品订购消息进行消费处理
         * @param recordList
         */
        @KafkaListener(groupId = "test", topics = TOPIC, containerFactory = "batchFactory"
                , properties = {
                ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + "=110000"
                ,ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=10"
        })
        public void test(List<ConsumerRecord<String, String>> recordList) {
    
            log.info("start 开始处理消息,参数 record = {}", recordList);
            //todo something
            log.info(" end 处理消息完成");
        }
    }
    

    这时,recordList中就会有多条记录,当然recordList的值取决于MAX_POLL_RECORDS的配置
    如果把public void test(ConsumerRecord<String, String> recordList) 修改为 public void test(ConsumerRecord<String, String> record),反而会导致找到此消息监听者,导致无法消费数据

    @KafkaListener.properties

    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=10"
    

    此配置指定单次poll时,最大拉取的记录数为10,可能小于10。

    ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + "=110000"
    

    两次poll时的最大间隔时间,每次拉取MAX_POLL_RECORDS这些条记录,并进行处理,所有记录处理完后才会进行下一次poll。
    所以这个MAX_POLL_INTERVAL_MS时间,一定要大于(单记录处理时间*MAX_POLL_RECORDS),否则会导致rebalance, consumer commit失败。

    offset提交方式

    默认配置(spring.kafka.enable-auto-commit: false)时,consumer offset的提交操作交由spring管理,spring会在下次poll之前提交,即本次poll的所有记录处理完以后才会进行批量提交,具体可参考 KafkaMessageListenerContainer#run()方法

    • enable.auto.commit: true
      enable.auto.commit 的默认值是 true;采用自动提交的机制。此时会根据 auto.commit.interval.ms 配置的时间间隔去自动 commit, 就算 record 被消费异常也会自动 commit.
      auto.commit.interval.ms 的默认值是 5000,单位是毫秒。
      这样,默认5秒钟,一个 Consumer 将会提交它的 Offset 给 Kafka,或者每一次数据从指定的 Topic 取回时,将会提交最后一次的 Offset。
      如果 enable.auto.commit 设置成 false,那么 auto.commit.interval.ms 也就不被再考虑了。

    • enable.auto.commit: false
      当 enable-auto-commit 为 false 时, 需要根据 listener 的 ack-mode 来确定确认模式.

    • ack-mode

    • record 在 listener 处理每条消息之后提交, 即处理一条提交一条.
    • batch 默认值 ,在下一次 poll 之前提交已经处理完的记录.
    • time按照时间间隔来提交, 单位为毫秒.
    • count 累积到 count 数目时提交.
    • count_time 到了时间或者到了累积的数目时提交.
    • manual 手工提交, 需要在业务代码中调用 Acknowledgment.acknowledge() 提交, 调用之后,就是跟 batch 同理处理。
    • manual_immediate 调用 Acknowledgment.acknowledge() 之后立马提交.

    相关文章

      网友评论

          本文标题:spring-kafka自动提交及批量消费

          本文链接:https://www.haomeiwen.com/subject/apngxltx.html