在spring应用中如果需要订阅kafka消息,通常情况下我们不会直接使用kafka-client, 而是使用更方便的一层封装spring-kafka
。不过,它可不是简单的封装了kafka-client, 这里面有很多需要注意的问题,比如下面这个参数:
spring.kafka.listener.concurrency=3
它并不像参数名那样简单,背后挺复杂的。如果你用jstack把线程dump出来,会发现spring-kafka启动了好多线程,然后一脸懵逼。下面就主要介绍一下这些线程都是干什么的。
在spring-kafka在运行时会启动两类线程,一类是Consumer线程,另一类是Listener线程。前者用来直接调用kafka-client的poll()方法获取消息,后者才是调用我们代码中标有@KafkaListener
注解方法的线程。如果直接使用kafka-client的话,那么正常的写法是一个while循环,在循环里面调用poll(),然后处理消息,这在kafka broker看来就是一个Consumer。如果你想用多个Consumer, 除了多启动几个进程以外,也可以在一个进程使用多个线程执行此while()循环。spring-kafka就是这么干的。
对于spring.kafka.listener.concurrency=3
这个参数来说,它设置的是每个@KafkaListener
的并发个数。每添加一个@KafkaListener
, spring-kafka都会启动concurrency
条Consumer线程来监听这些topic(注解可以指定监听多个topic), 当enable-auto-commit
设为true
时会直接在当前线程,即kafka consumer所在线程调用我们的@KafkaListener
方法,如果设为false
,则是将消息投放到阻塞队列中,另一边由Listener线程取出执行,有源码为证:
// if the container is set to auto-commit, then execute in the
// same thread
// otherwise send to the buffering queue
if (this.autoCommit) {
invokeListener(records);
}
else {
if (sendToListener(records)) {
if (this.assignedPartitions != null) {
// avoid group management rebalance due to a slow
// consumer
this.consumer.pause(this.assignedPartitions);
this.paused = true;
this.unsent = records;
}
}
}
所以,当concurrency=3,自动提交设置为false时,如果你程序里有两个方法标记了@KafkaListener
,那么此时会启动 2 * 3 = 6 个Consumer线程,6个Listener线程。
这个信息在排查错误的时候非常重要,但官方文档居然没怎么提线程的事(不够详细),只是在介绍KafkaContainerListener
。特此记录
网友评论