Kafka: How the Producer Updates Metadata (Part 2)


Author: Roger1234 | Published 2019-07-12 15:19

    Preface

    In the previous article we briefly walked through how the Producer sends messages, leaving out many details. Reading the source code, you will notice that metadata is requested in many places along the send path. This article focuses on how the Producer updates its metadata.

    1. The Cluster and Metadata Data Structures

    Metadata encapsulates the logic around cluster metadata. The class is shared between the client (user) threads and the background Sender thread. Metadata maintains a set of the topics the producer cares about; when metadata for a topic is requested but not yet available, a metadata update is triggered.

    public class Metadata implements Closeable {
    
        private static final Logger log = LoggerFactory.getLogger(Metadata.class);
    
        public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
        private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
    
        // Minimum wait between two metadata refresh attempts (backoff)
        private final long refreshBackoffMs;
        // Metadata is considered stale after this interval and is refreshed periodically
        private final long metadataExpireMs;
        // Incremented on every successful update; awaitUpdate() waits for it to grow
        private int version;
        // Timestamp of the last refresh attempt (successful or not)
        private long lastRefreshMs;
        // Timestamp of the last successful refresh
        private long lastSuccessfulRefreshMs;
        private AuthenticationException authenticationException;
        private MetadataCache cache = MetadataCache.empty();
        // Set to true by requestUpdate() to force a refresh
        private boolean needUpdate;
        /* Topics with expiry time */
        private final Map<String, Long> topics;
        private final List<Listener> listeners;
        private final ClusterResourceListeners clusterResourceListeners;
        private boolean needMetadataForAllTopics;
        private final boolean allowAutoTopicCreation;
        private final boolean topicExpiryEnabled;
        private boolean isClosed;
        private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
    
    }
    

    Cluster holds information about a subset of the Kafka cluster: its nodes, topics, and partitions.

    public final class Cluster {
    
        private final boolean isBootstrapConfigured;
        private final List<Node> nodes;
        private final Set<String> unauthorizedTopics;
        private final Set<String> invalidTopics;
        private final Set<String> internalTopics;
        private final Node controller;
        private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
        private final Map<String, List<PartitionInfo>> partitionsByTopic;
        private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
        private final Map<Integer, List<PartitionInfo>> partitionsByNode;
        private final Map<Integer, Node> nodesById;
        private final ClusterResource clusterResource;
    
    }
    

    2. The Metadata Update Flow

    When the producer sends a message, it first makes sure that metadata for the target topic-partition is available. This logic is encapsulated in the waitOnMetadata method.

    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
            // add topic to metadata topic list if it is not there already and reset expiry
            Cluster cluster = metadata.fetch();
    
            if (cluster.invalidTopics().contains(topic))
                throw new InvalidTopicException(topic);
    
            metadata.add(topic);
    
            Integer partitionsCount = cluster.partitionCountForTopic(topic);
            // Return cached metadata if we have it, and if the record's partition is either undefined
            // or within the known partition range
            if (partitionsCount != null && (partition == null || partition < partitionsCount))
                return new ClusterAndWaitTime(cluster, 0);
    
            long begin = time.milliseconds();
            long remainingWaitMs = maxWaitMs;
            long elapsed;
            // Issue metadata requests until we have metadata for the topic and the requested partition,
            // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
            // is stale and the number of partitions for this topic has increased in the meantime.
            do {
                if (partition != null) {
                    log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
                } else {
                    log.trace("Requesting metadata update for topic {}.", topic);
                }
                metadata.add(topic);
                int version = metadata.requestUpdate();
                sender.wakeup();
                try {
                    metadata.awaitUpdate(version, remainingWaitMs);
                } catch (TimeoutException ex) {
                    // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                    throw new TimeoutException(
                            String.format("Topic %s not present in metadata after %d ms.",topic, maxWaitMs));
                }
                cluster = metadata.fetch();
                elapsed = time.milliseconds() - begin;
                if (elapsed >= maxWaitMs) {
                    throw new TimeoutException(partitionsCount == null ?
                            String.format("Topic %s not present in metadata after %d ms.",
                                    topic, maxWaitMs) :
                            String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                    partition, topic, partitionsCount, maxWaitMs));
                }
                if (cluster.unauthorizedTopics().contains(topic))
                    throw new TopicAuthorizationException(topic);
                if (cluster.invalidTopics().contains(topic))
                    throw new InvalidTopicException(topic);
                remainingWaitMs = maxWaitMs - elapsed;
                partitionsCount = cluster.partitionCountForTopic(topic);
            } while (partitionsCount == null || (partition != null && partition >= partitionsCount));
    
            return new ClusterAndWaitTime(cluster, elapsed);
        }
    

    (1) If the topic being sent to appears in the cluster's invalidTopics list, the method throws an exception. If the cached Cluster already holds partition information for the topic (and the requested partition is within range), it is returned immediately.
    (2) Otherwise, the do-while loop keeps requesting metadata updates until the data is obtained or the wait times out and an exception is thrown.

    • metadata.requestUpdate() sets the needUpdate flag to true and returns the current metadata version; the producer then calls sender.wakeup() to wake the background Sender thread.
    • metadata.awaitUpdate(version, remainingWaitMs) waits for the update to complete, i.e. until the metadata version becomes greater than the version passed in, or until remainingWaitMs elapses. Internally it is itself a loop whose only exit conditions are the version bump or the timeout, as sketched below.
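
    The waiting logic looks roughly like the following. This is a simplified sketch based on the description above, not a verbatim copy of the Kafka source:

    // Simplified sketch of Metadata.awaitUpdate(): block the caller until the Sender thread
    // has applied an update with a newer version, or the wait budget is exhausted.
    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        while (this.version <= lastVersion) {      // exit as soon as update() has bumped the version
            if (remainingWaitMs != 0)
                wait(remainingWaitMs);             // released by notifyAll() in update() / failedUpdate()
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
    }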

    So the client thread is effectively blocked in two nested loops until it either times out or gets the metadata it needs. The update itself is driven by sender.wakeup(): it wakes the Sender thread, which in turn drives the NetworkClient, and it is the NetworkClient that actually sends the MetadataRequest and processes the broker's response.

    2.1 poll() in NetworkClient

    Throughout the producer's send path, the code that actually sends requests and handles their responses is NetworkClient.poll(). The name of send() is somewhat misleading; its Javadoc says "Queue up the given request for sending". send() merely records the request in the InFlightRequests queue and hands the serialized data to the KafkaChannel: InFlightRequests keeps the request content (so the response can be matched to it later), while the KafkaChannel holds the pending send. The real work is encapsulated in poll().
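
    Conceptually, the queuing step looks something like the sketch below. This is a simplified illustration of what send() ends up doing through an internal helper, not the exact NetworkClient source:

    // Simplified sketch: send() only queues the request; no bytes reach the network
    // until Selector.poll() runs inside NetworkClient.poll().
    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        Send send = request.toSend(destination, header);
        InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);
        this.inFlightRequests.add(inFlightRequest); // remember the request until its response (or a timeout) arrives
        selector.send(send);                        // attach the pending send to the node's KafkaChannel
    }

    With the request queued, the actual I/O and response handling happen in poll():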

    public List<ClientResponse> poll(long timeout, long now) {
        ensureActive();
    
        if (!abortedSends.isEmpty()) {
            // If there are aborted sends because of unsupported version exceptions or disconnects,
            // handle them immediately without waiting for Selector#poll.
            List<ClientResponse> responses = new ArrayList<>();
            handleAbortedSends(responses);
            completeResponses(responses);
            return responses;
        }
    
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }
    
        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutRequests(responses, updatedNow);
        completeResponses(responses);
    
        return responses;
    }
    

    poll() consists of four main steps:

    • If there are sends that were aborted because of an unsupported API version or a disconnect, handle them immediately and return.
    • metadataUpdater.maybeUpdate(now) checks whether the metadata needs updating right now. If it does, the least loaded node is chosen and, once a connection to it can be used, maybeUpdate(now, node) puts the metadata request into the send queue.
    public long maybeUpdate(long now) {
        // should we update our metadata?
        long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
        long waitForMetadataFetch = this.metadataFetchInProgress ? defaultRequestTimeoutMs : 0;
    
        long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
    
        if (metadataTimeout > 0) {
            return metadataTimeout;
        }
    
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        Node node = leastLoadedNode(now);
        if (node == null) {
            log.debug("Give up sending metadata request since no node is available");
            return reconnectBackoffMs;
        }
        return maybeUpdate(now, node);
    
    }
    
    private long maybeUpdate(long now, Node node) {
                String nodeConnectionId = node.idString();
    
                if (canSendRequest(nodeConnectionId, now)) {
                    this.metadataFetchInProgress = true;
                    MetadataRequest.Builder metadataRequest;
                    if (metadata.needMetadataForAllTopics())
                        metadataRequest = MetadataRequest.Builder.allTopics();
                    else
                        metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()),
                                metadata.allowAutoTopicCreation());
    
    
                    log.debug("Sending metadata request {} to node {}", metadataRequest, node);
                    sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
                    return defaultRequestTimeoutMs;
                }
    
                // If there's any connection establishment underway, wait until it completes. This prevents
                // the client from unnecessarily connecting to additional nodes while a previous connection
                // attempt has not been completed.
                if (isAnyNodeConnecting()) {
                    // Strictly the timeout we should return here is "connect timeout", but as we don't
                    // have such application level configuration, using reconnect backoff instead.
                    return reconnectBackoffMs;
                }
    
                if (connectionStates.canConnect(nodeConnectionId, now)) {
                    // we don't have a connection to this node right now, make one
                    log.debug("Initialize connection to node {} for sending metadata request", node);
                    initiateConnect(node, now);
                    return reconnectBackoffMs;
                }
    
                // connected, but can't send more OR connecting
                // In either case, we just need to wait for a network event to let us know the selected
                // connection might be usable again.
                return Long.MAX_VALUE;
            }
    

    As we saw in the previous article, each time a metadata update is requested the client thread blocks in the two nested loops until it times out or the update succeeds. To make this easier to follow, consider the very first send:
    (1) The first time the Sender thread is woken up and poll() runs, it tries to establish a connection to a node.
    (2) The second time the Sender thread is woken up and poll() runs, it sends the metadata update request.
    (3) The third time the Sender thread is woken up and poll() runs, it processes the metadata response.
    After these three wake-ups the metadata has been fetched successfully, and the client thread exits its loop and continues with the send.

    • selector.poll() performs the actual I/O, i.e. sending requests and receiving responses. Underneath it relies on Java NIO.
    • The handle*() calls then process the results, both metadata responses and ordinary data responses. Here we only look at the metadata response, whose handling is encapsulated in handleCompletedMetadataResponse():
       public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
                this.metadataFetchInProgress = false;
    
                // If any partition has leader with missing listeners, log a few for diagnosing broker configuration
                // issues. This could be a transient issue if listeners were added dynamically to brokers.
                List<TopicPartition> missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata ->
                    topicMetadata.partitionMetadata().stream()
                        .filter(partitionMetadata -> partitionMetadata.error() == Errors.LISTENER_NOT_FOUND)
                        .map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
                    .collect(Collectors.toList());
                if (!missingListenerPartitions.isEmpty()) {
                    int count = missingListenerPartitions.size();
                    log.warn("{} partitions have leader brokers without a matching listener, including {}",
                            count, missingListenerPartitions.subList(0, Math.min(10, count)));
                }
    
                // check if any topics metadata failed to get updated
                Map<String, Errors> errors = response.errors();
                if (!errors.isEmpty())
                    log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);
    
                // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
                // created which means we will get errors and no nodes until it exists
                if (response.brokers().isEmpty()) {
                    log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
                    this.metadata.failedUpdate(now, null);
                } else {
                    this.metadata.update(response, now);
                }
            }
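
    The handler ends by calling this.metadata.update(response, now). Roughly speaking, this is where the new cluster view is stored, the version is bumped, and any client thread blocked in awaitUpdate() is woken up. The following is a simplified sketch, not the exact Metadata source (buildCache is a hypothetical helper standing in for the real cache-building code):

    // Simplified sketch of Metadata.update(): record the refresh, bump the version,
    // and notify client threads waiting in awaitUpdate().
    public synchronized void update(MetadataResponse response, long now) {
        this.needUpdate = false;
        this.lastRefreshMs = now;
        this.lastSuccessfulRefreshMs = now;
        this.version += 1;                   // the version that awaitUpdate() compares against
        this.cache = buildCache(response);   // hypothetical helper: build the new Cluster view from the response
        notifyAll();                         // wake up threads blocked in awaitUpdate()
    }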
    

    2.2 Metadata Update Strategies

    Summing up, metadata is updated in two ways:

    • Forced update: metadata.requestUpdate() forces a refresh. requestUpdate() does very little by itself; it just sets needUpdate to true. A forced update is triggered in cases such as:
      • when a connection is being initiated (initiateConnect)
      • when timed-out requests are handled (handleTimedOutRequests)
      • when disconnections are handled (handleDisconnections)
      • when the leader for one of the topic's partitions is unknown
      • when the Sender thread is force-closed (forceClose)
      • when the producer does not yet have a producerId (maybeWaitForProducerId)
      • when an InvalidMetadataException is raised
    • Periodic update: the metadata is refreshed periodically based on lastSuccessfulRefreshMs, metadataExpireMs, lastRefreshMs and refreshBackoffMs, as shown in the sketch after this list.
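
    Both forms come together in Metadata.timeToNextUpdate(), which maybeUpdate() consults (the metadata.timeToNextUpdate(now) call shown earlier). The following is a simplified sketch of that calculation based on the fields listed above, not a verbatim copy of the source:

    // Simplified sketch: how long until the next metadata update is due.
    // A forced update (needUpdate == true) makes the metadata expire immediately,
    // but refreshBackoffMs is still honored so the brokers are not hammered.
    public synchronized long timeToNextUpdate(long nowMs) {
        long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = Math.max(lastRefreshMs + refreshBackoffMs - nowMs, 0);
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    // Simplified sketch: the forced update only flips the flag and returns the current
    // version, which the client thread then passes to awaitUpdate().
    public synchronized int requestUpdate() {
        this.needUpdate = true;
        return this.version;
    }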

    3. Summary

    This article covered how the producer updates its metadata, but only the main flow; many details remain that I have not fully worked out, and I will update the article as I understand more. The next article will look at what the producer does to guarantee the ordering of messages within a partition.
