美文网首页
Kafka源码分析-Producer(6)-Sender分析(3

Kafka源码分析-Producer(6)-Sender分析(3

作者: 陈阳001 | 来源:发表于2018-10-03 23:18 被阅读0次

一.MetadataUpdater

简介

MetadataUpdater接口是一个辅助NetworkClient更新的Metadata接口,有两个实现类:


image.png

DefaultMetadataUpdater是NetworkClient使用的默认实现,下面是它的三个字段:

  • metadata:指向集群元数据的Metadata对象。
  • metadataFetchInProgress:用来标识是否已经发送了MetadataRequest请求更新Metadata,如果已经发送,就不会重复发送。
  • lastNoNodeAvailableMs:监测到没有可用节点时,用这个字段记录时间戳。

maybeUpdate()方法:

maybeUpdate()方法是DefaultMetadataUpdater的核心方法,用来判断当前的Metadata中保存的集群元数据是否需要更新。首先会监测metadataFetchInProgress字段,如果没有发送,满足下面一个条件就能够更新:

  • Metadata.needUpdate字段被设置为true,且避让时间已到。
  • 长时间没有更新,默认5分钟一次。

MetadataRequest

如果需要更新,则发送MetadataRequest请求,MetadataRequest请求的请求头包含ApiKeys.MetaData标识,消息体中包含Topic集合表示需要获取元数据的Topic,如果Topic集合为Null则表示全部Topic的元数据。

MetadataResponse

名称 类型 含义
node_id int Node节点的Id
host String Node节点的Host名称
rack String 每个Broker的机架信息
controller_id int controller所在的Node节点的Id
topic_error_code short 错误码
topic String topic名称
is_internal boolean 是否为Kafka内部的topic
partition_error_code short 错误码
partition_id int 分区编号
leader int 分区的Leader Replica所在的Id
replicas int集合 此分区所有Replica所在的Node节点的Id的集合
isr int集合 此分区的ISR所在的Node节点的Id的集合

MetadataRequest请求发送之前,要将metadataFetchInProgress置为true,然后从所有Node中选择负载最小的Node节点,向其发送更新请求。负载大小通过每个Node在InFlightRequests队列中未确认的请求决定的,未确认的请求越多则认为负载越大。
发送过程:将请求添加到InFlightRequests队列中,然后设置到KafkaChannel的send字段中,通过KSelector.poll()方法将MetadataRequest请求发送出去。DefaultMetadataUpdater.maybeUpdate()方法的代码:

 public long maybeUpdate(long now) {
            /*
            调用metadata.timeToNextUpdate(now)方法,其中会检测needUpdate的值,退避时间,是否长时间未更新。
            最终得到一个下次更新集群元数据的实际戳。
             */
            // should we update our metadata?
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
            //获取下次尝试重新连接服务器端的时间戳。
            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
            //检测是否已经发送了 MetadataRequest 请求。
            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
            // if there is no node available to connect, back off refreshing metadata
            //计算当前距离下次可以发送MetadataRequest请求的时间差。
            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                    waitForMetadataFetch);

            if (metadataTimeout == 0) {
                // Beware that the behavior of this method and the computation of timeouts for poll() are
                // highly dependent on the behavior of leastLoadedNode.
                //找到负载最小的 node,若没有可以用的node,则返回null。
                Node node = leastLoadedNode(now);
                //创建并缓存MetadataRequest,等待下次poll()方法才会真正发送。
                maybeUpdate(now, node);
            }

            return metadataTimeout;
        }


/**
         * Add a metadata request to the list of sends if we can make one
         */
        private void maybeUpdate(long now, Node node) {
            if (node == null) {//是否有node可用
                log.debug("Give up sending metadata request since no node is available");
                // mark the timestamp for no node available to connect
                this.lastNoNodeAvailableMs = now;//标记没有node可以连接的时间戳
                return;
            }
            String nodeConnectionId = node.idString();
            //检测是否允许向此Node发送请求。
            if (canSendRequest(nodeConnectionId)) {
                this.metadataFetchInProgress = true;
                MetadataRequest metadataRequest;
                if (metadata.needMetadataForAllTopics())//指定需要更新元数据的Topic
                    metadataRequest = MetadataRequest.allTopics();
                else
                    metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
                //将MetadataRequest封装成ClientRequest
                ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
                log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
                doSend(clientRequest, now);//缓存请求,下次poll()操作会将其发送出去
            } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                // we don't have a connection to this node right now, make one
                log.debug("Initialize connection to node {} for sending metadata request", node.id());
                initiateConnect(node, now);//初始化连接
                // If initiateConnect failed immediately, this node will be put into blackout and we
                // should allow immediately retrying in case there is another candidate node. If it
                // is still connecting, the worst case is that we end up setting a longer timeout
                // on the next round and then wait for the response.
            } else { 
                // connected, but can't send more OR connecting
                // In either case, we just need to wait for a network event to let us know the selected
                // connection might be usable again.
                //已成功连接到指定的节点,但不能发送请求,则更新lastNoNodeAvailableMs后等待
                this.lastNoNodeAvailableMs = now;
            }
        }

在收到MetadataResponse之后,会先调用MetaUpdater.maybeHandleCompletedReceive()方法检测是否为MetadataResponse,如果是,就调用handleResponse()解析响应,并构造Cluster对象更新Metadata.cluster字段。
因为cluster是不可变字段,更新集群元数据的方式是:
创建新的Cluster对象,并覆盖Metadata.cluster字段,代码如下:
NetworkClient类里的方法:

@Override
        public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
            short apiKey = req.request().header().apiKey();
            //检测是否为MetadataRequest请求。
            if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
                handleResponse(req.request().header(), body, now);
                return true;
            }
            return false;
        }


private void handleResponse(RequestHeader header, Struct body, long now) {
            this.metadataFetchInProgress = false;//收到 MetadataResponse 了,于是修改metadataFetchInProgress=false。
            //解析MetadataResponse
            MetadataResponse response = new MetadataResponse(body);
            //创建Cluster对象
            Cluster cluster = response.cluster();
            // check if any topics metadata failed to get updated。检测 MetadataResponse 里的错误码。
            Map<String, Errors> errors = response.errors();
            if (!errors.isEmpty())
                log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);

            // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
            // created which means we will get errors and no nodes until it exists
            if (cluster.nodes().size() > 0) {

                this.metadata.update(cluster, now);
            } else {
                log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
                this.metadata.failedUpdate(now);
            }
        }

Metadata类里的方法:

    /**
     * Update the cluster metadata
     */
    public synchronized void update(Cluster cluster, long now) {
        this.needUpdate = false;
        this.lastRefreshMs = now;
        this.lastSuccessfulRefreshMs = now;
        this.version += 1;
        //1.通知Metadata上的监听器。
        for (Listener listener: listeners)
            listener.onMetadataUpdate(cluster);
        //更新cluster字段。
        // Do this after notifying listeners as subscribed topics' list can be changed by listeners
        this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

        notifyAll();
        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
    }

当连接断开或其他的异常导致无法获得响应时,由maybeHandleDisconnetion()方法处理,他会将metadataFetchInProgress字段置为false,这样就能顺利的发送下一次更新Metadata请求了。

@Override
        public boolean maybeHandleDisconnection(ClientRequest request) {
            ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());

            if (requestKey == ApiKeys.METADATA) {//是否为MetadataRequest请求
                Cluster cluster = metadata.fetch();
                if (cluster.isBootstrapConfigured()) {
                    int nodeId = Integer.parseInt(request.request().destination());
                    Node node = cluster.nodeById(nodeId);
                    if (node != null)
                        log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
                }

                metadataFetchInProgress = false;//更新metadataFetchInProgress = false
                return true;
            }

            return false;
        }

相关文章

网友评论

      本文标题:Kafka源码分析-Producer(6)-Sender分析(3

      本文链接:https://www.haomeiwen.com/subject/omvtaftx.html