美文网首页
Spring for Apache Kafka的理解

Spring for Apache Kafka的理解

作者: 小驴小驴 | 来源:发表于2021-08-21 21:41 被阅读0次
    -1.header.jpeg

    目录

    • 一、前言
    • 二、消费者偏移量提交
      • 2.1 前言
      • 2.2 KafkaClient中消费者提交偏移量
      • 2.3 Spring for kafka Offset Commit
    • 三、@KafkaListener标注的方法签名
    • 四、MessageListenerContainer的两种方式
      • 4.1 KafkaMessageListenerContainer VS ConcurrentMessageListenerContainer
      • 4.2 ConcurrentMessageListenerContainer 中 Concurrency
      • 4.3 使用ConcurrentMessageListenerContainer注意点
        • 4.3.1 不得不知道的消费者组分区分配原则
        • 4.3.2 Spring for Apache Kafka到底适合选择何种消费者组分区方案
        • 4.3.3 实验
          • 4.3.3.1 实验一
          • 4.3.3.2 实验二
          • 4.3.3.3 实验三
          • 4.3.3.4 结论
    • 五、参考

    一、前言

    Kafka的使用场景不言而喻,可以说在各大公司,大多数都会用到Kafka,Kafka有自带的客户端即KafkaClient,但由于其编程的复杂性,因此大多数项目中都会采用SpringData项目下的Spring for Apache Kafka来简化对Kafka应用的开发。

    可想而知的Spring for Apache Kafka也是基于KafkaClient,只是在外层做了很多的包装与优化,使得开发人员更易、更快上手Kafka应用。因此了解Spring for Apache Kafka做了哪些包装,其与原生的KafkaClient有了哪些不同的特性,是我们开发人员需要了解的。

    为了研究下面的课题,这里先创建一个名为 spring-kafka-test Topic,分区数为5,副本数为1(因为是单机Kafka环境)。

    #创建Topic命令
    ./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic spring-kafka-test --partitions 5 --replication-factor 1
    #检查一下Topic描述
    ./kafka-topics.sh --bootstrap-server localhost:9092 --topic spring-kafka-test --describe
    #描述信息如下即可:
    Topic: spring-kafka-test    PartitionCount: 5   ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: spring-kafka-test    Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: spring-kafka-test    Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: spring-kafka-test    Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: spring-kafka-test    Partition: 3    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: spring-kafka-test    Partition: 4    Leader: 1001    Replicas: 1001  Isr: 1001
    

    二、消费者偏移量提交

    2.1 前言

    因为当前Kafka版本为2.7.x,是写这篇博客时最新的Kafka版本,首先要知道的前提是在Kafka 0.9版本之后,消费者的偏移量信息已经从Zookeeper中收到到Kafka一个特殊的Topic consumer_offset 中去管理,这一点原因官网上有介绍,不是本博客讨论的重点,这里不做解释。

    2.2 KafkaClient中消费者提交偏移量

    在原生的KafkaClient客户端中,偏移量提交的控制比较少,无非就是分为两种,如图:

    0.KafkaClient偏移量提交方式.png

    2.3 Spring for kafka Offset Commit

    Spring for Apache Kafka的消费者Offset提交方式多达七种,如果enable.auto.commit.consumer属性为true,则Kafka会根据配置自动提交,下面的七种方式也都无效。如果属性设置为false,则交由Spring来控制Offset的提交(具体的提交方式如下七种),默认的AckMode为BATCH。在Frameworkd2.3版本开始enable.auto.commit.consumer被设置为默认false。除非显式指定。

    • RECORD

      当消费方式为Single,@KafkaListener标注的方法中每处理一条消息(在处理结束后)自动提交一次偏移量。

      需要主注意的是,在消费方式为BATCH批量消费的模式下,如果Spring for Kafka使用了RECORD的Offset提交模式,则项目启动会直接抛出异常:

      Caused by: java.lang.IllegalStateException: Cannot use AckMode.RECORD with a batch listener
      
    • BATCH

      当消费方式为BATCH批量消费模式下(需要注意两个BATCH的区别),@KafkaListener标注的方法中每处理一批消息(在处理结束后)自动提交一次偏移量。

      与上述的RECORD提交模式不同,假设SINGLE非批量消费的模式下,也就是一次只读取一条消息,在这样的背景下,依然可以选择BATCH的Offset提交模式,并不会报错。

    • TIME

      每隔一段时间进行偏移量提交,时间由ackTime指定

    • COUNT

      每隔处理的消息条数进行偏移量提交,消息数有ackCount指定

    • COUNT_TIME

      TIMECOUNT_TIME的综合体,哪个条件先满足就立即提交偏移量

    • MANUAL

      处理完消息后,手动调用Acknowledgment.acknowledge()之后,其表现将与上述的BATCH行为一致

    • MANUAL_IMMEDIATE

      处理完消息后,手动调用Acknowledgment.acknowledge()之后,将会立即提交偏移量

    三、@KafkaListener标注的方法签名

    在@KafkaListener标注的方法中,方法的签名有很多种,一会儿是List<Record>一会又是Record,有的方法可以有Acknowledgment,而有的方法签名中有这个参数就会抛出异常,这一小节,就是研究一下@KafkaListener标注的方法签名到底有多少种?

    在如下的接口中,定义着可以使用的方法签名,当然,你可能会问,这些签名是不是可以随便使用?傻瓜,当然不是,这也是要看你的相关配置,根据不同的配置Spring for kafka在内部会初始化不同的Message Listeners,当消息来的时候,内部会通过反射方式调用@KafkaListener标注的方法,因此方法签名绝对不是随便定义的。你比如说:你的消费方式为BATCH批量消费,那在方法签名中肯定是要使用List<Record>而非Record啦。

    // 1
    public interface MessageListener<K, V> { 
        void onMessage(ConsumerRecord<K, V> data);
    }
    // 2
    public interface AcknowledgingMessageListener<K, V> { 
        void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
    }
    // 3
    public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
        void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
    }
    // 4
    public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
        void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
    }
    // 5
    public interface BatchMessageListener<K, V> { 
        void onMessage(List<ConsumerRecord<K, V>> data);
    }
    // 6
    public interface BatchAcknowledgingMessageListener<K, V> { 
        void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
    }
    // 7
    public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
        void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
    }
    // 8
    public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
        void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
    }
    

    ① 当使用SINGLE方式且自动提交或容器管理的提交方法之一时,使用这个接口来处理从Kafka消费者poll()操作接收到的单个ConsumerRecord实例。

    ② 当使用SINGLE方式且手动提交方法时,使用这个接口来处理从Kafka消费者poll()操作接收到的单个ConsumerRecord实例。

    ③ 当使用SINGLE方式且自动提交或容器管理的提交方法之一时,使用这个接口来处理从Kafka消费者poll()操作接收到的单个ConsumerRecord实例,并提供了Consumer的访问。

    ④ 当使用SINGLE方式且手动提交方法时,使用这个接口来处理从Kafka消费者poll()操作接收到的单个ConsumerRecord实例,并提供了Consumer的访问。

    ⑤ 当使用自动提交或容器管理的提交方法之一时,使用这个接口处理从Kafka消费者poll()操作接收到的所有ConsumerRecord实例。AckMode。使用此接口时不支持RECORD,因为给侦听器的是完整的批处理。

    ⑥ 当使用BATCH方式且手动提交方法时,使用这个接口来处理从Kafka消费者poll()操作接收到的单个ConsumerRecord实例。

    ⑦ 当使用BATCH方式且自动提交或容器管理的提交方法之一时,使用这个接口处理从Kafka消费者poll()操作接收到的所有ConsumerRecord实例。AckMode。使用此接口时不支持RECORD,因为给侦听器的是完整的批处理,提供了对Consumer对象的访问。

    ⑧ 当使用BATCH方式且手动提交方法时,使用这个接口来处理从Kafka消费者poll()操作接收到的所有的ConsumerRecord实例,提供了对Consumer对象的访问。

    四、MessageListenerContainer的两种方式

    Spring for Apache Kafka提供了两个消息监听容器实现:KafkaMessagelistenerContainerConcurrentMessageListenerContainer

    4.1 KafkaMessageListenerContainer VS ConcurrentMessageListenerContainer

    • KafkaMessageListenerContainer

      该消息监听器是在一个线程中从所有的主题中或分区中监听所有的消息

    • ConcurrentMessageListenerContainer

      其本质上就是代理了一个或多个(主要看concurrency的配置)KafkaMessageListenerContainer从而来实现多线程的监听。注:其实对于这里的多线程而言,就可以看成是一个消费者组中有多少个消费者

      下面看一下ConcurrentMessageListenerContainer类:

      public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
          private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
          private int concurrency = 1;
          
          public void setConcurrency(int concurrency) {
            Assert.isTrue(concurrency > 0, "concurrency must be greater than 0");
            this.concurrency = concurrency;
        }
          /*
           * Under lifecycle lock.
           */
          @Override
          protected void doStart() {
              if (!isRunning()) {
                  // 省略
                  for (int i = 0; i < this.concurrency; i++) {
                      KafkaMessageListenerContainer<K, V> container =
                              constructContainer(containerProperties, topicPartitions, i);
                      configureChildContainer(i, container);
                      if (isPaused()) {
                          container.pause();
                      }
                      container.start();
                   // 重要
                      this.containers.add(container);
                  }
              }
          }
      }
      

      可以看到,ConcurrentMessageListenerContainer其实内部维护了一个装有KafkaMessageContainer的集合来实现多线程,而KafkaMessageContainer的个数由配置的concurrency决定。

    4.2 ConcurrentMessageListenerContainer 中 Concurrency

    4.1节中详细说明了ConcurrentMessageListenerContainer KafkaMessageContainer的关系,而对于Concurrency的配置,其决定了消费者组中有多少个线程或者叫消费者。

    4.3 使用ConcurrentMessageListenerContainer注意点

    在实际项目中,我们肯定都是使用ConcurrentMessageListenerCOntainer以减轻消费者组中只有单消费者的压力。那么在使用ConcurrentMessageListenerContainer过程中有什么注意点呢?

    4.3.1 不得不知道的消费者组分区分配原则

    在Kafka中有两种常用的消费者组分区分配原则,分别是RoundRobinAssignorRangeAssignor

    • RoundRobinAssignor

      将消费者组订阅的多个Topic的分区号进行排序,再将消费者组中的消费者进行排序,然后依次分配,如下图:

      RoundRobin方式,比较简单,就是将消费者组订阅的所有Topic的Partition进行排序,并且将消费者组中的消费者进行按照名称Hash排序,然后依次将排好序的Partition进行分发即可。


      1.Round.png
    • RangeAssignor

      Range方式是Kafka默认采取的消费者组分区分配方案。

      该分配方式是以每个Topic为基准的分配方式,将每个Topic中的Partition单独进行排序,然后用当前Topic的总数除以消费者数,如果除不尽,则前面几个消费者会多分担一些。以上述步骤去继续分配完所有的Topic;

      因此这种情况可能会导致消费者组中有空闲的消费者。


      2.Range.png

    4.3.2 Spring for Apache Kafka到底适合选择何种消费者组分区方案

    在了解了4.3.1小结,知道了Kafka较为常用的两种分区分配方案,那么哪一种更适合Spring for Kafka的ConcurrentMessageListenerContainer呢?

    引用Spring for Apache Kafka的原话如下:

    When listening to multiple topics, the default partition distribution may not be what you expect. For example, if you have three topics with five partitions each and you want to use concurrency=15, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle. This is because the default Kafka PartitionAssignor is the RangeAssignor (see its Javadoc). For this scenario, you may want to consider using the RoundRobinAssignor instead, which distributes the partitions across all of the consumers. Then, each consumer is assigned one topic or partition. To change the PartitionAssignor, you can set the partition.assignment.strategy consumer property (ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) in the properties provided to the DefaultKafkaConsumerFactory.
    
    When using Spring Boot, you can assign set the strategy as follows:
    spring.kafka.consumer.properties.partition.assignment.strategy=\
    org.apache.kafka.clients.consumer.RoundRobinAssignor
    

    大概意思就是在ConcurrentMessageListenerContainer中,使用默认的Range分区,可能会造成消费者线程浪费。

    4.3.3 实验

    看了上述的两个小节,发现若一个消费者组同时消费多个Topic时,在Range分区分配模式下,会产生浪费消费者线程的情况,这里就对这句话进行验证。

    前提:consumer-group_1消费者数为10的消费者组,topic-atopic-b分别分区数为5的Topic

    4.3.3.1 实验一

    Range分区分配方式,消费的两个Topic在同一个@KafkaListener注解中

    @KafkaListener(topics = {"topic-a", "topic-b"}, groupId = "consumer-group_1")
    public void topic_range_test(List<ConsumerRecord<?, ?>> record) {
        // xxx
    }
    

    看一下consumer-group_1的详情,主要看一下CONSUMER-ID,会发现,虽然consumer-group_1的消费者数配置为10,但是在实际中,只有5个消费者被使用了,还有5个消费者属于IDLE状态,这也完全符合Range分区分配的原则。

    3.Range_1.png
    4.3.3.2 实验二

    Range分区分配方式,消费的两个Topic写在不同的@KafkaListener注解中

    @KafkaListener(topics = {"topic-a"}, groupId = "consumer-group_1")
    public void topic_range_test_1(List<ConsumerRecord<?, ?>> record) {
        // xxx
    }
    @KafkaListener(topics = {"topic-b"}, groupId = "consumer-group_1")
    public void topic_range_test_2(List<ConsumerRecord<?, ?>> record) {
        // xxx
    }
    

    看一下consumer-group_1的详情,主要看一下CONSUMER-ID,会发现,当Topic写在不同的@KafkaListener中时,不会出现一个消费者同时消费多个Partition的情况。但这样的情况下,会浪费了10个消费者线程,因为第一个方法中10个线程只用了5个,因此浪费了5个消费者线程,同样第二个方法也是,这样看来一共浪费了10个消费者线程。

    4.Range_2.png
    4.3.3.3 实验三

    使用RoundRobin方式,再去测试实验一代码

    看一下consumer-group_1的详情,发现没有出现同一个消费者消费多个Partition*

    5.Round.png
    4.3.3.4 结论
    • ConcurrentMessageListenerContainer中,应该使用RoundRobin,从而避免了Range会造成消费者线程的浪费

    • 在不同的@KafkaListener都会创建不同的ConcurrentMessageListenerContainer对象,而不同的ConcurrentMessageListenerContainer都会有不同的KafkaMessageListenerContainer,而不同的KafkaMessageListenerContainer都会有不同的ConcurrencyThread。因此假设有N个@KafkaListener,那么就有N * Concurrency个相互隔离的Thread

    五、参考

    https://docs.spring.io/spring-kafka/docs/current/reference/html/

    相关文章

      网友评论

          本文标题:Spring for Apache Kafka的理解

          本文链接:https://www.haomeiwen.com/subject/zsorbltx.html