一、consumer简单示例
@Component
@Slf4j
public class TestConsumer {
public static final String TOPIC = "my-topic" ;
/**
* 对产品订购消息进行消费处理
* @param record
*/
@KafkaListener(groupId = "test", topics = TOPIC
, properties = {
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + "=110000"
,ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=10"
})
public void test(ConsumerRecord<String, String> record) {
log.info("start 开始处理消息,参数 record = {}", recordList);
//todo something
log.info(" end 处理消息完成");
}
}
@KafkaListener.containerFactory
@KafkaListener 可以指定containerFactory 属性,不指定的话,默认使用名为kafkaListenerContainerFactory
的容器工厂,由KafkaAnnotationDrivenConfiguration
类自动创建;如果指定的话,则需要自己创建,并指定bean name
-
默认kafkaListenerContainerFactory
默认清空下kafkaListenerContainerFactory.batchListener=null
,非批量处理的意思。即@KafkaListener
修饰的方法的参数是单个记录传递的,如public void test(ConsumerRecord<String, String> record)
,即使你修改为public void test(ConsumerRecord<String, String> recordList)
,list中也只有一条记录。
要想批量处理,可以自定义containerFactory -
自定义 containerFactory
先自定义KafkaListenerContainerFactory
@Bean
public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory<Integer,String > consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 调用监听者时,传递批量数据
factory.setBatchListener(true);
// 设置每个@kafkaListener的线程数
factory.setConcurrency(3);
return factory;
}
修改消息监听代码
@Component
@Slf4j
public class TestConsumer {
public static final String TOPIC = "my-topic" ;
/**
* 对产品订购消息进行消费处理
* @param recordList
*/
@KafkaListener(groupId = "test", topics = TOPIC, containerFactory = "batchFactory"
, properties = {
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + "=110000"
,ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=10"
})
public void test(List<ConsumerRecord<String, String>> recordList) {
log.info("start 开始处理消息,参数 record = {}", recordList);
//todo something
log.info(" end 处理消息完成");
}
}
这时,recordList中就会有多条记录,当然recordList的值取决于MAX_POLL_RECORDS
的配置
如果把public void test(ConsumerRecord<String, String> recordList)
修改为 public void test(ConsumerRecord<String, String> record)
,反而会导致找到此消息监听者,导致无法消费数据
@KafkaListener.properties
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=10"
此配置指定单次poll时,最大拉取的记录数为10,可能小于10。
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + "=110000"
两次poll时的最大间隔时间,每次拉取MAX_POLL_RECORDS
这些条记录,并进行处理,所有记录处理完后才会进行下一次poll。
所以这个MAX_POLL_INTERVAL_MS
时间,一定要大于(单记录处理时间*MAX_POLL_RECORDS
),否则会导致rebalance, consumer commit失败。
offset提交方式
默认配置(spring.kafka.enable-auto-commit: false
)时,consumer offset的提交操作交由spring管理,spring会在下次poll之前提交,即本次poll的所有记录处理完以后才会进行批量提交,具体可参考 KafkaMessageListenerContainer#run()
方法
-
enable.auto.commit: true
enable.auto.commit 的默认值是 true;采用自动提交的机制。此时会根据 auto.commit.interval.ms 配置的时间间隔去自动 commit, 就算 record 被消费异常也会自动 commit.
auto.commit.interval.ms 的默认值是 5000,单位是毫秒。
这样,默认5秒钟,一个 Consumer 将会提交它的 Offset 给 Kafka,或者每一次数据从指定的 Topic 取回时,将会提交最后一次的 Offset。
如果 enable.auto.commit 设置成 false,那么 auto.commit.interval.ms 也就不被再考虑了。 -
enable.auto.commit: false
当 enable-auto-commit 为 false 时, 需要根据 listener 的 ack-mode 来确定确认模式. -
ack-mode
record
在 listener 处理每条消息之后提交, 即处理一条提交一条.batch
默认值 ,在下一次 poll 之前提交已经处理完的记录.time
按照时间间隔来提交, 单位为毫秒.count
累积到 count 数目时提交.count_time
到了时间或者到了累积的数目时提交.manual
手工提交, 需要在业务代码中调用 Acknowledgment.acknowledge() 提交, 调用之后,就是跟 batch 同理处理。manual_immediate
调用 Acknowledgment.acknowledge() 之后立马提交.
网友评论