KafkaConsumer Component Source Code: ConsumerCoord

Author: 不存在的里皮 | Published: 2020-07-01 17:28

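ConsumerCoordinator extends AbstractCoordinator and, within the Kafka clients module, is its only implementation (Kafka Connect has its own subclass, WorkerCoordinator). AbstractCoordinator defines the group coordination logic: how a client talks to one specific broker (the coordinator) in order to join a consumer group and track the group's state.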

/**
 * This class manages the coordination process with the consumer coordinator.
 */
public final class ConsumerCoordinator extends AbstractCoordinator {

/**
 * AbstractCoordinator implements group management for a single group member by interacting with
 * (a long comment that is worth reading once in full; omitted here)
 * ...
 */
public abstract class AbstractCoordinator implements Closeable {

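ConsumerCoordinator is mainly responsible for the consumer's communication with the group coordinator: discovering the coordinator, joining the group, fetching committed offsets, and committing offsets.
After joining the group it also starts a HeartbeatThread, which keeps heartbeating to the coordinator so that the consumer stays a member.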

Main flow

KafkaConsumer::pollOnce always begins by calling ConsumerCoordinator::poll.

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        client.maybeTriggerWakeup();

        long startMs = time.milliseconds();
        coordinator.poll(startMs, timeout);
        ...

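Besides handling offset commits, this poll call also gets in touch with the coordinator and completes the rebalance.
If you have read about the Kafka consumer rebalance mechanism[1], you already know that: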

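  • The coordinator is the leader of the __consumer_offsets partition that the group maps to. ensureCoordinatorReady contacts some broker, finds out which node is the coordinator, and establishes a connection to it.
  • A rebalance has two phases, join and sync. ensureActiveGroup checks the member's own group state, contacts the coordinator, and completes the process of joining the group.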
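
To make the first bullet concrete, here is a minimal sketch of how a group id maps to a partition of __consumer_offsets. It assumes the broker-side rule is "absolute value of groupId.hashCode(), modulo the partition count" and that the topic has 50 partitions (the default of offsets.topic.num.partitions). The class and method names are made up for illustration and are not Kafka APIs.

```java
final class CoordinatorPartitionSketch {

    // Assumption: mirrors the broker-side rule abs(groupId.hashCode()) % partitionCount.
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so map that case to 0 explicitly.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        int partitions = 50; // assumed default of offsets.topic.num.partitions
        // "abc".hashCode() is 96354, and 96354 % 50 == 4
        System.out.println(partitionFor("abc", partitions)); // prints 4
    }
}
```

Whichever broker currently leads that partition acts as the coordinator for the group, which is why the consumer has to ask a broker first instead of computing the answer locally.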

Interim summary

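Before reading deeper into the source, we need to plan a direction.
ConsumerCoordinator contains a great deal of branching logic and frequently modifies both its own state and SubscriptionState. Trying to memorize "send request xx, then do this on a response and that on an error" is hopeless, and it does not help understanding either. My suggestions: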

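  1. Start from poll and read the main flow first, then dig into the other details.
  2. Assume every request and response succeeds, and see how the flow advances on the happy path.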

ConsumerCoordinator and HeartbeatThread share the following traits, which may help when reading the code:

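  1. Both use ConsumerNetworkClient to talk to Kafka nodes: sending requests and defining how responses are handled asynchronously.
  2. The request-response flow is asynchronous, so RequestFuture[2] is used everywhere to build asynchronous pipelines.
  3. Both modify shared state concurrently, which can cause race conditions, so they synchronize with synchronized (AbstractCoordinator.this).
  4. A large number of xxResponseHandler classes define what happens when a response arrives. For example, JoinGroupResponseHandler describes what happens after a join group request has been sent and its response received.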
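
The compose-plus-handler pattern from points 2 and 4 can be sketched with a stripped-down future. SimpleFuture below is a hypothetical, single-threaded stand-in written for this illustration; it is not Kafka's RequestFuture and leaves out retriable errors, polling and thread safety.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class RequestFutureSketch {

    /** Hypothetical, single-threaded stand-in for a request future. */
    static final class SimpleFuture<T> {
        private final List<Consumer<T>> successListeners = new ArrayList<>();
        private final List<Consumer<RuntimeException>> failureListeners = new ArrayList<>();
        private boolean done;
        private T value;
        private RuntimeException error;

        void complete(T v) {
            if (done) throw new IllegalStateException("already completed");
            done = true;
            value = v;
            for (Consumer<T> l : successListeners) l.accept(v);
        }

        void raise(RuntimeException e) {
            if (done) throw new IllegalStateException("already completed");
            done = true;
            error = e;
            for (Consumer<RuntimeException> l : failureListeners) l.accept(e);
        }

        boolean succeeded() { return done && error == null; }
        boolean failed() { return done && error != null; }
        T value() { return value; }

        void addListener(Consumer<T> onSuccess, Consumer<RuntimeException> onFailure) {
            if (succeeded()) onSuccess.accept(value);
            else if (failed()) onFailure.accept(error);
            else { successListeners.add(onSuccess); failureListeners.add(onFailure); }
        }

        /** Derives a future of another type; the adapter decides how to complete it. */
        <S> SimpleFuture<S> compose(BiConsumer<T, SimpleFuture<S>> adapter) {
            SimpleFuture<S> adapted = new SimpleFuture<>();
            addListener(v -> adapter.accept(v, adapted), adapted::raise);
            return adapted;
        }
    }

    static String demo() {
        SimpleFuture<String> raw = new SimpleFuture<>(); // plays the role of the raw response future
        // The lambda plays the role of a response handler: parse the raw body, then complete or fail.
        SimpleFuture<Integer> parsed = raw.compose((body, f) -> {
            if (body.startsWith("node=")) f.complete(Integer.parseInt(body.substring(5)));
            else f.raise(new IllegalStateException("unexpected response: " + body));
        });
        raw.complete("node=3"); // the "response" arrives later
        return parsed.succeeded() ? "coordinator id " + parsed.value() : "failed";
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "coordinator id 3"
    }
}
```

The caller only ever sees the adapted future, so the code that sends the request and the code that interprets the response can live in different places, which is how the handlers in the coordinator are organized.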


ensureCoordinatorReady

/**
 * Block until the coordinator for this group is known and is ready to receive requests.
 */
public synchronized void ensureCoordinatorReady() {
  // Using zero as current time since timeout is effectively infinite
  ensureCoordinatorReady(0, Long.MAX_VALUE);
}

protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
    long remainingMs = timeoutMs;

    while (coordinatorUnknown()) {
        RequestFuture<Void> future = lookupCoordinator();  // send the lookup request and wait for the reply
        client.poll(future, remainingMs);  // perform IO

        if (future.failed()) {
            if (future.isRetriable()) {
                ...
                client.awaitMetadataUpdate(remainingMs);  // wait for the Metadata refresh
            } else
                throw future.exception();
        } else if (coordinator != null && client.connectionFailed(coordinator)) {
            ...
        }
        ...
    }

    return !coordinatorUnknown();
}

protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // find a node to ask about the coordinator
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            ...
        } else
            findCoordinatorFuture = sendFindCoordinatorRequest(node);
    }
    return findCoordinatorFuture;
}

/**
 * Discover the current coordinator for the group. Sends a GroupMetadata request to
 * one of the brokers. The returned future should be polled to get the result of the request.
 * @return A request future which indicates the completion of the metadata request
 */
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    // initiate the group metadata request
    log.debug("Sending FindCoordinator request to broker {}", node);
    FindCoordinatorRequest.Builder requestBuilder =
            new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);
    return client.send(node, requestBuilder)
                 .compose(new FindCoordinatorResponseHandler());
}

private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {

    @Override
    public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
        log.debug("Received FindCoordinator response {}", resp);
        clearFindCoordinatorFuture();

        FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
        Errors error = findCoordinatorResponse.error();
        if (error == Errors.NONE) {
            synchronized (AbstractCoordinator.this) {
                // use MAX_VALUE - node.id as the coordinator id to allow separate connections
                // for the coordinator in the underlying network client layer
                int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.node().id();

                AbstractCoordinator.this.coordinator = new Node(
                        coordinatorConnectionId,
                        findCoordinatorResponse.node().host(),
                        findCoordinatorResponse.node().port());
                log.info("Discovered group coordinator {}", coordinator);
                client.tryConnect(coordinator);  // connect to the coordinator
                heartbeat.resetTimeouts(time.milliseconds());   // reset the heartbeat
            }
            future.complete(null);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            ...
        } else {
            ...
        }
    }

    @Override
    public void onFailure(RuntimeException e, RequestFuture<Void> future) {
        ...
    }
}

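Then just read ensureCoordinatorReady(long startTimeMs, long timeoutMs). The rough logic is:

  1. As long as the coordinator is still unknown, ensureCoordinatorReady loops: it calls lookupCoordinator to issue the request, then calls client.poll to perform the IO.
  2. lookupCoordinator picks a broker and calls sendFindCoordinatorRequest.
  3. sendFindCoordinatorRequest sends a find-coordinator request to that broker and specifies that the response is handled by FindCoordinatorResponseHandler.
  4. When the response arrives, FindCoordinatorResponseHandler processes it and connects to the coordinator.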
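
The loop in step 1 can be sketched as follows. The excerpt above elides how remainingMs is updated, so the deadline bookkeeping here is an assumption about one reasonable way to do it, and FakeClient is a hypothetical test double that folds "send lookup request + poll" into one call with a scripted outcome.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class EnsureReadySketch {

    /** Hypothetical stand-in for the network client; each poll consumes one scripted outcome. */
    static final class FakeClient {
        private final Deque<Boolean> lookupOutcomes = new ArrayDeque<>();
        long nowMs = 0;
        int polls = 0;

        FakeClient(boolean... outcomes) {
            for (boolean o : outcomes) lookupOutcomes.add(o);
        }

        /** Simulates one lookup round trip taking 100 ms; returns whether the coordinator was found. */
        boolean pollLookup() {
            polls++;
            nowMs += 100;
            Boolean outcome = lookupOutcomes.poll(); // null once the script is exhausted
            return outcome != null && outcome;
        }
    }

    /** Keeps looking up the coordinator until it is known or the (assumed) deadline passes. */
    static boolean ensureCoordinatorReady(FakeClient client, long timeoutMs) {
        long deadlineMs = client.nowMs + timeoutMs;
        boolean coordinatorKnown = false;
        while (!coordinatorKnown) {
            coordinatorKnown = client.pollLookup();
            if (!coordinatorKnown && client.nowMs >= deadlineMs) break; // out of time, give up
        }
        return coordinatorKnown;
    }

    public static void main(String[] args) {
        FakeClient flaky = new FakeClient(false, false, true); // two failed lookups, then success
        System.out.println(ensureCoordinatorReady(flaky, 1000) + " after " + flaky.polls + " polls");

        FakeClient down = new FakeClient(); // every lookup fails
        System.out.println(ensureCoordinatorReady(down, 250) + " after " + down.polls + " polls");
    }
}
```

The no-argument ensureCoordinatorReady() shown earlier is the same loop with an effectively infinite timeout, so it only returns once the coordinator is known or a non-retriable error is thrown.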

ensureActiveGroup

/**
 * Ensure that the group is active (i.e. joined and synced)
 */
public void ensureActiveGroup() {
    // always ensure that the coordinator is ready because we may have been disconnected
    // when sending heartbeats and does not necessarily require us to rejoin the group.
    ensureCoordinatorReady();
    startHeartbeatThreadIfNeeded();
    joinGroupIfNeeded();
}

The main logic is in joinGroupIfNeeded.


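  1. For example, the article "Kafka消费者组" (Kafka consumer groups).

  2. You need to understand how RequestFuture is used; otherwise the many addListener, compose and chain calls will be confusing. All of them are ways of defining the asynchronous flow. See the article "RequestFuture原理" (how RequestFuture works).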


Article link: https://www.haomeiwen.com/subject/eqbafktx.html