美文网首页
Kafka源码分析-Consumer(8)-Rebalance分

Kafka源码分析-Consumer(8)-Rebalance分

作者: 陈阳001 | 来源:发表于2018-12-28 12:30 被阅读0次

    第二阶段:

    在成功找到对应的GroupCoordinator之后进入了Join Group阶段。这个阶段,消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。

    JoinGroupRequest和JoinGroupResponse的消息格式

    JoinGroup Request.jpg JoinGroup Response.jpg

    JoinGroupRequest中各个字段含义:

    名称 类型 含义
    group_id String Consumer Group的Id
    session_timeout int GroupCoordinator超过session_time指定的时间,没有收到心跳,认为消费者下线
    member_id String GroupCoordinator分配给消费者的id
    protocol_type String Consumer Group实现的协议,默认是“consumer”
    group_protocols List 包含此消费者支持的全部PartitionAssignor类型
    protocol_ name String PartitionAssignor的名称
    protocol_ metadata byte数组 针对不同的PartitionAssignor,序列化后的消费者的订阅信息,其中包括用户自定义数据的userData

    JoinGroupResponse中各个字段含义:

    名称 类型 含义
    error_code short 错误码
    generation_id int GroupCoordinator分配的年代信息
    group_protocol String GroupCoordinator选择的PartitionAssignor
    leader_id String leader的member_id
    member_id String GroupCoordinator分配给消费者的Id
    members Map集合 PartitionAssignor的名称
    member_ metadata byte数组 对应消费者定义的信息

    分析了JoinGroupRequest和JoinGroupResponse的消息格式后,分析下第二阶段的相关处理流程,其入口方法是ensurePartitionAssignment()。ensurePartitionAssignment()流程如下:


    ensurePartitionAssignment()方法的处理流程.jpg

    1)调用subscriptions.partitionsAutoAssigned()方法,检测Consumer的订阅是否是AUTO_TOPIC或AUTO_PATTERN。因为AUTO_ASSIGNED不需要进行Rebalance操作,而是由用户手动指定分区。
    2)如果订阅模式是AUTO_PATTERN,则检查Metadata是否需要更新。

     /**
         * Ensure our metadata is fresh (if an update is expected, this will block
         * until it has completed).
         */
        public void ensureFreshMetadata() {
            //如果长时间没有更新或 Metadata.needUpdate字段为true,则更新Metadata
            if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(time.milliseconds()) == 0)
                awaitMetadataUpdate();//阻塞
        }
    

    前面介绍ConsumerCoordinator提过,ConsumerCoordinator的构造方法中为Metadata添加了监听器。当Metadata更新时就会使用SubscriptionState中的正则表达式过滤Topic,并更改SubscriptionState中的订阅信息。同时,也会使用metadataSnapshot字段记录当前的Metadata快照。更新Metadata的原因,是为了防止因使用过期的Metadata进行Rebalance操作而导致多次连续的Rebalance操作。
    3)调用ConsumerCoordinator.needRejoin()方法判断是否要发送JoinGroupRequest加入ConsumerGroup,其实现在是检测是否使用了AUTO_TOPIC或AUTO_PATTERN模式,检测rejoinNeeded和needsPartitionAssignment两个字段的值。

    @Override
        public boolean needRejoin() {
            return subscriptions.partitionsAutoAssigned() &&//检测subscriptionType
                    (super.needRejoin() //检测 rejoinNeeded的值
                            || subscriptions.partitionAssignmentNeeded());
        }
    

    4)调用onJoinPrepare()方法进行发送JoinGroupRequest之前的准备,准备三个事情:

    • 如果开启了自动提交offset则进行同步提交offset,此步骤会阻塞线程。
    • 调用注册在SubscriptionState中的ConsumerRebalanceListener上的回调方法。
    • 将SubscriptionState的needsPartitionAssignment字段设置为true并收缩groupSubscription集合。
    @Override
        protected void onJoinPrepare(int generation, String memberId) {
            // commit offsets prior to rebalance if auto-commit enabled 进行一次同步提交offsets的操作
            maybeAutoCommitOffsetsSync();
            //调用SubscriptionState中设置的ConsumerRebalanceListener
            // execute the user's callback before rebalance
            ConsumerRebalanceListener listener = subscriptions.listener();
            log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
            try {
                Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
                listener.onPartitionsRevoked(revoked);
            } catch (WakeupException e) {
                throw e;
            } catch (Exception e) {
                log.error("User provided listener {} for group {} failed on partition revocation",
                        listener.getClass().getName(), groupId, e);
            }
    
            assignmentSnapshot = null;
            subscriptions.needReassignment();//将needsPartitionAssignment设置为true
        }
    

    5)再次调用needRejoin()方法检测,之后调用ensureCoordinatorReady()方法检测以及找到GroupCoordinator且并之建立连接。
    6)如果还有发往GroupCoordinator所在Node的请求,则阻塞等待这些请求全部发送完成并收到响应(即等待unsent和InFligntRequests的对应队列为空),然后返回步骤5继续进行,这是为了避免重复发送JoinGroupRequest请求。
    7)调用sendJoinGroupRequest() 方法创建JoinGroupRequest请求,并调用ConsumerNetworkClient.send()方法将请求放入unsent中缓存,等待发送,具体如下:

    /**
         * Join the group and return the assignment for the next generation. This function handles both
         * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if
         * elected leader by the coordinator.
         * @return A request future which wraps the assignment returned from the group leader
         */
        private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
            if (coordinatorUnknown())//检测GroupCoordinator
                return RequestFuture.coordinatorNotAvailable();
    
            // send a join group request to the coordinator
            log.info("(Re-)joining group {}", groupId);
            //创建 JoinGroupRequest
            JoinGroupRequest request = new JoinGroupRequest(
                    groupId,
                    this.sessionTimeoutMs,
                    this.memberId,
                    protocolType(),
                    metadata());
    
            log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
            //将JoinGroupRequest放入unsent集合等待发送
            //注意,JoinGroupResponseHandler是JoinGroupResponse处理的入口
            return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
                    .compose(new JoinGroupResponseHandler());
        }
    

    8)在步骤7返回的RequestFuture<ByteBuffer>对象上添加RequestFutureListener。
    9)调用ConsumerNetworkClient.poll()方法发送JoinGroupRequest,这里会阻塞直到返回JoinGroupResponse或异常。
    10)检测RequestFuture.fail()。如果出现RetriableException异常则重试,其他异常则报错。如果没有异常的话,则第二阶段结束。
    下面看下ensurePartitionAssignment()方法:

    /**
         * Ensure that we have a valid partition assignment from the coordinator.
         */
        public void ensurePartitionAssignment() {
            if (subscriptions.partitionsAutoAssigned()) {//第一步:检测订阅类型
                // Due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that
                // the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives
                // while the rebalance is still pending (for example, when the join group is still inflight), then we will lose
                // track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without
                // ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a
                // rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem.
                if (subscriptions.hasPatternSubscription())//第二步:检测是否需要更新Metadata
                    client.ensureFreshMetadata();
    
                ensureActiveGroup();
            }
        }
    
     /**
         * Ensure that the group is active (i.e. joined and synced)
         */
        public void ensureActiveGroup() {
            // always ensure that the coordinator is ready because we may have been disconnected
            // when sending heartbeats and does not necessarily require us to rejoin the group.
            ensureCoordinatorReady();
    
            if (!needRejoin())//第三步:检测是否需要发送JoinGroupRequest请求。
                return;
    
            if (needsJoinPrepare) {
                //第四步:发送 JoinGroupRequest 请求前的准备操作。
                onJoinPrepare(generation, memberId);
                needsJoinPrepare = false;
            }
    
            while (needRejoin()) {
                ensureCoordinatorReady();//第五步:检测 GroupCoordinator 状态。
    
                // ensure that there are no pending requests to the coordinator. This is important
                // in particular to avoid resending a pending JoinGroup request.
                if (client.pendingRequestCount(this.coordinator) > 0) {
                    //第六步:等待发往GroupCoordinator所在节点的消息全部完成。
                    client.awaitPendingRequests(this.coordinator);
                    continue;
                }
                    //第七步:创建并缓存请求。
                RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
                //第八步:添加监听器。
                future.addListener(new RequestFutureListener<ByteBuffer>() {
                    @Override
                    public void onSuccess(ByteBuffer value) {
                        // handle join completion in the callback so that the callback will be invoked
                        // even if the consumer is woken up before finishing the rebalance
                        onJoinComplete(generation, memberId, protocol, value);
                        needsJoinPrepare = true;
                        heartbeatTask.reset();
                    }
    
                    @Override
                    public void onFailure(RuntimeException e) {
                        // we handle failures below after the request finishes. if the join completes
                        // after having been woken up, the exception is ignored and we will rejoin
                    }
                });
                client.poll(future);//第九步:阻塞等待JoinGroupRequest请求完成。
    
                if (future.failed()) {//第十步:异常处理。
                    RuntimeException exception = future.exception();
                    if (exception instanceof UnknownMemberIdException ||
                            exception instanceof RebalanceInProgressException ||
                            exception instanceof IllegalGenerationException)
                        continue;
                    else if (!future.isRetriable())
                        throw exception;
                    time.sleep(retryBackoffMs);//退避一段时间重试
                }
            }
        }
    

    通过对JoinGroupRequest发送流程的分析,我们知道JoinGroupResponse处理流程的入口是JoinGroupResponseHandler.handle()方法,其中还包括了SyncGroupRequest发送的操作,后面再详细说明,JoinGroupResponse处理流程如下:


    JoinGroupResponse处理流程.jpg
    1. 解析JoinGroupResponse,获取GroupCoordinator分配的memberId,generation等信息,并更新到本地。
    2. 消费者根据leaderId检测自己是不是Leader。如果是Leader则进入onJoinLeader()方法,如果不是Leader则进入onJoinFollower()方法。onJoinFollower()方法是onJoinLeader()方法的子集,下面主要结束下onJoinLeader()方法。
    3. Leader根据JoinGroupResponse的group_protocol字段指定的Partition分配策略,查找相应的PartitionAssignor对象。
    4. Leader将JoinGroupResponse的members字段进行反序列化,得到Consumer Group中全部消费订阅的Topic。Leader会将这些Topic信息添加到其SubscriptionState.groupSubscription集合中。而Follower则只关心自己订阅的Topic信息。
      5)第四步可能有新的Topic添加进来,所以要更新Metadata信息。
      6)等到Metadata更新完毕后,会在assignmentSnapshot字段中存储一个Metadata快照(通过Metadata的Listener创建的快照)。
      7)调用PartitionAssignor.assign()方法进行分区分配。
      8)将分配的结果序列化,保存到Map中返回,其中key是消费者的member_id,value是分配结果序列化后的ByteBuffer。
      分析JoinGroupResponseHandler.handle()方法:
    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    
            @Override
            public JoinGroupResponse parse(ClientResponse response) {
                return new JoinGroupResponse(response.responseBody());
            }
    
            @Override
            public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
                Errors error = Errors.forCode(joinResponse.errorCode());
                if (error == Errors.NONE) {
                    //步骤一:解析JoinGroupResponse,更新到本地。
                    log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
                    AbstractCoordinator.this.memberId = joinResponse.memberId();
                    AbstractCoordinator.this.generation = joinResponse.generationId();
                    AbstractCoordinator.this.rejoinNeeded = false;//修改了this.rejoinNeeded = false
                    AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
                    sensors.joinLatency.record(response.requestLatencyMs());
                    if (joinResponse.isLeader()) {//步骤二:判断是否为leader
                        /*
                        注意这里,此future是在前面sendJoinGroupRequest()方法返回的 RequestFuture 对象
                        在onJoinLeader()和onJoinFollower()方法中,都涉及发送 SyncGroupRequest 逻辑,
                        返回的RequestFuture 标识是SyncGroupRequest的完成情况。这里使用chain()方法,主要实现
                        的功能是:当SyncGroupResponse处理完成后,再通知这个future对象。
                        
                         */
                        
                        
                        
                        onJoinLeader(joinResponse).chain(future);
                    } else {
                        onJoinFollower().chain(future);
                    }
                } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                    log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
                            coordinator);
                    // backoff and retry
                    future.raise(error);
                } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                    // reset the member id and retry immediately
                    AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                    log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
                    future.raise(Errors.UNKNOWN_MEMBER_ID);
                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                        || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                    // re-discover the coordinator and retry with backoff
                    coordinatorDead();
                    log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message());
                    future.raise(error);
                } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                        || error == Errors.INVALID_SESSION_TIMEOUT
                        || error == Errors.INVALID_GROUP_ID) {
                    // log the error and re-throw the exception
                    log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
                    future.raise(error);
                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    future.raise(new GroupAuthorizationException(groupId));
                } else {
                    // unexpected error, throw the exception
                    future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
                }
            }
        }
    private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
            try {
                // perform the leader synchronization and send back the assignment for the group
                //步骤3-8都是在performAssignment()方法中完成
                Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                        joinResponse.members());
                //创建并发送SyncGroupRequest
                SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
                log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
                return sendSyncGroupRequest(request);
            } catch (RuntimeException e) {
                return RequestFuture.failure(e);
            }
        }
    
    @Override
        protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                            String assignmentStrategy,
                                                            Map<String, ByteBuffer> allSubscriptions) {
            //步骤三:查找分区分配使用的PartitionAssignor
            PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
            if (assignor == null)
                throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
    
            Set<String> allSubscribedTopics = new HashSet<>();
            Map<String, Subscription> subscriptions = new HashMap<>();
            for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
                Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
                subscriptions.put(subscriptionEntry.getKey(), subscription);
                allSubscribedTopics.addAll(subscription.topics());
            }
    
            // the leader will begin watching for changes to any of the topics the group is interested in,
            // which ensures that all metadata changes will eventually be seen
            //步骤四:对应leader来说,要关注Consumer group中所有消费者订阅的topic
            this.subscriptions.groupSubscribe(allSubscribedTopics);
            metadata.setTopics(this.subscriptions.groupSubscription());
    
            // update metadata (if needed) and keep track of the metadata used for assignment so that
            // we can check after rebalance completion whether anything has changed
            client.ensureFreshMetadata(); //步骤五:更新Metadata
            assignmentSnapshot = metadataSnapshot;//步骤六:记录快照
    
            log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
                    groupId, assignor.name(), subscriptions);
            //步骤⑦:进行分区分配
            Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
    
            log.debug("Finished assignment for group {}: {}", groupId, assignment);
            //步骤八:将分区分配结果序列化,并保存到groupAssignment中
            Map<String, ByteBuffer> groupAssignment = new HashMap<>();
            for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
                ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
                
                groupAssignment.put(assignmentEntry.getKey(), buffer);
            }
    
            return groupAssignment;
        }
    

    第三阶段

    完成分区分配后进入了Synchronizing Group State阶段,逻辑是向GroupCoordinator发送SyncGroupRequest请求并处理SyncGroupResponse响应。先分析一下SyncGroupRequest和SyncGroupResponse的消息体格式:


    SyncGroup Request.png SyncGroup Response.jpg

    SyncGroupRequest中各个字段的含义:

    名称 类型 含义
    group_id String Consumer Group的Id
    generation_id int 消费者保存的年代信息
    member_id String GroupCoordinator分配给消费者的id
    member_assignment; byte数组 分区分配的结果

    SyncGroupResponse中各个字段的含义:

    名称 类型 含义
    error_code short 错误码
    member_assignment byte数组 分配给当前消费者的分区

    根据上述onJoinLeader()方法分析,我们了解了发送SyncGroupRequest请求的逻辑在分区分配之后,也是在onJoinLeader()方法中完成的。流程如下:
    1)得到序列化后的分区分配结果后,Leader将其封装成SyncGroupRequest,而Follower形成的SyncGroupRequest中这部分是空的。
    2)调用ConsumerNetworkClient.send()方法将请求放入unsent集合中等待发送。
    对SyncGroupResponse处理的入口是SyncGroupResponseHandler.handle()方法。对于正常完成的情况,解析SyncGroupResponse,从中拿到分区分配结果并将其传递出去;对于出现异常的情况,将rejoinNeeded设置为true,并针对不同的错误码进行不同的处理。

    private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
    
            @Override
            public SyncGroupResponse parse(ClientResponse response) {
                return new SyncGroupResponse(response.responseBody());
            }
    
            @Override
            public void handle(SyncGroupResponse syncResponse,
                               RequestFuture<ByteBuffer> future) {
                Errors error = Errors.forCode(syncResponse.errorCode());
                if (error == Errors.NONE) {
                    //调用RequestFuture.complete()方法传播分区分配结果
                    log.info("Successfully joined group {} with generation {}", groupId, generation);
                    sensors.syncLatency.record(response.requestLatencyMs());
                    future.complete(syncResponse.memberAssignment());
                } else {
                    //将rejoinNeeded设置为true
                    AbstractCoordinator.this.rejoinNeeded = true;
                    if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                        //调用RequestFuture.raise()方法传播异常
                        future.raise(new GroupAuthorizationException(groupId));
                    } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                        log.debug("SyncGroup for group {} failed due to coordinator rebalance", groupId);
                        future.raise(error);
                    } else if (error == Errors.UNKNOWN_MEMBER_ID
                            || error == Errors.ILLEGAL_GENERATION) {
                        log.debug("SyncGroup for group {} failed due to {}", groupId, error);
                        AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                        future.raise(error);
                    } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                            || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                        log.debug("SyncGroup for group {} failed due to {}", groupId, error);
                        coordinatorDead();
                        future.raise(error);
                    } else {
                        future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
                    }
                }
            }
        }
    

    从SyncGroupResponse中得到的分区分配结果最终由ConsumerCoordinator.onJoinComplete()方法处理,调用此方法的是在第二阶段ensureActiveGroup()方法的步骤8中添加的RequestFutureListener中调用。onJoinComplete()方法的流程如下:


    onJoinComplete()方法处理流程.jpg

    1)在第二阶段Leader开始分配分区前,Leader使用assignmentSnapshot字段记录了Metadata快照。此时在Leader中,将此快照与最新的Metadata快照进行对比。如果和快照不一致则表示分区分配过程中出现了Topic增删或分区数量变化,这时将needsPartitionAssignment置为true,需要重新进行分区分配。
    2)反序列化拿到分配给当前消费者的分区,并添加到SubscriptionState.assignment集合中 ,之后消费者会按照此集合指定的分区进行消费,将needsPartitionAssignment置为false。
    3)调用PartitionAssignor的onAssignment()回调函数,默认是空实现。当用户自定义PartitionAssignor是,可以自定义这个方法
    4)如果开启了自动提交的offset的功能,则重新启动AutoCommitTask定时任务。
    5)调用SubscriptionState中注册的ConsumerRebalanceListener
    6)将needsJoinPrepare重置为true,为下次Rebalance的操作做准备。
    7)重启HeartbeatTask定时任务,定时发送心跳。
    onJoinComplete()方法的代码流程:

     @Override
        protected void onJoinComplete(int generation,
                                      String memberId,
                                      String assignmentStrategy,
                                      ByteBuffer assignmentBuffer) {
            // if we were the assignor, then we need to make sure that there have been no metadata updates
            // since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
            //第一步:Leader需要比较快照,但Follower不需要。
            if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
                subscriptions.needReassignment();
                return;
            }
            //查找使用的分配策略
            PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
            if (assignor == null)
                throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
            //第二步:反序列化,更新assignment
            Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
            //将needsFetchCommittedOffsets设置为true,允许从服务端获取最近一次提交的offset。
            // set the flag to refresh last committed offsets
            subscriptions.needRefreshCommits();
    
            // update partition assignment
            //填充assignment集合
            subscriptions.assignFromSubscribed(assignment.partitions());
    
            // give the assignor a chance to update internal state based on the received assignment
            assignor.onAssignment(assignment);//第三步:回调函数
    
            // reschedule the auto commit starting from now
            if (autoCommitEnabled)//第四步:开启AutoCommitTask任务
                autoCommitTask.reschedule();
    
            // execute the user's callback after rebalance
            ConsumerRebalanceListener listener = subscriptions.listener();
            log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
            try {
                Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
                //第五步:回调ConsumerRebalanceListener
                listener.onPartitionsAssigned(assigned);
            } catch (WakeupException e) {
                throw e;
            } catch (Exception e) {
                log.error("User provided listener {} for group {} failed on partition assignment",
                        listener.getClass().getName(), groupId, e);
            }
        }
    

    Rebalance操作的执行流程和具体实现就分析完了。当Consumer正常离开ConsumerGroup时会发送LeaveGroupRequest,此时也会触发Rebalance操作。

    相关文章

      网友评论

          本文标题:Kafka源码分析-Consumer(8)-Rebalance分

          本文链接:https://www.haomeiwen.com/subject/eryslqtx.html