美文网首页
Consumer消费原理(近万字剖析)

Consumer消费原理(近万字剖析)

作者: CoderInsight | 来源:发表于2023-02-27 08:58 被阅读0次

    (1). offset

    (1.1) offset介绍

    • kafka当中的offset:消息的偏移量是有两种含义的:
      • broker当中最新的消息的offset的值(对于这个值的保存是由Kafka集群自己去维护的,不用我们管。)
      • 消费者消费到了哪一条数据的offset的值
    • Offset的保存位置和方式
      • 老版本 Consumer 的位移管理是依托于 Apache ZooKeeper 的,它会自动或手动地将位移数据提交到 ZooKeeper 中保存。当 Consumer 重启后,它能自动从 ZooKeeper 中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得 Kafka Broker 不需要保存位移数据,减少了 Broker 端需要持有的状态空间,因而有利于实现高伸缩性。
      • 新版本 Consumer 的位移管理机制其实也很简单,就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。显然,Kafka 的主题设计天然就满足这两个条件。
    ps: __consumer_offsets的消息格式却是 Kafka 自己定义的,用户不能修改,也就是说你不能随意地向这个主题写消息,因为一旦你写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。事实上,Kafka Consumer 有 API 帮你提交位移,也就是向位移主题写消息。
    

    (1,2), Offset管理

        每个consumer内存里数据结构保存对每个topic的每个分区的消费offset,定期会提交offset,老版本是写入zk,但是那样高并发请求zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储。所以后来就是提交offset发送给内部topic:__consumer_offsets,提交过去的时候,key是group.id+topic+分区号,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact。也就是每个group.id+topic+分区号就保留最新的那条数据即可。而且因为这个 __consumer_offsets可能会接收高并发的请求,所以默认分区50个,这样如果你的kafka部署了一个大的集群,比如有50台机器,就可以用50台机器来抗offset提交的请求压力,就好很多。
    

    (1.3) __consumer_offsets 的消息格式(K-V)

    这个topic下的消息格式为k-v结构,k-v分别代表消息的键值和消息体。
    k-(Group ID,主题名,分区号)
    v-(位移值,ts,metadata)
    

    __consumer_offset何时被创建?当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题.
    如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3

    (1.4) 提交 offset

    Consumer 端有个位移的概念,它和消息在分区中的位移不是一回事儿,虽然它们的英文都是 Offset。Consumer 的消费位移,它记录了 Consumer 要消费的下一条消息的位移。切记是下一条消息的位移,而不是目前最新消费消息的位移。

    假设一个分区中有 10 条消息,位移分别是 0 到 9。某个 Consumer 应用已消费了 5 条消息,这就说明该 Consumer 消费了位移为 0 到 4 的 5 条消息,此时 Consumer 的位移是 5,指向了下一条消息的位移。
    

    Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即Consumer 需要为分配给它的每个分区提交各自的位移数据

    PS:提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。
    

    位移提交的语义保障是由你来负责的,Kafka 只会“无脑”地接受你提交的位移

    从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交

    所谓自动提交,就是指 Kafka Consumer 在后台默默地为你提交位移,作为用户的你完全不必操心这些事;而手动提交,则是指你要自己提交位移,Kafka Consumer 压根不管.
    
    consumerOffset.jpeg
    1.4.1) 自动提交
    • 开启自动提交位移的方法很简单:

      • Consumer 端有个参数 enable.auto.commit,把它设置为 true 或者压根不设置它就可以了。因为它的默认值就是 true,即 Java Consumer 默认就是自动提交位移的。
      • 如果启用了自动提交,Consumer 端还有个参数就派上用场了:auto.commit.interval.ms。它的默认值是 5 秒,表明 Kafka 每 5 秒会为你自动提交一次位移。
    • 自动提交的优缺点:

      • 自动提交位移有一个显著的优点,就是省事,你不用操心位移提交的事情,就能保证消息消费不会丢失。
      • 但这一点同时也是缺点。因为它太省事了,以至于丧失了很大的灵活性和可控性,你完全没法把控 Consumer 端的位移管理,同时也会导致数据丢失。
    Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("group.id", "test");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "2000");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList("foo", "bar"));
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records)
                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
         }
    
    1.4.2) 手动提交

    和自动提交相反的,就是手动提交了。事实上,很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。这就引出了另一种位移提交方式:手动提交位移; 开启手动提交位移的方法就是设置 enable.auto.commit 为 false。但是,仅仅设置它为 false 还不够,因为只是告诉 Kafka Consumer 不要自动提交位移而已,你还需要调用相应的 API 手动提交位移。

    最简单的 API 就是KafkaConsumer.commitSync()。该方法会提交 KafkaConsumer.poll() 返回的最新位移。从名字上来看,它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。

    • 下面这段代码展示了 commitSync() 的使用方法:
     while (true) {
                // 每隔1000ms拉取数据
                ConsumerRecords<String, String> records = consumer.poll(1000);
              //  process(records); // 处理消息
                try {
                    // 处理完成之后,同步提交offset的值
                    consumer.commitSync();
                } catch (CommitFailedException e) {
                  //  handle(e); // 处理提交失败异常
                }
     }
    
    1.4.3) 自动提交与手动提交的区别
    • 一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。
    • 在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。
    • 反观手动提交位移,它的好处就在于更加灵活,你完全能够把控位移提交的时机和频率。但是,它也有一个缺陷,就是在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。当然,你可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。
    • 鉴于这个问题,Kafka 社区为手动提交位移提供了另一个 API 方法:KafkaConsumer.commitAsync()。从名字上来看它就不是同步的,而是一个异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。
    • 下面这段代码展示了调用 commitAsync() 的方法:
    while (true) {
           ConsumerRecords<String, String> records = consumer.poll(5);
           //   process(records); // 处理消息
           consumer.commitAsync((offsets, exception) -> {
               if (exception != null){
                  //  handle(exception);
               }
            });
      }
    
    1.4.4) 同步提交与异步提交的选择

    CommitAsync 是否能够替代 CommitSync 呢?答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

    显然,如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果,原因有两个:

    1. 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
    2. 我们不希望程序总处于阻塞状态,影响 TPS (Transactions Per Second -- 每秒传输的事物处理个数,即服务器每秒处理的事务数)。
    • 来看一下下面这段代码,它展示的是如何将两个 API 方法结合使用进行手动提交。
    // 使用同步提交结合异步提交的方式没,实现了offset一定要提交,也提高了消费的速度
    // 注意多次提交 offset不会产生问题 ==> offset是保存到一个默认的topic中了  __consumer_offset
    try {
                //这里是先写了一个死循环,只有当消息提交的过程中出现异常才会跳出,然后最后无论如何都会执行finnaly中的方法
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(5);
                  //  process(records); // 处理消息
                    consumer.commitAsync();// 使用异步提交规避阻塞
                }
            } catch (Exception e) {
               // handle(e); // 处理异常
            } finally {
                try {
                    consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
                } finally {
                    consumer.close();
                }
            }
    

    这段代码同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性,所以,如果你需要自行编写代码开发一套 Kafka Consumer 应用,那么我推荐你使用上面的代码范例来实现手动的位移提交。

    1.4.5) 更精细的提交offset

    Kafka Consumer API 为手动提交提供了这样的方法:commitSync(Map<TopicPartition, OffsetAndMetadata>) 和 commitAsync(Map<TopicPartition, OffsetAndMetadata>)。它们的参数是一个 Map 对象,键就是 TopicPartition,即消费的分区,而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据。

    比如,如何每处理 100 条消息就提交一次位移呢?在这里,我以 commitAsync 为例,展示一段代码,实际上,commitSync 的调用方法和它是一模一样的。

        // 提前定义一个hashmap
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    
            int count = 0;
            while (true) {
                // poll(1000)中指定的值是设置拉取数据的超时时间,如果隔1秒拉取不到数据认为超时,进行下一轮的数据的拉取
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record: records) {
                //  process(record);  // 处理消息
                // 确定哪一个主题、哪一个分区、从下一个offset开始消费数据
                offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
                    if(count % 100 == 0)
                    // 每个分区的offset的值,保存到一个map集合里面,通过提交map的集合中的数据来是实现每个分区的数据的保存
                    consumer.commitAsync(offsets, null); // 回调处理逻辑是 null
                    count++;
                }
            }
    
    1.4.6) 删除过期消息

    kafka当中的消息默认是保存168小时(7天),当然也可以进行调整

    Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。
    对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。
    
    删除过期消息.jpeg
    Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。
    很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。
    

    (2), Coordinator 协调器

    • Coordinator的作用

        每个consumer group都会选择一个broker作为自己的coordinator,他是负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance.
        根据内部的一个选择机制,会挑选一个对应的Broker,Kafka总会把你的各个消费组均匀分配给各个Broker作为coordinator来进行管理的.
        consumer group中的每个consumer刚刚启动就会跟选举出来的这个consumer group对应的coordinator所在的broker进行通信,然后由coordinator分配分区给你的这个consumer来进行消费。coordinator会尽可能均匀的分配分区给各个consumer来消费。
      
    • 如何选择哪台是coordinator??

        首先对消费组的groupId进行hash,接着对consumer_offsets的分区数量取模,默认是50,可以通过offsets.topic.num.partitions来设置,找到你的这个consumer group的offset要提交到consumer_offsets的哪个分区。
        比如说:groupId,"membership-consumer-group" -> hash值(数字)-> 对50取模 -> 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,找到consumer_offsets的一个分区,consumer_offset的分区的副本数量默认来说1,只有一个leader,然后对这个分区找到对应的leader所在的broker,这个broker就是这个consumer group的coordinator了,consumer接着就会维护一个Socket连接跟这个Broker进行通信。
      
    GroupCoordinator原理剖析.png

    (3),消费者组 consumer Group

    Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制

    消费者组内必然可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。

    1、Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。
    2、Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
    3、Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。
    

    Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型(P2P/PubSub):如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的 Group,那么它实现的就是发布 / 订阅模型。

    在实际使用场景中,怎么知道一个 Group 下该有多少个 Consumer 实例呢?理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。

    举个简单的例子,假设一个 Consumer Group 订阅了 3 个主题,分别是 A、B、C,它们的分区数依次是 1、2、3,那么通常情况下,为该 Group 设置 6 个 Consumer 实例是比较理想的情形,因为它能最大限度地实现高伸缩性。
    

    针对 Consumer Group,Kafka 是怎么管理位移的呢?

    老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销。现在比较流行的提法是将服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性。Kafka 最开始也是基于这样的考虑,才将 Consumer Group 位移保存在独立于 Kafka 集群之外的框架中。
    
    不过,慢慢地人们发现了一个问题,即 ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 位移保存在 ZooKeeper 中是不合适的做法。
    
    在新版本的 Consumer Group 中,Kafka 社区重新设计了 Consumer Group 的位移管理方式,采用了将位移保存在 Kafka 内部主题的方法。即:__consumer_offsets。
    

    (4), consumer Group Rebalance

    1) rebalance介绍

    Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。

    那么 Consumer Group 何时进行 Rebalance 呢?Rebalance 的触发条件有 3 个。
    
    1、组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
    
    2、订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
    
    3、订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
    

    举个简单的例子来说明一下 Consumer Group 发生 Rebalance 的过程。假设目前某个 Consumer Group 下有两个 Consumer,比如 A 和 B,当第三个成员 C 加入时,Kafka 会触发 Rebalance,并根据默认的分配策略重新为 A、B 和 C 分配分区,如下图所示:

    consumer-rebalance.png

    Rebalance 之后的分配依然是公平的,即每个 Consumer 实例都获得了 3 个分区的消费权。这是我们希望出现的情形。

    2) rebalance的问题

    1、Rebalance 过程对 Consumer Group 消费过程有极大的影响。如果你了解 JVM 的垃圾回收机制,你一定听过万物静止的收集方式,即著名的 stop the world,简称 STW。在 STW 期间,所有应用线程都会停止工作,表现为整个应用程序僵在那边一动不动。Rebalance 过程也和这个类似,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。

    2、目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。

    3、Rebalance 实在是太慢了。曾经,有个国外用户的 Group 内有几百个 Consumer 实例,成功 Rebalance 一次要几个小时!这完全是不能忍受的。最悲剧的是,目前社区对此无能为力,至少现在还没有特别好的解决方案。所谓“本事大不如不摊上”,也许最好的解决方案就是避免 Rebalance 的发生吧。

    (5). consumer Group Rebalance 高级

    1). Consumer Group Coordinator

    Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配。但是,在整个过程中,所有实例都不能消费任何消息,因此它对 Consumer 的 TPS 影响很大。

    所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。
    
    Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
    
    所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。
    

    2). kafka 如何为 Consumer Group确定Coordinator

    1、确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
    
    2、找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。
    
    ps:首先,Kafka 会计算该 Group 的 group.id 参数的哈希值。比如你有个 Group 的 group.id 设置成了“test-group”,那么它的 hashCode 值就应该是 627841412。其次,Kafka 会计算 __consumer_offsets 的分区数,通常是 50 个分区,之后将刚才那个哈希值对分区数进行取模加求绝对值计算,即 abs(627841412 % 50) = 12。此时,我们就知道了位移主题的分区 12 负责保存这个 Group 的数据。有了分区号,算法的第 2 步就变得很简单了,我们只需要找出位移主题分区 12 的 Leader 副本在哪个 Broker 上就可以了。这个 Broker,就是我们要找的 Coordinator。
    

    3). rebalance的弊端

    1、Rebalance 影响 Consumer 端 TPS。这个之前也反复提到了,这里就不再具体讲了。总之就是,在 Rebalance 期间,Consumer 会停下手头的事情,什么也干不了。
    
    2、Rebalance 很慢。如果你的 Group 下成员很多,就一定会有这样的痛点。
    
    3、Rebalance 效率不高。当前 Kafka 的设计机制决定了每次 Rebalance 时,Group 下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的。
    

    那么针对这些问题,kafka是否可以解决?---- 无解。

    所以需要尽量的避免rebalance。

    发生rebalance的情况如下:
    1、组成员数量发生变化
    2、订阅主题数量发生变化
    3、订阅主题的分区数发生变化
    
    后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。那么如何避免组成员数量发生变化?
    

    4). 如何避免组成员数量发生变化

    如果 Consumer Group 下的 Consumer 实例数量发生变化,就一定会引发 Rebalance。这是 Rebalance 发生的最常见的原因。

    4.1) 增加实例

    通常来说,增加 Consumer 实例的操作都是计划内的,可能是出于增加 TPS 或提高伸缩性的需要。总之,它不属于我们要规避的那类“不必要 Rebalance”。

    4.2) 减少实例

    如果就是要停掉某些 Consumer 实例,那自不必说,关键是在某些情况下,Consumer 实例会被 Coordinator 错误地认为“已停止”从而被“踢出”Group。如果是这个原因导致的 Rebalance,就需要处理了。

    Coordinator 会在什么情况下认为某个 Consumer 实例已挂从而要退组呢?
    
    当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。Consumer 端有个参数,叫 session.timeout.ms,就是被用来表征此事的。该参数的默认值是 10 秒,即如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。可以这么说,session.timout.ms 决定了 Consumer 存活性的时间间隔。
    
    除了这个参数,Consumer 还提供了一个允许你控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。
    
    除了以上两个参数,Consumer 端还有一个参数,用于控制Consumer实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。
    
    

    5). 解决非必要的rebalance

    第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的

    需要仔细地设置session.timeout.ms 和 heartbeat.interval.ms的值。
    
    设置 session.timeout.ms = 6s。
    设置 heartbeat.interval.ms = 2s。
    要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
    
    将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer。毕竟,我们还是希望能尽快揪出那些“尸位素餐”的 Consumer,早日把它们踢出 Group。
    

    第二类非必要 Rebalance 是 Consumer 消费时间过长导致的

    合理的设置max.poll.interval.ms,时间尽量设置为比下游处理完成时间略长一点
    

    如果上述两种配置完成后,还会进行rebalance,那么就需要去设置GC了。

    相关文章

      网友评论

          本文标题:Consumer消费原理(近万字剖析)

          本文链接:https://www.haomeiwen.com/subject/sfkqkdtx.html