美文网首页
Kafka源码分析-Consumer(6)-Heartbeat分

Kafka源码分析-Consumer(6)-Heartbeat分

作者: 陈阳001 | 来源:发表于2018-11-25 21:08 被阅读0次

一.心跳简介

根据前面分析的Rebalance操作的原理,消费者定期向服务端的GroupCoordinator发送HeartbeatRequest来确定彼此在线。

心跳的请求和响应格式:

HeartbeatRequest消息体比较简单,包含group_id(String),group_generation_id(int),member_id(String)三个字段。HeartbeatResponse消息体只包含short类型的error_code。

二.HeartbeatTask

HeartbeatTask是一个实现DelayedTask接口的定时任务,负责定时发送HeartbeatRequest并处理其响应,实现逻辑都在run()方法中实现,HeartbeatTask.run()的具体流程:


HeartbeatTask.run()的具体流程.jpg

(1)首先检查是否需要发送HeartbeatRequest,条件有三个,一个不满足就不能发送心跳:

  • GroupCoordinator 已经确定且已连接。
  • 不处于正在等待Partition分配结果的状态。
  • 之前的HeartbeatRequest请求正常收到响应且没有过期。
    如果不符合条件,就不会再执行HeartbeatTask,等待后续调用reset()方法重启HeartbeatTask任务。
    (2)调用Heartbeat.sessionTimeoutExpired(now),判断HeartbeatResponse是否超时,如果超时,则认为GroupCoordinator宕机,调用coordinatorDead()清空其unsent集合中对应的请求队列并将这些请求标记为异常后结束,将coordinator字段设置为Null,表示将重新选举GroupCoordinator。同时停止HeartbeatTask的执行。 coordinatorDead()代码:
/**
     * Mark the current coordinator as dead.
     */
    protected void coordinatorDead() {
        if (this.coordinator != null) {
            //将unsent中缓存的要发送给Coordinator节点的请求全部清空,并标记为异常后结束
            log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
            client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
            this.coordinator = null;
        }
    }

(3)检测HeartbeatTask是否到期,如果不到期则更新到期时间,将HeartbeatTask对象重新添加到DelayedTaskQueue中,等待其到期后执行;如果已经到期就发送HeartbeatRequest请求。
(4)更新最近一次发送HeartbeatRequest请求的时间,将requestInFlignt设置为true,表示有未响应的HeartbeatRequest请求,防止重复发送。
(5)创建HeartbeatRequest请求,并调用ConsumerNetworkClient.send()方法,将请求放入unsent集合中缓存并返回RequestFuture<Void>。然后ConsumerNetworkClient.poll()会将HeartbeatRequest请求发送给GroupCoordinator。
(6)在RequestFuture<Void>对象上添加RequestFutureListener。
HeartbeatTask.run()具体实现:

private class HeartbeatTask implements DelayedTask {

        private boolean requestInFlight = false;

        public void reset() {
            // start or restart the heartbeat task to be executed at the next chance
            long now = time.milliseconds();
            heartbeat.resetSessionTimeout(now);
            client.unschedule(this);

            if (!requestInFlight)
                client.schedule(this, now);
        }

        @Override
        public void run(final long now) {
            if (generation < 0 || needRejoin() || coordinatorUnknown()) {
                // no need to send the heartbeat we're not using auto-assignment or if we are
                // awaiting a rebalance
                return;//第一步:检查是否要发送心跳请求。
            }

            if (heartbeat.sessionTimeoutExpired(now)) {
                // we haven't received a successful heartbeat in one session interval
                // so mark the coordinator dead
                coordinatorDead();//第二步:上次心跳响应超时,将GroupCoordinator标记为宕机
                return;
            }

            if (!heartbeat.shouldHeartbeat(now)) {//第三步:还没到发送心跳请求的时间。
                // we don't need to heartbeat now, so reschedule for when we do
                client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
            } else {//第四步
                heartbeat.sentHeartbeat(now);//更新发送 HeartbeatRequest 的时间
                requestInFlight = true;//防止重复发送HeartbeatRequest
                //第五步:创建并缓存HeartbeatRequest
                RequestFuture<Void> future = sendHeartbeatRequest();
                //第六步:添加监听器
                future.addListener(new RequestFutureListener<Void>() {
                    @Override
                    public void onSuccess(Void value) {
                        requestInFlight = false;
                        long now = time.milliseconds();
                        heartbeat.receiveHeartbeat(now);
                        long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
                        client.schedule(HeartbeatTask.this, nextHeartbeatTime);
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        requestInFlight = false;
                        client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
                    }
                });
            }
        }
    }

三.HeartbeatResponse处理

sendHeartbeatRequest()

使用HeartbeatCompletionHandler将client.send()方法返回的RequestFuture<ClientResponse>适配成RequestFuture<Void>后返回

/**
     * Send a heartbeat request now (visible only for testing).
     */
    public RequestFuture<Void> sendHeartbeatRequest() {
        //创建HeartbeatRequest
        HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.memberId);
        //使用HeartbeatCompletionHandler对RequestFuture<ClientResponse>进行适配
        return client.send(coordinator, ApiKeys.HEARTBEAT, req)
                .compose(new HeartbeatCompletionHandler());
    }

使用HeartbeatCompletionHandler中实现的是HeartbeatResponse的核心逻辑:


CoordinatorResponseHandler.jpg

CoordinatorResponseHandler是一个抽象类,其中有parse()和handle()两个抽象方法,parse()方法对ClientResponse进行解析,得到指定类型的响应;handle()对解析后的响应进行处理。CoordinatorResponseHandler实现了RequestFuture抽象类的onSuccess()方法和onFailure方法。

protected abstract class CoordinatorResponseHandler<R, T>
            extends RequestFutureAdapter<ClientResponse, T> {
        protected ClientResponse response;//待处理的响应
        
        public abstract R parse(ClientResponse response);

        public abstract void handle(R response, RequestFuture<T> future);

        @Override
        public void onFailure(RuntimeException e, RequestFuture<T> future) {
            // mark the coordinator as dead
            if (e instanceof DisconnectException)
                coordinatorDead();
            future.raise(e);//调用adapted对象的raise()方法。
        }

        @Override
        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
            try {
                this.response = clientResponse;
                R responseObj = parse(clientResponse);//解析clientResponse
                handle(responseObj, future);//调用handle()进行处理
            } catch (RuntimeException e) {
                if (!future.isDone())
                    future.raise(e);
            }
        }
    }

这里使用的是模板方法模式,由父类方法定义操作流程,子类根据需求个性化实现流程中的抽象方法。这种模式的好处是避免每个子类都有一份流程控制的代码。
HeartbeatResponse的处理流程:

HeartbeatResponse处理流程.jpg

RequestFuture<ClientResponse>和RequestFutureListener<ClientResponse>
实现了配适器的功能。当ClientResponse传递到HeartbeatCompletionHandler时,会通过parse()方法解析成HeartbeatResponse,然后进入handle()方法处理。
在HeartbeatCompletionHandler.handle()方法中,判断HeartbeatResponse中是否包含错误码,如果不包含,则调用RequestFuture<Void>的complete(null)方法,将HeartbeatResponse成功的事件传播下去,否则,根据错误码分类处理,并调用raise()设置对应的异常。如:
错误码是Errors.ILLEGAL_GENERATION,表示HeartbeatRequest中携带的generationId过期,GroupCoordinator已经开始新一轮的Rebalance操作,则将rejoinNeeded设置为true,这样会重新发送JoinGroupRequest请求尝试加入Consumer Group,也会导致HeartbeatTask任务停止。如果错误码是UNKNOWN_MEMBER_ID,表示GroupCoordinator识别不了此Consumer,则清空memberId,尝试重新加入Consumer Group。
分析handle()方法的具体实现代码:

 @Override
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            sensors.heartbeatLatency.record(response.requestLatencyMs());
            Errors error = Errors.forCode(heartbeatResponse.errorCode());
            if (error == Errors.NONE) {//心跳正常
                log.debug("Received successful heartbeat response for group {}", groupId);
                future.complete(null);
            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                //找不到服务器对应的GroupCoordinator
                log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.",
                        groupId, coordinator);
                //清空unsent集合中对应的请求,并重新查找对应的GroupCoordinator
                coordinatorDead();
                future.raise(error);//设置
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId);
                //正在rebalance,会重新发送JoinGroupRequest消息
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.REBALANCE_IN_PROGRESS);
            } else if (error == Errors.ILLEGAL_GENERATION) {
                log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId);
                //重新发送JoinGroupRequest消息
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.ILLEGAL_GENERATION);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId);
                //重新发送JoinGroupRequest消息
                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }

HeartbeatCompletionHandler.handle()方法中会调用RequestFuture<Void>的complete()方法或raise()方法,这两个方法重没有处理逻辑,但是会触发其上的RequestFutureListener<Void>(在HeartbeatTask.run()方法中注册),此监听器会将requestInFlight设置为false,表示所有HeartbeatRequest都已经完成,并将HeartbeatTask重新放入定时任务队列,等待下一次到期执行。

相关文章

网友评论

      本文标题:Kafka源码分析-Consumer(6)-Heartbeat分

      本文链接:https://www.haomeiwen.com/subject/ccemqqtx.html