美文网首页Kafka相关
kafka consumer rebalance

kafka consumer rebalance

作者: tracy_668 | 来源:发表于2021-08-02 23:12 被阅读0次

    [TOC]

    写在前面

    前阵子遇到的rebalance的问题比较多,发现自己对rebalance的理解也仅仅是浮于表面,也就是网上和书上讲的宏观层面上的什么FIND_COORDINATOR -> JOIN -> SYNC -> STABLE这一套,但是服务端究竟是如何运转的呢?rebalance为什么会突然劣化导致停顿几分钟呢?依旧不甚了解,网上能查到的信息也都寥寥无几基本上也都是宏观上的几个阶段流转,所以决定自己重新去看一下这部分的源码,并记录下来。

    rebalance本身是一个比较复杂的过程,各位要是对源码分析看不进去,我会在源码分析之前给出一个过程概述,可能会更好理解一点。源码这个东西,在博客上看,真是看来就忘,还是要自己打开IDE导入源码自己分析才能加深印象,当自己无法理解的时候,可以到博客里面搜一下相关的注释,看下别人的思路和理解,这是我个人认为看源码比较友好的方式。

    二、什么是rebalance?

    中文直译,就是重平衡。
    是什么去重平衡呢?消费组内的消费者成员去重平衡。

    为什么需要重平衡呢?因为消费组内成员的故障转移和动态分区分配。

    消费组内成员的故障转移:当一个消费组内有三个消费者A,B,C,分别消费分区:a,b,c

    A -> a
    B -> b
    C -> c
    

    此时如果A消费者出了点问题,那么就意味着a分区没有消费者进行消费了,那这肯定不行,那么就通过rebalance去将a分区分配给其他还存活着的消费者客户端,rebalance后可能得到的消费策略:

    A -> a (GG)
    B -> b,a
    C -> c
    

    这就是消费组内成员的故障转移,就是某个消费者客户端出问题之后把它原本消费的分区通过REBALNACE分配给其他存活的消费者客户端。

    动态分区分配:当某个topic的分区数变化,对于消费组而言可消费的分区数变化了,因此就需要rebalance去重新进行动态分区分配,举个栗子,原本某topic只有3个分区,我现在扩成了10个分区,那么不就意味着多了7个分区没有消费者消费吗?这显然是不行的,因此就需要rebalance过程去进行分区分配,让现有的消费者去把这10个分区全部消费到。

    三、rebalance是怎么触发的?

    这个其实在上面一小节已经提到的差不多了,在这个小节再做一点补充和总结。
    触发条件:

    • 消费组内成员变化:下线/上线/故障被踢出。
    • 消费的分区数变化:topic被删了,topic分区数增加了。
    • coordinator节点出问题了:因为消费组的元数据信息都是在coordinator节点的,因此coordinator节点出问题也会触发rebalance去找一个新的coordinator节点。怎么找呢?显然就是走一遍FIND_COORDINATOR请求嘛,然后找到负载最低的那个节点问一下,我的新的coordinator在哪儿呀?然后得到答案之后让消费者客户端去连新的coordinator节点。

    四、rebalance的宏观过程

    整个rebalance的过程,是一个状态机流转的过程,整体过程示意图如下:图源:https://www.cnblogs.com/huxi2b/p/6815797.html

    image.png

    其实上面这个状态机流转过程在明白原理的情况下,已经非常清晰了,但是如果没看过源码的,依旧不知道为什么是这么流转的,什么情况下状态是Empty呢,什么状态下是Stable呢?什么时候Empty状态会转换为PreparingRebalance状态呢?

    下面我就根据请求顺序来看下整个状态的流转过程: image.png

    需要说明的一点是,上面请求的状态CompletingRebalance其实就对应上面的AwaitingSync状态。

    让我们根据这个请求顺序图来解释一下各个状态是如何流转的:
    Empty(Empty):当一个Group是新创建的,或者内部没有成员时,状态就是Empty。我们假设有一个新的消费组,这个消费组的第一个成员发送FIND_COORDINATOR请求的时候,也就是开启了Rebalacne的第一个阶段。

    PreparingRebalance(JOIN):当完成FIND_COORDINATOR请求后,对应的客户端就能找到自己的coordinator节点是哪个,然后紧接着就会发送JOIN_GROUP请求,当coordinator收到这个请求后,就会把状态由Empty变更为PreparingRebalance,意味着准备要开始rebalance了

    CompletingRebalance(SYNC):当所有的成员都完成JOIN_GROUP请求的发送之后,或者rebalance过程超时后,对应的PreparingRebalance阶段就会结束,进而进入CompletingRebalance状态。

    Stabe(Stable):在进入CompletingRebalance状态的时候呢,服务端会返回所有JOIN_GROUP请求对应的响应,然后客户端收到响应之后立刻就发送SYNC_GROUP请求,服务端在收到leader发送的SNYC_GROUP请求后,就会转换为Stable状态,意味着整个rebalance过程已经结束了。

    上面整个过程,就是我们经常能在一些博客里面看到,其实里面有很多细节,例如这些请求都带有哪些关键数据,到底是哪个阶段导致rebalance过程会劣化到几分钟?为什么要分为这么多阶段?

    让我们带着问题继续往下把,这些状态机流转的名字太长了,后面我会用上文中括号内的简写代表对应的阶段。

    五、rebalance的微观过程概览

    image.png

    让我们来回答上个小节后面提出的几个比较细节的问题:

    这些请求都带有哪些关键数据?
    在FIND_COORDINATOR请求的时候,会带上自己的group.id值,这个值是用来计算它的coordinator到底在哪儿的,对应的计算方法就是:coordinatorId=groupId.hash % 50 这个算出来是个数字,代表着具体的分区,哪个topic的分区呢?显然是__consumer_offsets了。

    在JOIN_GROUP请求的时候,是没带什么关键参数的,但是在响应的时候会挑选一个客户端作为leader,然后在响应中告诉它被选为了leader并且把消费组元数据信息发给它,然后让该客户端去进行分区分配。
    在SYNC_GROUP请求的时候,leader就会带上它根据具体的策略已经分配好的分区分配方案,服务端收到后就更新到元数据里面去,然后其余的consumer客户端只要一发送SYNC请求过来就告诉它要消费哪些分区,然后让它自己去消费就ok了。

    到底是哪个阶段导致rebalance过程会劣化到几分钟?
    我图中特意将JOIN阶段标位红色,就是让这个阶段显得显眼一些,没错就是这个阶段会导致rebalance整个过程耗时劣化到几分钟。

    具体的原因就是JOIN阶段会等待原先组内存活的成员发送JOIN_GROUP请求过来,如果原先组内的成员因为业务处理一直没有发送JOIN_GROUP请求过来,服务端就会一直等待,直到超时。这个超时时间就是max.poll.interval.ms的值,默认是5分钟,因此这种情况下rebalance的耗时就会劣化到5分钟,导致所有消费者都无法进行正常消费,影响非常大。

    为什么要分为这么多阶段?
    这个主要是设计上的考虑,整个过程设计的还是非常优雅的,第一次连上的情况下需要三次请求,正常运行的consumer去进行rebalance只需要两次请求,因为它原先就知道自己的coordinator在哪儿,因此就不需要FIND_COORDINATOR请求了,除非是它的coordinator宕机了

    回答完这些问题,是不是对整个rebalance过程理解加深一些了呢?其实还有很多细节没有涉及到,例如consumer客户端什么时候会进入rebalance状态?服务端是如何等待原先消费组内的成员发送JOIN_GROUP请求的呢?这些问题就只能一步步看源码了。

    六、JOIN阶段源码分析

    从这段函数我们知道,如果加入一个新的消费组,服务端收到第一个JOIN请求的时候会创建group,这个group的初始状态为Empty

    // 如果group都还不存在,就有了memberId,则认为是非法请求,直接拒绝。
          groupManager.getGroup(groupId) match {
            case None =>
              // 这里group都还不存在的情况下,memberId自然是空的
              if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
                responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
              } else {
                // 初始状态是EMPTY
                val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
                // 执行具体的加组操作
                doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
              }
    
            case Some(group) =>
              doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
          }
    
    

    让我们进入doJoinGroup函数,看下里面的核心逻辑:

        case Empty | Stable =>
                // 初始状态是EMPTY,添加member并且执行rebalance
                if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
                  // if the member id is unknown, register the member to the group
                  addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
                } else {
                // ...
                  } else {
                  //...
                  }
    
    
    
      private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                        sessionTimeoutMs: Int,
                                        clientId: String,
                                        clientHost: String,
                                        protocolType: String,
                                        protocols: List[(String, Array[Byte])],
                                        group: GroupMetadata,
                                        callback: JoinCallback) = {
        // 根据clientID初始化memberID
        val memberId = clientId + "-" + group.generateMemberIdSuffix
        // 封装一个member对象
        val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
          sessionTimeoutMs, protocolType, protocols)
        member.awaitingJoinCallback = callback
        // update the newMemberAdded flag to indicate that the join group can be further delayed
        if (group.is(PreparingRebalance) && group.generationId == 0)
          group.newMemberAdded = true
        // 增加成员到group中
        group.add(member)
        maybePrepareRebalance(group)
        member
      }
    
    
    
      def add(member: MemberMetadata) {
        if (members.isEmpty)
          this.protocolType = Some(member.protocolType)
    
        assert(groupId == member.groupId)
        assert(this.protocolType.orNull == member.protocolType)
        assert(supportsProtocols(member.protocols))
        // coordinator选举leader很简单,就第一个发送join_group请求的那个member
        if (leaderId.isEmpty)
          leaderId = Some(member.memberId)
        members.put(member.memberId, member)
      }
    

    上面的代码翻译一下很简单,就是新来了一个member,封装一下,添加到这个group中,需要说一下的就是当组状态是Empty的情况下,谁先连上谁就是leader。紧接着就准备rebalance:

     private def maybePrepareRebalance(group: GroupMetadata) {
        group.inLock {
          if (group.canRebalance)
            prepareRebalance(group)
        }
      }
    
    
    
      // 这里是传入PreparingRebalance状态,然后获取到一个SET
      // 翻译一下:就是只有这个SET(Stable, CompletingRebalance, Empty)里面的状态,才能开启rebalance
      def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)
    
      private val validPreviousStates: Map[GroupState, Set[GroupState]] =
        Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead),
          CompletingRebalance -> Set(PreparingRebalance),
          Stable -> Set(CompletingRebalance),
          PreparingRebalance -> Set(Stable, CompletingRebalance, Empty),
          Empty -> Set(PreparingRebalance))
    
    
    
      private def prepareRebalance(group: GroupMetadata) {
        // if any members are awaiting sync, cancel their request and have them rejoin
        if (group.is(CompletingRebalance))
          resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
    
        val delayedRebalance = if (group.is(Empty))
          new InitialDelayedJoin(this,
            joinPurgatory,
            group,
            groupConfig.groupInitialRebalanceDelayMs,// 默认3000ms,即3s
            groupConfig.groupInitialRebalanceDelayMs,
            max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
        else
          new DelayedJoin(this, group, group.rebalanceTimeoutMs)// 这里这个超时时间是客户端的poll间隔,默认5分钟
        // 状态机转换:EMPTY -> PreparingRebalance
        group.transitionTo(PreparingRebalance)
        // rebalance开始标志日志
        info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " +
          s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
        // 加入时间轮
        val groupKey = GroupKey(group.groupId)
        joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
      }
    
    

    上面这段代码有两个关键点,一个是判断当前能否进入rebalance过程,可以看到只有(Stable, CompletingRebalance, Empty)里面的状态,才能开启rebalance,而最开始来到第一个member的时候,组的状态是Empty显然是能进来的,但是近来之后就给转换为了PreparingRebalance状态,那么后续的member发送JOIN请求过来之后就进不来了,就只能设置个回调后一直等。

    那么要等到什么时候呢?第二段代码写的很清楚就是等待延时任务超时,这个延时任务创建是根据当前状态来判断的,如果是Empty就创建一个InitialDelayedJoin延时任务,超时时间是3s;如果不是Empty就创建一个DelayedJoin,超时时间默认是5min。看,源码出真知,这就是JOIN阶段等待member的代码实现。

    这里需要补充一下,为什么Empty的状态下要等待3s呢?这其实是一个优化,主要就是优化多消费者同时连入的情况。举个栗子,10个消费者都能在3s内启动然后练上,如果你等着3s时间那么一次rebalance过程就搞定了,如果你不等,那么就意味着来一个就又要开启一次rebalance,一共要进行10次rebalance,这个耗时就比较长了。具体的细节可以查看:https://www.cnblogs.com/huxi2b/p/6815797.html

    另外就是,为什么状态不是Empty的时候就延时5分钟呢?这个其实上面就回答了,要等待原来消费组内在线的消费者发送JOIN请求,这个也是rebalance过程耗时劣化的主要原因。

    接下来我们看看这两个延时任务,在超时的时候分别都会做些啥,首先是InitialDelayedJoin:

    /**
      * Delayed rebalance operation that is added to the purgatory when a group is transitioning from
      * Empty to PreparingRebalance
      *
      * When onComplete is triggered we check if any new members have been added and if there is still time remaining
      * before the rebalance timeout. If both are true we then schedule a further delay. Otherwise we complete the
      * rebalance.
      */
    private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
                                            purgatory: DelayedOperationPurgatory[DelayedJoin],
                                            group: GroupMetadata,
                                            configuredRebalanceDelay: Int,
                                            delayMs: Int,
                                            remainingMs: Int) extends DelayedJoin(coordinator, group, delayMs) {
    
      // 这里写死是false,是为了在tryComplete的时候不被完成
      override def tryComplete(): Boolean = false
    
      override def onComplete(): Unit = {
        // 延时任务处理
        group.inLock  {
          // newMemberAdded是后面有新的member加进来就会是true
          // remainingMs第一次创建该延时任务的时候就是3s。
          // 所以这个条件在第一次的时候都是成立的
          if (group.newMemberAdded && remainingMs != 0) {
            group.newMemberAdded = false
            val delay = min(configuredRebalanceDelay, remainingMs)
            // 最新计算的remaining恒等于0,其实本质上就是3-3=0,
            // 所以哪怕这里是新创建了一个InitialDelayedJoin,这个任务的超时时间就是下一刻
            // 这么写的目的其实就是相当于去完成这个延时任务
            val remaining = max(remainingMs - delayMs, 0)
            purgatory.tryCompleteElseWatch(new InitialDelayedJoin(coordinator,
              purgatory,
              group,
              configuredRebalanceDelay,
              delay,
              remaining
            ), Seq(GroupKey(group.groupId)))
          } else
            // 如果没有新的member加入,直接调用父类的函数
            // 完成JOIN阶段
            super.onComplete()
        }
      }
    }
    
    

    大意我都写在注释里面了,其实就是等待3s,然后完了之后调用父类的函数完成整个JOIN阶段,不过不联系上下文去看,还是挺费劲的,对了看这个需要对时间轮源码有了解

    接着看下DelayedJoin超时后会干嘛:

    /**
     * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
     *
     * Whenever a join-group request is received, check if all known group members have requested
     * to re-join the group; if yes, complete this operation to proceed rebalance.
     *
     * When the operation has expired, any known members that have not requested to re-join
     * the group are marked as failed, and complete this operation to proceed rebalance with
     * the rest of the group.
     */
    private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                     group: GroupMetadata,
                                     rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
    
      override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
      override def onExpiration() = coordinator.onExpireJoin()
      override def onComplete() = coordinator.onCompleteJoin(group)
    }
    
      // 超时之后啥也没干,哈哈,因为确实不用做啥,置空就好了
      // 核心是onComplete函数和tryComplete函数
      def onExpireJoin() {
        // TODO: add metrics for restabilize timeouts
      }
    
    
    
      def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
        group.inLock {
          if (group.notYetRejoinedMembers.isEmpty)
            forceComplete()
          else false
        }
      }
      def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
      
      def forceComplete(): Boolean = {
        if (completed.compareAndSet(false, true)) {
          // cancel the timeout timer
          cancel()
          onComplete()
          true
        } else {
          false
        }
      }
    
    
    
      def onCompleteJoin(group: GroupMetadata) {
        group.inLock {
          // remove any members who haven‘t joined the group yet
          // 如果组内成员依旧没能连上,那么就删除它,接收当前JOIN阶段
          group.notYetRejoinedMembers.foreach { failedMember =>
            group.remove(failedMember.memberId)
            // TODO: cut the socket connection to the client
          }
    
          if (!group.is(Dead)) {
            // 状态机流转 : preparingRebalancing -> CompletingRebalance
            group.initNextGeneration()
            if (group.is(Empty)) {
              info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
                s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
    
              groupManager.storeGroup(group, Map.empty, error => {
                if (error != Errors.NONE) {
                  // we failed to write the empty group metadata. If the broker fails before another rebalance,
                  // the previous generation written to the log will become active again (and most likely timeout).
                  // This should be safe since there are no active members in an empty generation, so we just warn.
                  warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
                }
              })
            } else {
              // JOIN阶段标志结束日志
              info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
                s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
    
              // trigger the awaiting join group response callback for all the members after rebalancing
              for (member <- group.allMemberMetadata) {
                assert(member.awaitingJoinCallback != null)
                val joinResult = JoinGroupResult(
                  // 如果是leader 就返回member列表及其元数据信息
                  members = if (group.isLeader(member.memberId)) {
                    group.currentMemberMetadata
                  } else {
                    Map.empty
                  },
                  memberId = member.memberId,
                  generationId = group.generationId,
                  subProtocol = group.protocolOrNull,
                  leaderId = group.leaderOrNull,
                  error = Errors.NONE)
    
                member.awaitingJoinCallback(joinResult)
                member.awaitingJoinCallback = null
                completeAndScheduleNextHeartbeatExpiration(group, member)
              }
            }
          }
        }
      }
    
    

    上面这一串代码有几个要点,首先,这个任务超时的时候是啥也不干的,为什么呢?这里要了解时间轮的机制,代码也在上面,当一个任务超时的时候,时间轮强制执行对应任务的onComplete函数,然后执行onExpiration函数,其实onExpiration函数对于这个延时任务来说是没有意义的,并不需要做什么,打日志都懒得打。
    第二点就是这个任务onComplete什么时候会被调用呢?难道就只能等待5分钟超时才能被调用吗?那不是每一次rebalance都必须要等待5分钟?当然不可能啦,这里就需要先看下tryComplete函数的内容,发现这个内容会去检查还没连上的member,如果发现到期了,就强制完成。那么我们看下这tryComplete是在哪儿被调用的?这里需要插入一点之前没贴全的代码,在doJoinGroup函数中的而最后一段:

    if (group.is(PreparingRebalance))
          joinPurgatory.checkAndComplete(GroupKey(group.groupId))
    
    

    这段代码非常关键,当当前状态是PreparingRebalance的时候,会尝试去完成当前的延时任务,最终调用的代码:

     private[server] def maybeTryComplete(): Boolean = {
        var retry = false
        var done = false
        do {
          if (lock.tryLock()) {
            try {
              tryCompletePending.set(false)
              done = tryComplete()
            } finally {
              lock.unlock()
            }
            // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
            // `tryCompletePending`. In this case we should retry.
            retry = tryCompletePending.get()
          } else {
            // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
            // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
            // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
            // released the lock and returned by the time the flag is set.
            retry = !tryCompletePending.getAndSet(true)
          }
        } while (!isCompleted && retry)
        done
      }
    

    就是上面的tryComplete函数,最终会调用到DelayedJoin中的tryComplete函数,什么意思呢?已经很明显了,每来一个JOIN请求的时候,如果处于PreparingRebalance阶段,都会去检查一下group中原来的成员是否已经到齐了,到齐了就立刻结束JOIN阶段往后走。看到这儿,回头看下InitialDelayedJoin这个延时任务的tryComplete为什么就默认实现了个false呢?也明白了,就是初始化延时任务的时候不让你尝试完成,我就等3s,不需要你们来触发我提前完成。

    以上,我们就看完了整个服务端的JOIN请求处理过程,其实主要核心就是这两个延时任务,如果不联系上下文,不了解时间轮机制,看起来确实费劲。接下来就看下SYNC阶段是如何处理的。

    两个角色

    Consumer Group Co-ordinator
    Group Leader

    [图片上传失败...(image-558ab0-1627916151967)]
    rebalance过程有以下几点

    1. rebalance本质上是一组协议。group与coordinator共同使用它来完成group的rebalance。
    2. consumer如何向coordinator证明自己还活着? 通过定时向coordinator发送Heartbeat请求。如果超过了设定的超时时间,那么coordinator就认为这个consumer已经挂了。
    3. 一旦coordinator认为某个consumer挂了,那么它就会开启新一轮rebalance,并且在当前其他consumer的心跳response中添加“REBALANCE_IN_PROGRESS”,告诉其他consumer:不好意思各位,你们重新申请加入组吧!
    4. 所有成员都向coordinator发送JoinGroup请求,请求入组。一旦所有成员都发送了JoinGroup请求,coordinator选择第一个发送JoinGroup请求的consumer担任leader的角色,并将consumer group 信息和partition信息告诉group leader。
    5. leader负责分配消费方案(使用PartitionAssignor),即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。

    小结一下就是:coordinator负责决定leader,leader 负责分配方案,consumer group的分区分配方案是在客户端执行的, 分配方案由coordinator 扩散。

    Rebalance过程

    rebalance的前提是coordinator已经确定了。
    总体而言,rebalance分为2步:Join和Sync
    1 Join, 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求入组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。

    2 Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

    注意!! consumer group的分区分配方案是在客户端执行的!Kafka将这个权利下放给客户端主要是因为这样做可以有更好的灵活性。比如这种机制下我可以实现类似于Hadoop那样的机架感知(rack-aware)分配方案,即为consumer挑选同一个机架下的分区数据,减少网络传输的开销。Kafka默认为你提供了两种分配策略:range和round-robin。由于这不是本文的重点,这里就不再详细展开了,你只需要记住你可以覆盖consumer的参数:partition.assignment.strategy来实现自己分配策略就好了。

    consumer group状态机
    和很多kafka组件一样,group也做了个状态机来表明组状态的流转。coordinator根据这个状态机会对consumer group做不同的处理,如下图所示

    [图片上传失败...(image-e8410e-1627916151967)]
    简单说明下图中的各个状态:
    Dead:组内已经没有任何成员的最终状态,组的元数据也已经被coordinator移除了。这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_ID
    Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
    PreparingRebalance:组准备开启新的rebalance,等待成员加入
    AwaitingSync:正在等待leader consumer将分配方案传给各个成员
    Stable:rebalance完成!可以开始消费了

    GroupCoordinator joingroup源码解析

    kafka新版consumer所有的group管理工作在服务端都由GroupCoordinator这个新角色来处理,最近测试发现consumer在reblance过程中会有各种各样的等待行为,于是研究下相关源码,GroupCoordinator是broker服务端处理consumer各种group相关请求的管理类。本次源码研究版本是0.10.2.0

    首先贴一下huxihx在Kafka消费组(consumer group)画过的一个流程图

    [图片上传失败...(image-b224b6-1627916151967)]

    这个图以及下面的几个流程图非常清晰的表明了当一个consumer(无论是新初始化的实例还是各种情况重新reblance的已有客户端)试图加入一个group的第一步都是先发送一个JoinGoupRequest到Coordinator,这个请求里具体包含了什么信息可以从AbstractCoordinator这个类的源代码找到

    /**
       * Join the group and return the assignment for the next generation. This function handles both
       * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if
       * elected leader by the coordinator.
       * @return A request future which wraps the assignment returned from the group leader
       */
      private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
          if (coordinatorUnknown())
              return RequestFuture.coordinatorNotAvailable();
     
          // send a join group request to the coordinator
          log.info("(Re-)joining group {}", groupId);
          JoinGroupRequest.Builder requestBuilder= new JoinGroupRequest.Builder(
                  groupId,
                  this.sessionTimeoutMs,
                  this.generation.memberId,
                  protocolType(),
                  metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
     
          log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder,this.coordinator);
          return client.send(coordinator, requestBuilder)
                  .compose(new JoinGroupResponseHandler());
     
     
     
    private Generation generation= Generation.NO_GENERATION;
     
     
      protected staticclass Generation {
          public staticfinal Generation NO_GENERATION= new Generation(
                  OffsetCommitRequest.DEFAULT_GENERATION_ID,
                  JoinGroupRequest.UNKNOWN_MEMBER_ID,
                  null);
     
          publicfinal int generationId;
          publicfinal String memberId;
          publicfinal String protocol;
     
          public Generation(int generationId, String memberId, String protocol) {
              this.generationId= generationId;
              this.memberId= memberId;
              this.protocol= protocol;
          }
      }  
    

    上述可以看出sendJoinGroupRequest里面包含了groupid,sesseionTimeout,membeid,rebalancetimeout等几个属性,如果是新初始化的consumer程序generation属性默认为NO_GENERATION,memberid就是JoinGroupRequest.UNKNOWN_MEMBER_ID

      然后是server处理sendJoinGroupRequest的代码,请求被转交到了GroupCoordinator类里的handleJoinGroup方法,该方法在校验了部分参数和group状态的合法性后将具体工作放到了doJoinGroup方法里。
    
    private def doJoinGroup(group: GroupMetadata,
                              memberId: String,
                              clientId: String,
                              clientHost: String,
                              rebalanceTimeoutMs: Int,
                              sessionTimeoutMs: Int,
                              protocolType: String,
                              protocols: List[(String, Array[Byte])],
                              responseCallback: JoinCallback) {
        group synchronized {
          if (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
            // if the new member does not support the group protocol, reject it
            responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
          }else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
            // if the member trying to register with a un-recognized id, send the response to let
            // it reset its member id and retry
            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
          }else {
            group.currentStatematch {
              case Dead=>
                // if the group is marked as dead, it means some other thread has just removed the group
                // from the coordinator metadata; this is likely that the group has migrated to some other
                // coordinator OR the group is in a transient unstable phase. Let the member retry
                // joining without the specified member id,
                responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
     
              case PreparingRebalance=>
                if (memberId== JoinGroupRequest.UNKNOWN_MEMBER_ID) {
                  addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
                }else {
                  val member= group.get(memberId)
                  updateMemberAndRebalance(group, member, protocols, responseCallback)
                }
     
              case AwaitingSync=>
                if (memberId== JoinGroupRequest.UNKNOWN_MEMBER_ID) {
                  addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
                }else {
                  val member= group.get(memberId)
                  if (member.matches(protocols)) {
                    // member is joining with the same metadata (which could be because it failed to
                    // receive the initial JoinGroup response), so just return current group information
                    // for the current generation.
                    responseCallback(JoinGroupResult(
                      members= if (memberId== group.leaderId) {
                        group.currentMemberMetadata
                      }else {
                        Map.empty
                      },
                      memberId= memberId,
                      generationId= group.generationId,
                      subProtocol= group.protocol,
                      leaderId= group.leaderId,
                      errorCode= Errors.NONE.code))
                  }else {
                    // member has changed metadata, so force a rebalance
                    updateMemberAndRebalance(group, member, protocols, responseCallback)
                  }
                }
     
              case Empty | Stable=>
                if (memberId== JoinGroupRequest.UNKNOWN_MEMBER_ID) {
                  // if the member id is unknown, register the member to the group
                  addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
                }else {
                  val member= group.get(memberId)
                  if (memberId== group.leaderId || !member.matches(protocols)) {
                    // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
                    // The latter allows the leader to trigger rebalances for changes affecting assignment
                    // which do not affect the member metadata (such as topic metadata changes for the consumer)
                    updateMemberAndRebalance(group, member, protocols, responseCallback)
                  }else {
                    // for followers with no actual change to their metadata, just return group information
                    // for the current generation which will allow them to issue SyncGroup
                    responseCallback(JoinGroupResult(
                      members= Map.empty,
                      memberId= memberId,
                      generationId= group.generationId,
                      subProtocol= group.protocol,
                      leaderId= group.leaderId,
                      errorCode= Errors.NONE.code))
                  }
                }
            }
     
            if (group.is(PreparingRebalance))
              joinPurgatory.checkAndComplete(GroupKey(group.groupId))
          }
        }
      }  
    
    

    GroupMetadata对象是一个有PreparingRebalance,AwaitingSync,Stable,Dead,Empty几种状态的状态机,在服务端用于表示当前管理group的状态。

    第一批consumer加入group

    1 由上文可知,新初始化的consumer刚开始的memberid都是JoinGroupRequest.UNKNOWN_MEMBER_ID,所有新成员都进入addMemberAndRebalance方法初始化一个member对象并add进group列表内部,只有一个加入的member才能进入maybePrepareRebalance的同步代码块内调用prepareReblacne方法

    private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                      sessionTimeoutMs: Int,
                                      clientId: String,
                                      clientHost: String,
                                      protocolType: String,
                                      protocols: List[(String, Array[Byte])],
                                      group: GroupMetadata,
                                      callback: JoinCallback)= {
      // use the client-id with a random id suffix as the member-id
      val memberId= clientId +"-" + group.generateMemberIdSuffix
      val member= new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
        sessionTimeoutMs, protocolType, protocols)
      member.awaitingJoinCallback= callback
      group.add(member)
      maybePrepareRebalance(group)
      member
    }
     
    private def maybePrepareRebalance(group: GroupMetadata) {
      group synchronized {
        if (group.canRebalance)
          prepareRebalance(group)
      }
    }
    

    prepareReblacne会把group的状态由上述的empty转变为PreparingRebalance,后续的客户端会判断PreparingRebalance同样进入addMemberAndRebalance,这样即使第一个member退出maybePrepareRebalance的synchronized代码块,剩余的member会发现group.canRebalacne返回的都是false直接略过

    private def prepareRebalance(group: GroupMetadata) {
      // if any members are awaiting sync, cancel their request and have them rejoin
      if (group.is(AwaitingSync))
        resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
     
      group.transitionTo(PreparingRebalance)
      info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
     
      val rebalanceTimeout= group.rebalanceTimeoutMs
      val delayedRebalance= new DelayedJoin(this, group, rebalanceTimeout)
      val groupKey= GroupKey(group.groupId)
      joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
    }
    
    

    上述代码里生成了一个DelayJoin,DelayJoin是kafka内部一种有超时时间的Timer.task的实现,会在两种情况下根据情况执行对应操作,一是timeout超时,另一种是满足某种条件后由程序主动运行并注销定时任务,注意这里放的时间是rebalanceTimeout而不是sessiontimeout。

    我们看一下joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))和joinPurgatory.checkAndComplete(GroupKey(group.groupId))这两个方法的调用链路。

    joinPurgatory.tryCompleteElseWatch->DelayedJoin.safeTryComplete->DelayedJoin.tryComplete->coordinator.tryCompleteJoin

    joinPurgatory.checkAndComplete->DelayedOperation.checkAndComplete->DelayedJoin.safeTryComplete->DelayedJoin.tryComplete->coordinator.tryCompleteJoin

    所以无论是第一个member结束prepareReblacne还是后续的member在doJoinGroup代码的最后都是去调用一下coordinator.tryCompleteJoin这个方法尝试完成joinGroup的等待

      def tryCompleteJoin(group: GroupMetadata, forceComplete: ()=> Boolean)= {
        group synchronized {
          if (group.notYetRejoinedMembers.isEmpty)
            forceComplete()
          else false
        }
      }
     
    def notYetRejoinedMembers= members.values.filter(_.awaitingJoinCallback== null).toList
    

    tryCompleteJoin的判断逻辑非常简单,GroupMetadata内部缓存的所有member都有对应的注册连接上来(addMemberAndRebalance方法里的member.awaitingJoinCallback = callback会给member的awaitingJoinCallback赋予一个值,值为null的就是有之前的member没有加入进来),如果notYetRejoinedMembers的列表为空,那么客户端就齐了,可以进行reblance分配,如果一直不齐,那么会等到rebalanceTimeout过期后触发强制reblance。

    二 heartbeat和session timeout

    在reblance过程中可以从下列源码看到heartbeat的delay时间设置的是session.timeout,如果一个旧的consumer死掉后在这个时间内持续没有心跳,那么服务端onMemberFailure会把group内对应的memberid删除并重试一下joinPurgatory.checkAndComplete,如果前次删除后notYetRejoinedMembers变为空后那么joingroup的等待也结束了。

    /**
      * Complete existing DelayedHeartbeats for the given member and schedule the next one
      */
     private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
       // complete current heartbeat expectation
       member.latestHeartbeat= time.milliseconds()
       val memberKey= MemberKey(member.groupId, member.memberId)
       heartbeatPurgatory.checkAndComplete(memberKey)
     
       // reschedule the next heartbeat expiration deadline
       val newHeartbeatDeadline= member.latestHeartbeat + member.sessionTimeoutMs
       val delayedHeartbeat= new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
       heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
     }
     
     
    def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
       group synchronized {
         if (!shouldKeepMemberAlive(member, heartbeatDeadline))
           onMemberFailure(group, member)
       }
     }
     
     
     private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
       trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
       group.remove(member.memberId)
       group.currentStatematch {
         case Dead | Empty=>
         case Stable | AwaitingSync=> maybePrepareRebalance(group)
         case PreparingRebalance=> joinPurgatory.checkAndComplete(GroupKey(group.groupId))
       }
     }
    
    

    结论,个人在测试过程中发现重启consumer中会有的部分卡顿大部分应该是由于这个notYetRejoinedMembers的列表由于上一次的关掉的consumer的session没有到期造成非空引起的等待。

    相关文章

      网友评论

        本文标题:kafka consumer rebalance

        本文链接:https://www.haomeiwen.com/subject/dofsvltx.html