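ConsumerCoordinator extends AbstractCoordinator and is its only subclass within the consumer client (Kafka Connect's WorkerCoordinator is another subclass, outside the client library). AbstractCoordinator defines the group coordination logic: how a consumer interacts with one particular broker (the coordinator) in order to join a consumer group and track the group's state.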
/**
* This class manages the coordination process with the consumer coordinator.
*/
public final class ConsumerCoordinator extends AbstractCoordinator {
/**
* AbstractCoordinator implements group management for a single group member by interacting with
* (a long comment that is worth reading in full; omitted here)
* ...
*/
public abstract class AbstractCoordinator implements Closeable {
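ConsumerCoordinator is mainly responsible for communication with the consumer group's coordinator: discovering the coordinator, joining the group, fetching committed offsets, and committing offsets.
After joining the group it also starts a HeartbeatThread, which keeps heartbeating to the coordinator so that the consumer stays a member of the group.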

Main flow
In KafkaConsumer::pollOnce, ConsumerCoordinator::poll is always called first.
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
client.maybeTriggerWakeup();
long startMs = time.milliseconds();
coordinator.poll(startMs, timeout);
...
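Besides handling offset commits, this poll call also locates the coordinator and completes any pending rebalance.
If you have read about the Kafka consumer rebalance mechanism [1], you already know that:
- The coordinator is the leader of the __consumer_offsets partition that the group maps to. ensureCoordinatorReady contacts some broker, finds out which node is the coordinator, and connects to it.
- A rebalance has two phases, join and sync. ensureActiveGroup checks this member's group state, talks to the coordinator, and completes the process of joining the group.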
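
The ordering described above can be sketched with a toy model. This is not Kafka's code: the class, its two boolean flags, and the step names are all invented for illustration, and every request is assumed to succeed. It only shows that discovery and join/sync happen once, while later polls skip straight to the routine work.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the order of work inside one coordinator poll; not Kafka's actual code.
class PollFlowSketch {
    boolean coordinatorKnown = false; // hypothetical flag: has the coordinator been discovered?
    boolean memberOfGroup = false;    // hypothetical flag: have join + sync completed?
    final List<String> steps = new ArrayList<>();

    void ensureCoordinatorReady() {
        if (!coordinatorKnown) {
            steps.add("find coordinator");
            coordinatorKnown = true;
        }
    }

    void ensureActiveGroup() {
        // The coordinator must be known before the group can be joined.
        ensureCoordinatorReady();
        if (!memberOfGroup) {
            steps.add("join group");
            steps.add("sync group");
            memberOfGroup = true;
        }
    }

    void poll() {
        ensureCoordinatorReady();
        ensureActiveGroup();
        steps.add("commit offsets if due");
    }

    public static void main(String[] args) {
        PollFlowSketch coordinator = new PollFlowSketch();
        coordinator.poll(); // first poll: discovery, join, sync, then commit handling
        coordinator.poll(); // later polls: only commit handling
        System.out.println(coordinator.steps);
    }
}
```

In the real client either flag can flip back (the coordinator may disconnect, or a rebalance may be triggered), which is why both checks run on every poll.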

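Interim summary
Before going deeper into the source, it is worth planning a direction.
ConsumerCoordinator contains a lot of branching logic and frequently mutates both its own state and SubscriptionState. Trying to memorize "send request xx, then do this on response or error" does not work, and it does not aid understanding either. My personal suggestions:
- Start from poll and read the main flow first, then dig into the details.
- Assume every request and response succeeds, and see how the flow advances on the happy path.
ConsumerCoordinator and HeartbeatThread share the following traits, which may help when reading the code:
- Both use ConsumerNetworkClient to talk to Kafka nodes: sending requests and defining how responses are handled asynchronously.
- The request-response flow is asynchronous, so RequestFuture [2] is used everywhere to build asynchronous flows.
- Both modify shared coordinator state concurrently, which could cause race conditions, so they synchronize with synchronized (AbstractCoordinator.this).
- Many xxResponseHandler classes are defined to specify what happens once a response arrives. For example, JoinGroupResponseHandler describes what happens after a join group request is sent and its response is received.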
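
To make the compose idea concrete, here is a minimal sketch. MiniFuture is a hypothetical, single-threaded stand-in written for this post; it is not Kafka's RequestFuture and it omits failure handling. It only illustrates how a handler attached through compose turns a raw response into the result the caller actually waits on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class ComposeSketch {
    // Hypothetical stand-in for a request future (success path only, not thread-safe).
    static class MiniFuture<T> {
        private T value;
        private boolean done;
        private final List<Consumer<T>> listeners = new ArrayList<>();

        boolean isDone() { return done; }
        T value() { return value; }

        void complete(T v) {
            value = v;
            done = true;
            for (Consumer<T> listener : listeners) listener.accept(v);
        }

        void addListener(Consumer<T> listener) {
            if (done) listener.accept(value); else listeners.add(listener);
        }

        // Derive a new future: once this one completes, the handler decides
        // how (and with what value) the derived future completes.
        <S> MiniFuture<S> compose(BiConsumer<T, MiniFuture<S>> handler) {
            MiniFuture<S> derived = new MiniFuture<>();
            addListener(v -> handler.accept(v, derived));
            return derived;
        }
    }

    // Plays the role of an xxResponseHandler: parses the raw response
    // and completes the caller-facing future with the parsed result.
    static MiniFuture<Integer> sendAndParse(MiniFuture<String> rawResponse) {
        return rawResponse.compose(
            (resp, future) -> future.complete(Integer.parseInt(resp.trim())));
    }

    public static void main(String[] args) {
        MiniFuture<String> raw = new MiniFuture<>();
        MiniFuture<Integer> parsed = sendAndParse(raw);
        System.out.println("done before response: " + parsed.isDone());
        raw.complete(" 42 "); // simulate the network layer delivering a response
        System.out.println("parsed value: " + parsed.value());
    }
}
```

The caller never sees the raw response. It holds only the derived future, which is the same shape as sendFindCoordinatorRequest returning a RequestFuture<Void> built by composing a response handler onto the send.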
ensureCoordinatorReady
/**
* Block until the coordinator for this group is known and is ready to receive requests.
*/
public synchronized void ensureCoordinatorReady() {
// Using zero as current time since timeout is effectively infinite
ensureCoordinatorReady(0, Long.MAX_VALUE);
}
protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
long remainingMs = timeoutMs;
while (coordinatorUnknown()) {
RequestFuture<Void> future = lookupCoordinator(); // send the lookup request
client.poll(future, remainingMs); // run network I/O until the future completes or the timeout expires
if (future.failed()) {
if (future.isRetriable()) {
...
client.awaitMetadataUpdate(remainingMs); // wait for a metadata refresh
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
...
}
...
}
return !coordinatorUnknown();
}
protected synchronized RequestFuture<Void> lookupCoordinator() {
if (findCoordinatorFuture == null) {
// find a node to ask about the coordinator
Node node = this.client.leastLoadedNode();
if (node == null) {
...
} else
findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
return findCoordinatorFuture;
}
/**
* Discover the current coordinator for the group. Sends a GroupMetadata request to
* one of the brokers. The returned future should be polled to get the result of the request.
* @return A request future which indicates the completion of the metadata request
*/
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
// initiate the group metadata request
log.debug("Sending FindCoordinator request to broker {}", node);
FindCoordinatorRequest.Builder requestBuilder =
new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);
return client.send(node, requestBuilder)
.compose(new FindCoordinatorResponseHandler());
}
private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
log.debug("Received FindCoordinator response {}", resp);
clearFindCoordinatorFuture();
FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
Errors error = findCoordinatorResponse.error();
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
// use MAX_VALUE - node.id as the coordinator id to allow separate connections
// for the coordinator in the underlying network client layer
int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.node().id();
AbstractCoordinator.this.coordinator = new Node(
coordinatorConnectionId,
findCoordinatorResponse.node().host(),
findCoordinatorResponse.node().port());
log.info("Discovered group coordinator {}", coordinator);
client.tryConnect(coordinator); // connect to the coordinator
heartbeat.resetTimeouts(time.milliseconds()); // reset the heartbeat timers
}
future.complete(null);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
...
} else {
...
}
}
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
...
}
}
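With that in hand, reading ensureCoordinatorReady(long startTimeMs, long timeoutMs) is straightforward. The rough logic is:
- As long as the coordinator is unknown, ensureCoordinatorReady loops: it calls lookupCoordinator to send a request, then calls client.poll to perform the I/O.
- lookupCoordinator picks a broker (the least loaded node) and calls sendFindCoordinatorRequest.
- sendFindCoordinatorRequest sends a FindCoordinator request to that broker and registers FindCoordinatorResponseHandler to process the response.
- When the response arrives, FindCoordinatorResponseHandler records the coordinator and initiates a connection to it.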
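
The loop summarized above can be re-enacted with a small simulation. Everything here is an assumption made for illustration: replies are scripted instead of coming from a network, a negative reply stands for a retriable failure, and an attempt limit replaces the real timeout bookkeeping. The one detail taken directly from the handler code is the connection id, computed as Integer.MAX_VALUE minus the broker id.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Toy re-enactment of the discovery loop; names and structure are illustrative, not Kafka's API.
class LookupLoopSketch {
    // Scripted broker replies: a negative value stands for a retriable failure,
    // any other value is the broker id of the coordinator.
    final Deque<Integer> scriptedReplies;
    Integer coordinatorConnectionId = null; // null plays the role of coordinatorUnknown()
    int attempts = 0;

    LookupLoopSketch(Integer... replies) {
        scriptedReplies = new ArrayDeque<>(Arrays.asList(replies));
    }

    // Simplification: an attempt limit stands in for the remaining-time check.
    boolean ensureCoordinatorReady(int maxAttempts) {
        while (coordinatorConnectionId == null
                && attempts < maxAttempts
                && !scriptedReplies.isEmpty()) {
            attempts++;
            int reply = scriptedReplies.poll(); // "send the request and poll for the response"
            if (reply < 0) {
                continue; // retriable failure: go around the loop again
            }
            // Same arithmetic as FindCoordinatorResponseHandler: a distinct id gives
            // the coordinator its own connection in the network layer.
            coordinatorConnectionId = Integer.MAX_VALUE - reply;
        }
        return coordinatorConnectionId != null;
    }

    public static void main(String[] args) {
        LookupLoopSketch lookup = new LookupLoopSketch(-1, -1, 3);
        boolean ready = lookup.ensureCoordinatorReady(5);
        System.out.println("ready=" + ready
            + " attempts=" + lookup.attempts
            + " connectionId=" + lookup.coordinatorConnectionId);
    }
}
```

Because the derived id differs from the broker's normal id, coordinator traffic does not share a connection with ordinary requests to the same broker, which is the intent stated in the handler's own comment.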
ensureActiveGroup
/**
* Ensure that the group is active (i.e. joined and synced)
*/
public void ensureActiveGroup() {
// always ensure that the coordinator is ready because we may have been disconnected
// when sending heartbeats and does not necessarily require us to rejoin the group.
ensureCoordinatorReady();
startHeartbeatThreadIfNeeded();
joinGroupIfNeeded();
}
The main logic lives in joinGroupIfNeeded.
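[2] Readers need to understand how RequestFuture is used; otherwise the many addListener, compose, and chain calls will be confusing. All of them are ways of defining an asynchronous flow. See the companion article on how RequestFuture works.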