美文网首页
spring-kafka的线程模型与spring.kafka.l

spring-kafka的线程模型与spring.kafka.l

作者: 司青玄 | 来源:发表于2019-04-23 17:51 被阅读0次

在spring应用中如果需要订阅kafka消息,通常情况下我们不会直接使用kafka-client, 而是使用更方便的一层封装spring-kafka。不过,它可不是简单的封装了kafka-client, 这里面有很多需要注意的问题,比如下面这个参数:

spring.kafka.listener.concurrency=3

它并不像参数名那样简单,背后挺复杂的。如果你用jstack把线程dump出来,会发现spring-kafka启动了好多线程,然后一脸懵逼。下面就主要介绍一下这些线程都是干什么的。

在spring-kafka在运行时会启动两类线程,一类是Consumer线程,另一类是Listener线程。前者用来直接调用kafka-client的poll()方法获取消息,后者才是调用我们代码中标有@KafkaListener注解方法的线程。如果直接使用kafka-client的话,那么正常的写法是一个while循环,在循环里面调用poll(),然后处理消息,这在kafka broker看来就是一个Consumer。如果你想用多个Consumer, 除了多启动几个进程以外,也可以在一个进程使用多个线程执行此while()循环。spring-kafka就是这么干的。

对于spring.kafka.listener.concurrency=3这个参数来说,它设置的是每个@KafkaListener的并发个数。每添加一个@KafkaListener, spring-kafka都会启动concurrency条Consumer线程来监听这些topic(注解可以指定监听多个topic), 当enable-auto-commit设为true时会直接在当前线程,即kafka consumer所在线程调用我们的@KafkaListener方法,如果设为false,则是将消息投放到阻塞队列中,另一边由Listener线程取出执行,有源码为证:

// if the container is set to auto-commit, then execute in the
                        // same thread
                        // otherwise send to the buffering queue
                        if (this.autoCommit) {
                            invokeListener(records);
                        }
                        else {
                            if (sendToListener(records)) {
                                if (this.assignedPartitions != null) {
                                    // avoid group management rebalance due to a slow
                                    // consumer
                                    this.consumer.pause(this.assignedPartitions);
                                    this.paused = true;
                                    this.unsent = records;
                                }
                            }
                        }

所以,当concurrency=3,自动提交设置为false时,如果你程序里有两个方法标记了@KafkaListener,那么此时会启动 2 * 3 = 6 个Consumer线程,6个Listener线程。
这个信息在排查错误的时候非常重要,但官方文档居然没怎么提线程的事(不够详细),只是在介绍KafkaContainerListener。特此记录

相关文章

  • spring-kafka的线程模型与spring.kafka.l

    在spring应用中如果需要订阅kafka消息,通常情况下我们不会直接使用kafka-client, 而是使用更方...

  • SQLite 线程安全和并发

    SQLite 与线程 SQLite是线程安全的。 线程模型 SQLite支持如下三种线程模型 单线程模型这种模型下...

  • 线程池

    线程是调度CPU资源的最小单位,线程模型分为KLT模型和ULT模型,Java采用的是KLT模型,java线程与OS...

  • 线程模型的3种实现

    线程的实现模型主要有3种:内核级线程模型、用户级线程模型和混合型线程模型。它们之间最大的区别在于线程与内核调度实体...

  • EventLoopGroup的分析

    EventLoopGroup与Reactor 浅谈 Reactor 线程模型 单线程模型 问题:当handler1...

  • 七周七并发模型

    七个模型来介绍并发与并行。 线程与锁:线程与锁模型有很多众所周知的不足,但仍是其他模型的技术基础,也是很多并 发软...

  • 高效并发

    从JVM的角度看一下Java与线程,内存模型,线程安全以及JVM对于锁的优化 硬件内存模型与JVM内存模型 硬件的...

  • Netty线程模型及EventLoop详解

    作者: 一字马胡 转载标志 【2017-11-03】 更新日志 线程模型与并发 什么是线程模型呢?线程模型指定了...

  • Reactor线程模型及其在Netty中的应用

    什么是Reactor线程模型 Java中线程模型大致可以分为: 单线程模型 多线程模型 线程池模型(executo...

  • Android下多线程的实现

    Android下多线程相关 线程安全相关问题参考:java内存模型与线程 android下与多线程有关的主要有以下...

网友评论

      本文标题:spring-kafka的线程模型与spring.kafka.l

      本文链接:https://www.haomeiwen.com/subject/jgcbgqtx.html