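1. Heartbeat Overview
As discussed in the earlier analysis of Rebalance, a consumer periodically sends a HeartbeatRequest to the GroupCoordinator on the broker side so that each side can confirm the other is still alive.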
Format of the heartbeat request and response:
The HeartbeatRequest body is simple. It contains three fields: group_id (String), group_generation_id (int), and member_id (String). The HeartbeatResponse body contains only a short error_code.
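To make the field layout concrete, here is a minimal sketch that encodes the version 0 body of these two messages with java.nio.ByteBuffer. It assumes Kafka's primitive encodings (a STRING is an int16 length followed by UTF-8 bytes, and integers are big-endian) and leaves out the common request/response header. HeartbeatWireSketch is a hypothetical helper, not Kafka's HeartbeatRequest or HeartbeatResponse class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: encodes only the v0 heartbeat bodies, without the request header.
final class HeartbeatWireSketch {

    // STRING encoding: int16 byte length followed by the UTF-8 bytes.
    private static void writeString(ByteBuffer buf, byte[] bytes) {
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    // Request body: group_id (STRING), generation_id (INT32), member_id (STRING).
    static ByteBuffer encodeRequest(String groupId, int generationId, String memberId) {
        byte[] g = groupId.getBytes(StandardCharsets.UTF_8);
        byte[] m = memberId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + g.length + 4 + 2 + m.length);
        writeString(buf, g);
        buf.putInt(generationId);
        writeString(buf, m);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Response body: a single INT16 error_code, where 0 means "no error".
    static short decodeResponse(ByteBuffer buf) {
        return buf.getShort();
    }
}
```

For a group id "g1" and member id "m" the body is 2 + 2 + 4 + 2 + 1 = 11 bytes, which shows how little a heartbeat costs on the wire.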
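2. HeartbeatTask
HeartbeatTask is a scheduled task that implements the DelayedTask interface. It is responsible for periodically sending HeartbeatRequests and handling their responses, and all of this logic lives in its run() method. HeartbeatTask.run() proceeds as follows: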
[Figure: flow of HeartbeatTask.run()]
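(1) First check whether a HeartbeatRequest should be sent at all. All three of the following conditions must hold; if any one of them fails, no heartbeat is sent:
- The GroupCoordinator is known and connected.
- The consumer is not waiting to rejoin the group, i.e. it is not awaiting its partition assignment.
- The consumer holds a valid generation (generation >= 0), i.e. it uses group management and has joined the group.
If the conditions are not met, HeartbeatTask returns without rescheduling itself and is restarted only when reset() is called later.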
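(2) Call Heartbeat.sessionTimeoutExpired(now) to check whether the HeartbeatResponse has timed out, i.e. no successful response has arrived within one session timeout. If it has, the GroupCoordinator is considered down and coordinatorDead() is called. It clears the queue of requests in the unsent collection that are destined for the coordinator, fails each of them with an exception, and sets the coordinator field to null, which means the consumer has to look up the GroupCoordinator again. HeartbeatTask then stops running, because it returns without rescheduling itself. The code of coordinatorDead():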
/**
 * Mark the current coordinator as dead.
 */
protected void coordinatorDead() {
    if (this.coordinator != null) {
        // Clear all requests buffered in unsent for the coordinator node and fail them with an exception
        log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
        client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
        this.coordinator = null;
    }
}
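(3) Check whether it is time to send a heartbeat. If not, compute the next due time and put the HeartbeatTask back into the DelayedTaskQueue so that it runs again once it is due; otherwise, send a HeartbeatRequest.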
(4) Record the time of the most recent HeartbeatRequest and set requestInFlight to true, marking that a HeartbeatRequest is still awaiting its response so that no duplicate request is sent.
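(5) Create the HeartbeatRequest via sendHeartbeatRequest(). It calls ConsumerNetworkClient.send(), which buffers the request in the unsent collection and returns a RequestFuture<ClientResponse>; sendHeartbeatRequest() adapts this into a RequestFuture<Void>. ConsumerNetworkClient.poll() later sends the HeartbeatRequest to the GroupCoordinator.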
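(6) Register a RequestFutureListener on the RequestFuture<Void>. On success it clears requestInFlight, records the time the response was received, and schedules the task for the next heartbeat; on failure it clears requestInFlight and schedules the task again after retryBackoffMs.
Implementation of HeartbeatTask.run():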
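Steps (2) to (4) rely on a small bookkeeping object, heartbeat, whose methods decide whether the session has expired and whether a heartbeat is due. The following is a simplified sketch of that bookkeeping, assuming a session timeout and a heartbeat interval in milliseconds. HeartbeatTimerSketch and its fields are illustrative names, not a copy of Kafka's Heartbeat class.

```java
// Simplified model of the timing checks used by HeartbeatTask.run().
final class HeartbeatTimerSketch {
    private final long sessionTimeoutMs;
    private final long intervalMs;
    private long lastSend;         // when the last HeartbeatRequest was sent
    private long lastReceive;      // when the last successful HeartbeatResponse arrived
    private long lastSessionReset; // when reset() last restarted the session clock

    HeartbeatTimerSketch(long sessionTimeoutMs, long intervalMs, long now) {
        if (intervalMs >= sessionTimeoutMs)
            throw new IllegalArgumentException("heartbeat interval must be lower than the session timeout");
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.intervalMs = intervalMs;
        this.lastSessionReset = now;
    }

    void sentHeartbeat(long now) { lastSend = now; }
    void receiveHeartbeat(long now) { lastReceive = now; }
    void resetSessionTimeout(long now) { lastSessionReset = now; }

    // Milliseconds until the next heartbeat is due; 0 means "send now".
    long timeToNextHeartbeat(long now) {
        long elapsed = now - Math.max(lastSend, lastSessionReset);
        return elapsed >= intervalMs ? 0 : intervalMs - elapsed;
    }

    boolean shouldHeartbeat(long now) {
        return timeToNextHeartbeat(now) == 0;
    }

    // Expired when neither a successful response nor a reset happened within the session timeout.
    boolean sessionTimeoutExpired(long now) {
        return now - Math.max(lastReceive, lastSessionReset) > sessionTimeoutMs;
    }
}
```

Tracking the send time separately from the receive time lets one object answer both questions: sending drives the interval check, while only a successful response (or a reset) pushes back the session deadline.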
private class HeartbeatTask implements DelayedTask {

    private boolean requestInFlight = false;

    public void reset() {
        // start or restart the heartbeat task to be executed at the next chance
        long now = time.milliseconds();
        heartbeat.resetSessionTimeout(now);
        client.unschedule(this);
        if (!requestInFlight)
            client.schedule(this, now);
    }

    @Override
    public void run(final long now) {
        if (generation < 0 || needRejoin() || coordinatorUnknown()) {
            // no need to send the heartbeat if we're not using auto-assignment or if we are
            // awaiting a rebalance
            return; // Step 1: check whether a heartbeat should be sent
        }

        if (heartbeat.sessionTimeoutExpired(now)) {
            // we haven't received a successful heartbeat in one session interval
            // so mark the coordinator dead
            coordinatorDead(); // Step 2: the last heartbeat response timed out, mark the GroupCoordinator as dead
            return;
        }

        if (!heartbeat.shouldHeartbeat(now)) { // Step 3: not yet time to send a heartbeat
            // we don't need to heartbeat now, so reschedule for when we do
            client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
        } else { // Step 4
            heartbeat.sentHeartbeat(now); // record when the HeartbeatRequest was sent
            requestInFlight = true;       // prevent duplicate HeartbeatRequests
            // Step 5: create and buffer the HeartbeatRequest
            RequestFuture<Void> future = sendHeartbeatRequest();
            // Step 6: register a listener
            future.addListener(new RequestFutureListener<Void>() {
                @Override
                public void onSuccess(Void value) {
                    requestInFlight = false;
                    long now = time.milliseconds();
                    heartbeat.receiveHeartbeat(now);
                    long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
                    client.schedule(HeartbeatTask.this, nextHeartbeatTime);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    requestInFlight = false;
                    client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
                }
            });
        }
    }
}
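Step 2 of run() calls coordinatorDead(), whose call to client.failUnsentRequests() drains the requests still buffered for the dead coordinator. A toy model of that buffer is shown below, assuming requests are tracked per node id as CompletableFutures. UnsentBuffer and its methods are hypothetical and do not reproduce ConsumerNetworkClient's internals.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of an "unsent" buffer keyed by destination node id.
final class UnsentBuffer {
    private final Map<String, ArrayDeque<CompletableFuture<Void>>> unsent = new HashMap<>();

    // Queue a request for a node and hand back the future that will carry its result.
    CompletableFuture<Void> add(String node) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        unsent.computeIfAbsent(node, k -> new ArrayDeque<>()).add(f);
        return f;
    }

    // Drop every buffered request for the node and fail each one with the given exception.
    int failUnsent(String node, RuntimeException e) {
        ArrayDeque<CompletableFuture<Void>> queue = unsent.remove(node);
        if (queue == null) return 0;
        int failed = 0;
        for (CompletableFuture<Void> f : queue) {
            f.completeExceptionally(e);
            failed++;
        }
        return failed;
    }

    // Number of requests still waiting to be sent to the node.
    int pending(String node) {
        ArrayDeque<CompletableFuture<Void>> queue = unsent.get(node);
        return queue == null ? 0 : queue.size();
    }
}
```

Failing the futures, rather than silently dropping them, is what lets every caller waiting on those requests find out that the coordinator went away.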
3. Handling the HeartbeatResponse
sendHeartbeatRequest()
It uses HeartbeatCompletionHandler to adapt the RequestFuture<ClientResponse> returned by client.send() into a RequestFuture<Void>, which it then returns:
/**
 * Send a heartbeat request now (visible only for testing).
 */
public RequestFuture<Void> sendHeartbeatRequest() {
    // Create the HeartbeatRequest
    HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.memberId);
    // Adapt RequestFuture<ClientResponse> into RequestFuture<Void> with HeartbeatCompletionHandler
    return client.send(coordinator, ApiKeys.HEARTBEAT, req)
            .compose(new HeartbeatCompletionHandler());
}
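The compose() call is where the adaptation happens: the returned future's outcome is decided by the handler once the original future finishes. The mechanics can be imitated with a small single-threaded future. MiniFuture, Listener, and Adapter below are hypothetical stand-ins for RequestFuture, RequestFutureListener, and RequestFutureAdapter, written only to illustrate the idea.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded imitation of RequestFuture's listener/compose mechanics.
final class MiniFuture<T> {
    interface Listener<T> {
        void onSuccess(T value);
        void onFailure(RuntimeException e);
    }

    // Turns the outcome of a MiniFuture<F> into the outcome of a MiniFuture<T>.
    abstract static class Adapter<F, T> {
        abstract void onSuccess(F value, MiniFuture<T> future);
        void onFailure(RuntimeException e, MiniFuture<T> future) { future.raise(e); }
    }

    private final List<Listener<T>> listeners = new ArrayList<>();
    private boolean done;
    private T value;
    private RuntimeException exception;

    boolean isDone() { return done; }

    void complete(T v) {
        if (done) throw new IllegalStateException("already completed");
        done = true;
        value = v;
        for (Listener<T> l : listeners) l.onSuccess(v);
    }

    void raise(RuntimeException e) {
        if (done) throw new IllegalStateException("already completed");
        done = true;
        exception = e;
        for (Listener<T> l : listeners) l.onFailure(e);
    }

    // Listeners added after completion are notified immediately.
    void addListener(Listener<T> l) {
        if (!done) listeners.add(l);
        else if (exception != null) l.onFailure(exception);
        else l.onSuccess(value);
    }

    // Returns a new future whose outcome the adapter decides when this one finishes.
    <S> MiniFuture<S> compose(Adapter<T, S> adapter) {
        MiniFuture<S> adapted = new MiniFuture<>();
        addListener(new Listener<T>() {
            public void onSuccess(T v) { adapter.onSuccess(v, adapted); }
            public void onFailure(RuntimeException e) { adapter.onFailure(e, adapted); }
        });
        return adapted;
    }
}
```

The adapter only has to describe the conversion; wiring the listener and creating the adapted future stay in one place, which mirrors why HeartbeatCompletionHandler can focus on interpreting the response.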
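HeartbeatCompletionHandler, a subclass of CoordinatorResponseHandler, implements the core logic for handling a HeartbeatResponse:
[Figure: CoordinatorResponseHandler]
CoordinatorResponseHandler is an abstract class with two abstract methods, parse() and handle(). parse() decodes the ClientResponse into a response of the expected type, and handle() processes the decoded response. CoordinatorResponseHandler itself implements the onSuccess() and onFailure() methods of the abstract class RequestFutureAdapter.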
protected abstract class CoordinatorResponseHandler<R, T>
        extends RequestFutureAdapter<ClientResponse, T> {
    protected ClientResponse response; // the response being handled

    public abstract R parse(ClientResponse response);

    public abstract void handle(R response, RequestFuture<T> future);

    @Override
    public void onFailure(RuntimeException e, RequestFuture<T> future) {
        // mark the coordinator as dead
        if (e instanceof DisconnectException)
            coordinatorDead();
        future.raise(e); // propagate the failure to the adapted future
    }

    @Override
    public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
        try {
            this.response = clientResponse;
            R responseObj = parse(clientResponse); // decode the ClientResponse
            handle(responseObj, future);           // process the decoded response
        } catch (RuntimeException e) {
            if (!future.isDone())
                future.raise(e);
        }
    }
}
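This is the Template Method pattern: the parent class defines the sequence of operations, and each subclass implements the abstract steps of that sequence to suit its own needs. The benefit is that the flow-control code is not duplicated in every subclass.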
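A stripped-down illustration of the same pattern follows, assuming a response that arrives as a plain string holding an error code. ResponseTemplate and ErrorCodeHandler are hypothetical names. The base class fixes the parse-then-handle order and the exception handling, and the subclass supplies only the two steps.

```java
// Hypothetical template: the base class owns the fixed flow, subclasses fill in the steps.
abstract class ResponseTemplate<R> {
    protected abstract R parse(String raw);
    protected abstract String handle(R response);

    // Template method: the parse -> handle order and the error handling are fixed here.
    final String process(String raw) {
        try {
            return handle(parse(raw));
        } catch (RuntimeException e) {
            return "failed: " + e.getMessage();
        }
    }
}

// One concrete implementation of the steps: the raw payload is a numeric error code.
final class ErrorCodeHandler extends ResponseTemplate<Short> {
    @Override
    protected Short parse(String raw) {
        return Short.parseShort(raw.trim());
    }

    @Override
    protected String handle(Short code) {
        return code == 0 ? "ok" : "error " + code;
    }
}
```

Declaring process() final keeps subclasses from reordering the steps, which is the guarantee the pattern is meant to give.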
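Handling flow of HeartbeatResponse:
HeartbeatCompletionHandler acts as the adapter between RequestFuture<ClientResponse> and RequestFuture<Void>. Through compose(), a RequestFutureListener<ClientResponse> registered on the RequestFuture<ClientResponse> forwards the result to HeartbeatCompletionHandler. When the ClientResponse reaches HeartbeatCompletionHandler, parse() decodes it into a HeartbeatResponse, which handle() then processes.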
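HeartbeatCompletionHandler.handle() checks whether the HeartbeatResponse carries an error code. If it does not, handle() calls complete(null) on the RequestFuture<Void> to propagate the successful heartbeat. Otherwise, the error is handled according to its code, and raise() fails the future with the corresponding exception. For example:
If the error code is Errors.ILLEGAL_GENERATION, the generationId carried in the HeartbeatRequest is stale and the GroupCoordinator has already started a new round of Rebalance. rejoinNeeded is set to true, so the consumer will send a JoinGroupRequest again to rejoin the Consumer Group. This also stops HeartbeatTask, because needRejoin() now makes step (1) of run() return early. If the error code is UNKNOWN_MEMBER_ID, the GroupCoordinator does not recognize this consumer, so memberId is reset and the consumer tries to rejoin the Consumer Group.
The implementation of handle():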
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
    sensors.heartbeatLatency.record(response.requestLatencyMs());
    Errors error = Errors.forCode(heartbeatResponse.errorCode());
    if (error == Errors.NONE) { // heartbeat succeeded
        log.debug("Received successful heartbeat response for group {}", groupId);
        future.complete(null);
    } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
            || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
        // the GroupCoordinator for this group cannot be found on the broker
        log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.",
                groupId, coordinator);
        // clear the matching requests in unsent and look up the GroupCoordinator again
        coordinatorDead();
        future.raise(error); // fail the future with this error
    } else if (error == Errors.REBALANCE_IN_PROGRESS) {
        log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId);
        // a rebalance is in progress, so a JoinGroupRequest will be sent again
        AbstractCoordinator.this.rejoinNeeded = true;
        future.raise(Errors.REBALANCE_IN_PROGRESS);
    } else if (error == Errors.ILLEGAL_GENERATION) {
        log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId);
        // send a JoinGroupRequest again
        AbstractCoordinator.this.rejoinNeeded = true;
        future.raise(Errors.ILLEGAL_GENERATION);
    } else if (error == Errors.UNKNOWN_MEMBER_ID) {
        log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId);
        // reset memberId and send a JoinGroupRequest again
        memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
        AbstractCoordinator.this.rejoinNeeded = true;
        future.raise(Errors.UNKNOWN_MEMBER_ID);
    } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
        future.raise(new GroupAuthorizationException(groupId));
    } else {
        future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
    }
}
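HeartbeatCompletionHandler.handle() calls either complete() or raise() on the RequestFuture<Void>. These two methods contain no heartbeat-specific logic themselves, but they trigger the RequestFutureListener<Void> registered in HeartbeatTask.run(). That listener sets requestInFlight back to false, meaning the outstanding HeartbeatRequest has completed, and puts HeartbeatTask back into the delayed-task queue to wait for its next run: after a success it is scheduled for the next heartbeat interval, and after a failure it is retried after retryBackoffMs.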
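The rescheduling described above goes through the consumer's delayed-task queue. As a rough model, a priority queue ordered by deadline is enough: schedule() adds an entry, unschedule() removes a task's entries, and poll(now) runs every task whose deadline has passed. This is a hypothetical, single-threaded sketch (SketchTask and DelayedQueueSketch are illustrative names) that only approximates DelayedTaskQueue.

```java
import java.util.PriorityQueue;

// Hypothetical stand-in for the DelayedTask interface.
interface SketchTask {
    void run(long now);
}

// Minimal deadline-ordered task queue: the earliest deadline sits at the head.
final class DelayedQueueSketch {
    private static final class Entry implements Comparable<Entry> {
        final SketchTask task;
        final long deadline;

        Entry(SketchTask task, long deadline) {
            this.task = task;
            this.deadline = deadline;
        }

        @Override
        public int compareTo(Entry o) {
            return Long.compare(deadline, o.deadline);
        }
    }

    private final PriorityQueue<Entry> queue = new PriorityQueue<>();

    void schedule(SketchTask task, long at) {
        queue.add(new Entry(task, at));
    }

    // Remove every pending entry for the task (mirrors an "unschedule" call).
    void unschedule(SketchTask task) {
        queue.removeIf(e -> e.task == task);
    }

    // Run every task whose deadline has passed; a task may reschedule itself while running.
    void poll(long now) {
        while (!queue.isEmpty() && queue.peek().deadline <= now) {
            queue.poll().task.run(now);
        }
    }

    int size() {
        return queue.size();
    }
}
```

A task that reschedules itself from inside run(), as HeartbeatTask does in its listener, simply lands back in the queue with a later deadline and is picked up by a future poll().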