美文网首页
spring-kafka的线程模型与spring.kafka.l

spring-kafka的线程模型与spring.kafka.l

作者: 司青玄 | 来源:发表于2019-04-23 17:51 被阅读0次

    在spring应用中如果需要订阅kafka消息,通常情况下我们不会直接使用kafka-client, 而是使用更方便的一层封装spring-kafka。不过,它可不是简单的封装了kafka-client, 这里面有很多需要注意的问题,比如下面这个参数:

    spring.kafka.listener.concurrency=3
    

    它并不像参数名那样简单,背后挺复杂的。如果你用jstack把线程dump出来,会发现spring-kafka启动了好多线程,然后一脸懵逼。下面就主要介绍一下这些线程都是干什么的。

    在spring-kafka在运行时会启动两类线程,一类是Consumer线程,另一类是Listener线程。前者用来直接调用kafka-client的poll()方法获取消息,后者才是调用我们代码中标有@KafkaListener注解方法的线程。如果直接使用kafka-client的话,那么正常的写法是一个while循环,在循环里面调用poll(),然后处理消息,这在kafka broker看来就是一个Consumer。如果你想用多个Consumer, 除了多启动几个进程以外,也可以在一个进程使用多个线程执行此while()循环。spring-kafka就是这么干的。

    对于spring.kafka.listener.concurrency=3这个参数来说,它设置的是每个@KafkaListener的并发个数。每添加一个@KafkaListener, spring-kafka都会启动concurrency条Consumer线程来监听这些topic(注解可以指定监听多个topic), 当enable-auto-commit设为true时会直接在当前线程,即kafka consumer所在线程调用我们的@KafkaListener方法,如果设为false,则是将消息投放到阻塞队列中,另一边由Listener线程取出执行,有源码为证:

    // if the container is set to auto-commit, then execute in the
                            // same thread
                            // otherwise send to the buffering queue
                            if (this.autoCommit) {
                                invokeListener(records);
                            }
                            else {
                                if (sendToListener(records)) {
                                    if (this.assignedPartitions != null) {
                                        // avoid group management rebalance due to a slow
                                        // consumer
                                        this.consumer.pause(this.assignedPartitions);
                                        this.paused = true;
                                        this.unsent = records;
                                    }
                                }
                            }
    

    所以,当concurrency=3,自动提交设置为false时,如果你程序里有两个方法标记了@KafkaListener,那么此时会启动 2 * 3 = 6 个Consumer线程,6个Listener线程。
    这个信息在排查错误的时候非常重要,但官方文档居然没怎么提线程的事(不够详细),只是在介绍KafkaContainerListener。特此记录

    相关文章

      网友评论

          本文标题:spring-kafka的线程模型与spring.kafka.l

          本文链接:https://www.haomeiwen.com/subject/jgcbgqtx.html