[Kafka] The KafkaProducer Metadata Update Mechanism

Author: LZhan | Published 2020-01-03 10:50

1 Introduction

While reading the source code of how KafkaProducer produces messages, you keep running into requests that update the metadata. At a rough glance, Producer Metadata updates fall into two cases: forced updates and periodic updates.

So today, let's take a look at how this update mechanism actually works!

2 Source Code Analysis

2.1 The Contents of Metadata

The class-level comment of Metadata says:

This class encapsulates the logic around the metadata. It is shared between the client thread and the background sender thread, and it holds only a subset of all topics: when metadata is requested for a topic that is not in the subset, a metadata update is sent to fetch it. If topic expiry is enabled, any expired topic is removed from the set; the consumer, however, disables topic expiry, because it knows exactly which topics it has to manage.

public class Metadata implements Closeable {

    private static final Logger log = LoggerFactory.getLogger(Metadata.class);
    // topic expiry time
    public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
    private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;

    // minimum interval between metadata refreshes, to avoid refreshing too frequently after a failure; 100 ms by default
    private final long refreshBackoffMs;
    // metadata expiry time; 300,000 ms (5 minutes, metadata.max.age.ms) by default
    private final long metadataExpireMs;
    // metadata update version, incremented by 1 on every update; mainly used to tell whether the metadata has been updated
    private int updateVersion;  // bumped on every metadata response
    private int requestVersion; // bumped on every new topic addition
    // time of the most recent refresh attempt (including failed ones)
    private long lastRefreshMs;
    // time of the most recent successful refresh (equal to lastRefreshMs if every attempt succeeds)
    private long lastSuccessfulRefreshMs;
    private AuthenticationException authenticationException;
    // the cache holding the metadata itself
    private MetadataCache cache = MetadataCache.empty();
    // flag indicating an update is needed; it is set to true before every forced update
    private boolean needUpdate;
    /* Topics with expiry time */
    // mapping from each topic to its expiry time
    private final Map<String, Long> topics;
    // event listeners
    private final List<Listener> listeners;
    private final ClusterResourceListeners clusterResourceListeners;
    // whether to fetch metadata for all topics
    private boolean needMetadataForAllTopics;
    private final boolean allowAutoTopicCreation;
    // true by default for the producer, which periodically removes expired topics; the consumer never removes them
    private final boolean topicExpiryEnabled;
    private boolean isClosed;
    private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
}
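For context, here is a paraphrased sketch (not the literal source; details vary between client versions) of how a topic enters the topics map: adding a previously unknown topic marks its expiry as "needs update" and flags a refresh, which is what the comment on needUpdate above refers to.

    // Paraphrased sketch of Metadata#add:
    public synchronized void add(String topic) {
        Objects.requireNonNull(topic, "topic cannot be null");
        // put() returns null only if the topic was not tracked before
        if (topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null) {
            requestUpdateForNewTopics(); // bumps requestVersion and sets needUpdate = true
        }
    }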

MetadataCache: information about the nodes, topics and partitions in the Kafka cluster. (It is read-only.)

class MetadataCache {
    private final String clusterId;
    private final List<Node> nodes;
    private final Set<String> unauthorizedTopics;
    private final Set<String> invalidTopics;
    private final Set<String> internalTopics;
    private final Node controller;
    private final Map<TopicPartition, PartitionInfoAndEpoch> metadataByPartition;

    private Cluster clusterInstance;
} 

Cluster: holds the detailed information for each topic (the leader's node, the replica nodes, and the ISR list).

public final class Cluster {
    // the field names say it all
    private final boolean isBootstrapConfigured;
    private final List<Node> nodes; // node list
    private final Set<String> unauthorizedTopics; // unauthorized topics
    private final Set<String> internalTopics; // internal topics
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; // per-partition details
    private final Map<String, List<PartitionInfo>> partitionsByTopic; // topic -> partitions
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic; // topic -> available partitions (leader != null)
    private final Map<Integer, List<PartitionInfo>> partitionsByNode; // node -> partitions
    private final Map<Integer, Node> nodesById; // broker id -> node
    private final ClusterResource clusterResource;
}

// Contains the topic, the partition, the partition's leader node, the replicas, the ISR and the offline replicas
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
}
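To make these relationships concrete, here is a small sketch that reads the cached view. It assumes a Cluster obtained from metadata.fetch() and a hypothetical topic name, and it uses only public Cluster/PartitionInfo accessors (partitionCountForTopic, partitionsForTopic, leader, replicas, inSyncReplicas, offlineReplicas):

    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.PartitionInfo;

    public class ClusterViewExample {
        // Print what the cached metadata knows about a topic ("demo-topic" below would be hypothetical).
        static void describeTopic(Cluster cluster, String topic) {
            Integer count = cluster.partitionCountForTopic(topic);
            if (count == null) {
                System.out.println(topic + " is not in the cached metadata yet");
                return;
            }
            for (PartitionInfo pi : cluster.partitionsForTopic(topic)) {
                Node leader = pi.leader(); // may be null if the leader is currently unknown
                System.out.printf("partition=%d leader=%s replicas=%d isr=%d offline=%d%n",
                        pi.partition(), leader, pi.replicas().length,
                        pi.inSyncReplicas().length, pi.offlineReplicas().length);
            }
        }
    }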

2.2 The Metadata Update Flow

2.2.1 Fetching a Topic's Metadata

The first thing the producer does in doSend() is fetch the topic's metadata through the waitOnMetadata method.

    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        // fetch the cached cluster metadata
        Cluster cluster = metadata.fetch();

        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);

        // add the topic to the metadata; if the metadata has no meta for this topic yet, its needUpdate flag is set to true
        metadata.add(topic);

        // partition count for this topic in the cluster
        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return cached metadata if we have it, and if the record's partition is either undefined
        // or within the known partition range
        // if the metadata already has this topic's partition info, return it directly
        // (partition may be null because a record does not have to name a partition)
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        // Issue metadata requests until we have metadata for the topic and the requested partition,
        // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
        // is stale and the number of partitions for this topic has increased in the meantime.
        // until the metadata update succeeds, the method keeps looping here
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic);
            // set needUpdate to true, marking the metadata as needing an update
            int version = metadata.requestUpdate();
            // wake up the sender thread
            sender.wakeup();
            try {
                // wait for the sender thread to update the metadata; the caller blocks until the update succeeds
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs));
            }
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - begin;
            // timed out
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            // authorization failed: no write permission on this topic
            if (cluster.unauthorizedTopics().contains(topic))
                throw new TopicAuthorizationException(topic);
            if (cluster.invalidTopics().contains(topic))
                throw new InvalidTopicException(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
            // keep looping until partitionsCount is non-null, i.e. until the metadata contains this topic's information
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        return new ClusterAndWaitTime(cluster, elapsed);
    }

If the metadata does not contain this topic's metadata, a metadata update is requested; until the update completes, the method stays inside the do ... while loop, where it mainly does the following:

(1) metadata.requestUpdate()
Sets the metadata's needUpdate flag to true (forcing an update) and returns the current version number; the version number is how the caller tells whether the update has completed. (A sketch of this method appears right after the awaitUpdate listing below.)

(2) sender.wakeup()
Wakes up the sender thread; the sender thread in turn drives NetworkClient, which performs the actual network work.

(3) metadata.awaitUpdate(version, remainingWaitMs)
Waits for the metadata update to complete:

    // wait for a metadata update (judged against the current version)
    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
        if (maxWaitMs < 0)
            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");

        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        // keep looping until the metadata update succeeds and updateVersion is bumped;
        // comparing version numbers is how consistency is enforced
        while ((this.updateVersion <= lastVersion) && !isClosed()) {
            AuthenticationException ex = getAndClearAuthenticationException();
            if (ex != null)
                throw ex;
            if (remainingWaitMs != 0)
                wait(remainingWaitMs); // block waiting for the update; on timeout the thread wakes up anyway
            long elapsed = System.currentTimeMillis() - begin;
            // if the update has not succeeded by now, elapsed >= maxWaitMs
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
        if (isClosed())
            throw new KafkaException("Requested metadata update after close");
    }
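For reference, the requestUpdate() from step (1) is essentially a one-liner; a paraphrased sketch (not the literal source):

    // Paraphrased sketch of Metadata#requestUpdate:
    public synchronized int requestUpdate() {
        this.needUpdate = true;    // forces timeToExpire to 0 in timeToNextUpdate()
        return this.updateVersion; // the version the caller waits to be exceeded
    }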

2.2.2 When Does the Metadata Need Updating?

So, when does the producer's metadata actually need to be updated? The check happens in NetworkClient's poll() method:

    public List<ClientResponse> poll(long timeout, long now) {
        ensureActive();

        if (!abortedSends.isEmpty()) {
            // If there are aborted sends because of unsupported version exceptions or disconnects,
            // handle them immediately without waiting for Selector#poll.
            List<ClientResponse> responses = new ArrayList<>();
            handleAbortedSends(responses);
            completeResponses(responses);
            return responses;
        }

        // decide whether the metadata needs updating, and update it if so (this is where metadata update requests originate)
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);   // fetch the server's responses from the selector
        handleCompletedReceives(responses, updatedNow); // the metadata update is handled inside this response handler
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutRequests(responses, updatedNow);
        completeResponses(responses);

        return responses;
    }
  • metadataUpdater.maybeUpdate(now): decides whether the Metadata needs updating; if it does, it first establishes a connection to a broker and then sends the metadata update request.

  • Handles various responses from the server; the method of interest here is handleCompletedReceives(responses, updatedNow), which processes the metadata result returned by the server.


DefaultMetadataUpdater's maybeUpdate method:

 public long maybeUpdate(long now) {
     // should we update our metadata?
     // time until the next metadata update (a forced update is due immediately; an expiry-based update has to be computed)
     long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
     // if a metadata fetch request is still in flight, wait defaultRequestTimeoutMs (30 s by default)
     long waitForMetadataFetch = this.hasFetchInProgress() ? defaultRequestTimeoutMs : 0;

     long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);

     if (metadataTimeout > 0) { // not time yet: return when the next update is due
         return metadataTimeout;
     }

     // Beware that the behavior of this method and the computation of timeouts for poll() are
     // highly dependent on the behavior of leastLoadedNode.
     Node node = leastLoadedNode(now); // pick the least loaded node (fewest in-flight requests)
     if (node == null) {
         log.debug("Give up sending metadata request since no node is available");
         return reconnectBackoffMs;
     }

     return maybeUpdate(now, node); // if a request can be sent, send the metadata request
 }

Computing the time of the next metadata update, timeToNextUpdate:

    public synchronized long timeToNextUpdate(long nowMs) {
        long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
        return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
    }

    public synchronized long timeToAllowUpdate(long nowMs) {
        // lastRefreshMs: most recent refresh attempt (including failures); refreshBackoffMs: minimum gap between refreshes
        return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
    }

The time of the next metadata update is the larger of timeToExpire (the moment the current metadata expires) and timeToAllowUpdate (the earliest moment an update is allowed again).

timeToExpire: when needUpdate is true (a forced update), this value is 0; otherwise updates are periodic, driven by the metadata expiry time (metadata.max.age.ms, 300,000 ms i.e. 5 minutes by default).

timeToAllowUpdate: by default simply refreshBackoffMs, i.e. 100 ms.
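A worked example of this computation, using hypothetical timestamps and the default config values:

    // Worked example (hypothetical timestamps): the periodic path, needUpdate == false.
    public class TimeToNextUpdateExample {
        public static void main(String[] args) {
            long nowMs                   = 1_000_000L;
            long lastSuccessfulRefreshMs =   760_000L; // last successful refresh was 240 s ago
            long lastRefreshMs           =   760_000L; // last attempt (same here, since it succeeded)
            long metadataExpireMs        =   300_000L; // metadata.max.age.ms default: 5 minutes
            long refreshBackoffMs        =       100L; // retry.backoff.ms default: 100 ms
            boolean needUpdate           = false;

            long timeToExpire = needUpdate ? 0
                    : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0); // 60_000
            long timeToAllowUpdate = Math.max(lastRefreshMs + refreshBackoffMs - nowMs, 0); // 0
            System.out.println(Math.max(timeToExpire, timeToAllowUpdate)); // 60000: next refresh due in 60 s

            // With needUpdate == true (a forced update), timeToExpire would be 0 and the
            // result would be timeToAllowUpdate: at most the 100 ms backoff.
        }
    }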

Next, the private maybeUpdate overload is called:

       // check whether a request can be sent; if so, enqueue the metadata request for sending
       private long maybeUpdate(long now, Node node) {
           String nodeConnectionId = node.idString();

            if (canSendRequest(nodeConnectionId, now)) { // the channel is ready and can accept more requests
                // build a request for all topics, or only for the topics tracked in the metadata
                Metadata.MetadataRequestAndVersion metadataRequestAndVersion = metadata.newMetadataRequestAndVersion();
                inProgressRequestVersion = metadataRequestAndVersion.requestVersion;
                // create the metadata request
                MetadataRequest.Builder metadataRequest = metadataRequestAndVersion.requestBuilder;
                log.debug("Sending metadata request {} to node {}", metadataRequest, node);
                // send the metadata request
                sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
                return defaultRequestTimeoutMs;
            }

            // If there's any connection establishment underway, wait until it completes. This prevents
            // the client from unnecessarily connecting to additional nodes while a previous connection
            // attempt has not been completed.
            if (isAnyNodeConnecting()) { // if the client's connection to any node is in the connecting state, just wait
                // Strictly the timeout we should return here is "connect timeout", but as we don't
                // have such application level configuration, using reconnect backoff instead.
                return reconnectBackoffMs;
            }
            // if there is no connection to this node yet, initiate one
            if (connectionStates.canConnect(nodeConnectionId, now)) {
                // we don't have a connection to this node right now, make one
                log.debug("Initialize connection to node {} for sending metadata request", node);
                // initiate the connection
                initiateConnect(node, now);
                return reconnectBackoffMs;
            }

            // connected, but can't send more OR connecting
            // In either case, we just need to wait for a network event to let us know the selected
            // connection might be usable again.
            return Long.MAX_VALUE;
        }



newMetadataRequestAndVersion decides whether the request covers all topics or only the topics the metadata is tracking:

    public synchronized MetadataRequestAndVersion newMetadataRequestAndVersion() {
        final MetadataRequest.Builder metadataRequestBuilder;
        // fetch the metadata of all topics (not the default; note, though, that every broker holds the meta of all topics)
        if (needMetadataForAllTopics)
            metadataRequestBuilder = MetadataRequest.Builder.allTopics();
        else
            // only update the topics tracked in the metadata (the list is populated via metadata.add())
            metadataRequestBuilder = new MetadataRequest.Builder(new ArrayList<>(this.topics.keySet()),
                    allowAutoTopicCreation());
        return new MetadataRequestAndVersion(metadataRequestBuilder, requestVersion);
    }

So each time the producer requests a metadata update, one of the following happens:

(1) The channel is already ready and the node can accept requests: send the request directly.

(2) The node is in the middle of establishing a connection: return and wait.

(3) No connection to this node exists yet: initiate a connection to the broker.

Meanwhile the KafkaProducer thread stays blocked in its two while loops until the metadata is updated:

(1) On the sender thread's first poll() call, the connection to the node is initiated.

(2) On the second poll() call, the Metadata request is sent.

(3) On the third poll() call, the metadataResponse is received and the metadata is updated.

2.2.3 Receiving the Server's Response and Updating the Metadata

Here is how handleCompletedReceives processes any completed receives:

    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : this.selector.completedReceives()) {
            String source = receive.source();
            InFlightRequest req = inFlightRequests.completeNext(source);
            Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                throttleTimeSensor, now);
            if (log.isTraceEnabled()) {
                log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
                    req.header.apiKey(), req.header.correlationId(), responseStruct);
            }
            // If the received response includes a throttle delay, throttle the connection.
            AbstractResponse body = AbstractResponse.
                    parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
            maybeThrottle(body, req.header.apiVersion(), req.destination, now);
            // the response type is checked here: if this is the response to a Metadata request
            if (req.isInternalRequest && body instanceof MetadataResponse)
                metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
            else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
                handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
            else
                responses.add(req.completed(body, now));
        }
    }

which then goes on to call handleCompletedMetadataResponse.
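For orientation, a paraphrased sketch of what handleCompletedMetadataResponse does (not the literal source; details vary across client versions): it logs any per-topic errors, applies the new cluster view when the broker list is non-empty, records a failed update otherwise, and clears the in-flight marker.

    // Paraphrased sketch of DefaultMetadataUpdater#handleCompletedMetadataResponse:
    public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now,
                                                MetadataResponse response) {
        Map<String, Errors> errors = response.errors();
        if (!errors.isEmpty())
            log.warn("Error while fetching metadata with correlation id {} : {}",
                    requestHeader.correlationId(), errors);

        if (response.brokers().isEmpty()) {
            // an empty broker list would wipe the cache, so count this as a failed
            // update: only lastRefreshMs advances, and the refresh backoff applies
            this.metadata.failedUpdate(now, null);
        } else {
            // applies the new cluster view, bumps updateVersion and notifyAll()s
            // any thread blocked in awaitUpdate()
            this.metadata.update(inProgressRequestVersion, response, now);
        }
        inProgressRequestVersion = null;
    }

On the success path it is this update() call that wakes the producer thread blocked in waitOnMetadata, closing the loop described in 2.2.1.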

3 Summary

Metadata is updated in two ways:

  1. Forced update: calling Metadata.requestUpdate() sets needUpdate to true to force an update.
  2. Periodic update: driven by the Metadata fields lastSuccessfulRefreshMs and metadataExpireMs; in the normal case the period is simply metadataExpireMs, 5 minutes by default.

Whenever NetworkClient's poll() method is called, both mechanisms are checked; as soon as either condition is met, an update is triggered.
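In client code, these mechanisms surface as ordinary producer configs. A minimal sketch (the broker address is hypothetical; the values shown are the defaults):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerMetadataConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000"); // metadataExpireMs: periodic refresh period
            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");    // refreshBackoffMs: minimum gap between refreshes
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");      // maxWaitMs bound for waitOnMetadata()

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // the first send() to an unknown topic blocks in waitOnMetadata()
                // until the metadata arrives or MAX_BLOCK_MS_CONFIG elapses
            }
        }
    }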

A forced Metadata update is triggered in the following situations:

  1. When the initiateConnect method is called to initialize a connection;
  2. When handleDisconnections() is called from poll() to handle a dropped connection, which triggers a forced update;
  3. When handleTimedOutRequests() is called from poll() to handle timed-out requests;
  4. When sending a message, if no leader can be found for the target partition;
  5. When handling the produce response (handleProduceResponse), if it carries an exception indicating stale Metadata, e.g. there is no meta for the topic-partition, or the client has no permission to fetch its metadata.

Forced updates exist mainly to handle these various abnormal situations.
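As one concrete illustration of the forced path, the disconnect case (item 2) boils down to the updater flagging the metadata as stale; a paraphrased sketch, not the literal source:

    // Paraphrased sketch of the disconnect hook in DefaultMetadataUpdater: a dropped
    // connection marks the metadata stale; the request goes out on the next poll().
    @Override
    public void handleDisconnection(String destination) {
        metadata.requestUpdate(); // needUpdate = true -> timeToNextUpdate() returns at most refreshBackoffMs
    }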
