美文网首页
5.Kafka源码深入解析之拉取元数据02

5.Kafka源码深入解析之拉取元数据02

作者: hoose | 来源:发表于2021-08-24 16:09 被阅读0次

    上一节我们详细解析了生产端在发消息的dosend()里,首先是拉取元数据信息,在拉取过程中,先唤醒了sender线程,接着主线程进入等待状态,直到sender线程拉取元数据成功返回,本章节来详细 分析一下sender线程是如何建立通信,从broker端拉取元数据的。

    我们知道sender线程是拉取元数据用的,那么当唤醒它之后,必然就会调用它的run(),来看下源码:

     public void run() {
            log.debug("Starting Kafka producer I/O thread.");
    
            // main loop, runs until close is called
            while (running) {
                try {
                    run(time.milliseconds());
                } catch (Exception e) {
                    log.error("Uncaught error in kafka producer I/O thread: ", e);
                }
            }
    ...
    

    可以看到在while循环里调用了run(time.milliseconds());

        void run(long now) {
            if (transactionManager != null) {
               //...这里主要是对事务manager中的一些判断,对于我们来说目前先略过,后面讲到事务在分析
            }
    
            //准备发送的数据请求,如果此时没有元数据,那么下面的方面基本不会执行
            long pollTimeout = sendProducerData(now);
            //把准备好的消息请求真正的发送出去
            client.poll(pollTimeout, now);
        }
    

    上面的代码可以看到long pollTimeout = sendProducerData(now);这个其实是发消息的一些处理,在目前还没有获取元数据的情况下,其实是没有做什么事的,重点我们看client.poll,这个是真正的发送请求出去的。这个请求包括消息数据本身,还有就是元数据请求,点击进去此方法,可以看到这个client实际上是 NetworkClient,哈哈,还记的在前2章节我们分析KafkaProducer初始化的时候,是不是同时初始化了这非常重要的组件。

    public List<ClientResponse> poll(long timeout, long now) {
            if (!abortedSends.isEmpty()) {
                // If there are aborted sends because of unsupported version exceptions or disconnects,
                // handle them immediately without waiting for Selector#poll.
                List<ClientResponse> responses = new ArrayList<>();
                handleAbortedSends(responses);
                completeResponses(responses);
                return responses;
            }
    
            //封装一个要拉取元数据的请求
            //目前还没有把请求发送出去,客户端缓存了这些请求
            long metadataTimeout = metadataUpdater.maybeUpdate(now);
            try {
                /**
                 * 在Poll方法里,会去跟目标节点broker
                 */
                //发送请求,进行复杂的网络操作
                //TODO 通过网络,把发送查询元数据请求发送出去
                this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
            } catch (IOException e) {
                log.error("Unexpected error during I/O", e);
            }
    
            // process completed actions
            long updatedNow = this.time.milliseconds();
            List<ClientResponse> responses = new ArrayList<>();
            //处理发送完的Send对象
            handleCompletedSends(responses, updatedNow);
            /**
             * TODO 处理响应:响应里面就会有我们需要的元数据
             *  这个地方是我们在看生产者是如何获取元数据的时候看的
             *  其实kafka 获取元数据的流程和我们发送消息的流程是一样的:
             *   获取元数据 -> 判断网络连接是否建立好 -> 建立网络连接 ->发送请求(获取元数据的请求) ->服务端发送回来
             */
            //处理接受到的响应responses
            handleCompletedReceives(responses, updatedNow);
            //处理关闭的响应
            handleDisconnections(responses, updatedNow);
            //处理连接状态的响应
            handleConnections();
            handleInitiateApiVersionRequests(updatedNow);
            handleTimedOutRequests(responses, updatedNow);
            completeResponses(responses);
    
            return responses;
        }
    

    源码里注解写的很详细,long metadataTimeout = metadataUpdater.maybeUpdate(now);这个封装元数据请求,this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));真正的把请求发送出去到broker了,当然这里我们先不去分析broker端是如何接收请求以及查询数据并返回的,后面分析到broker端在说。
    handleCompletedReceives(responses, updatedNow);这方法明显可以看到,是处理接受到响应的response的,点击进去

    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
            for (NetworkReceive receive : this.selector.completedReceives()) {
                //获取响应的节点Id
                String source = receive.source();
                //从inFlightRequests中获取缓存的request请求
                InFlightRequest req = inFlightRequests.completeNext(source);
                //解析响应信息,验证响应头,生成Struct对象
                Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                    throttleTimeSensor, now);
                if (log.isTraceEnabled()) {
                    log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
                        req.header.apiKey(), req.header.correlationId(), responseStruct);
                }
                AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
                //如果是元数据请求的响应
                if (req.isInternalRequest && body instanceof MetadataResponse)
                    //处理metadata的更新信息
                    metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
                //如果是是版本协调的响应
                else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
                    handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
                else
                    //其他响应,就添加响应到本地的响应队列里
                    responses.add(req.completed(body, now));
            }
        }
    

    在上面的源码我们可以看到这行代码判断:
    if (req.isInternalRequest && body instanceof MetadataResponse) //处理metadata的更新信息 metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
    这个不是就是判断此response是不是返回的元数据请求吗,那么come on,点击
    metadataUpdater.handleCompletedMetadataResponse去看一下它的实现

     public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
                this.metadataFetchInProgress = false;
                Cluster cluster = response.cluster();
                // check if any topics metadata failed to get updated
                Map<String, Errors> errors = response.errors();
                if (!errors.isEmpty())
                    log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);
    
                // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
                // created which means we will get errors and no nodes until it exists
    
                //如果元数据信息存在
                if (cluster.nodes().size() > 0) {
                    //有了topic的元数据,就更新缓存本地的元数据
                    this.metadata.update(cluster, response.unavailableTopics(), now);
                } else {
                    log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
                    this.metadata.failedUpdate(now, null);
                }
            }
    

    这里看明白了吧,Cluster cluster = response.cluster();这个返回的数据赋值给cluster,下面判断if (cluster.nodes().size() > 0),如果真的拉取到了元数据,那么就执行:

    this.metadata.update(cluster, response.unavailableTopics(), now);
    

    这个方法是不是很熟悉,这不就是我们当时在初始化KafkaProducer的时候调用的吗,当时说调用这个方法的时候,其实并没有真正的拉取元数据,只是把我们配置的properties里的address值封装到了Cluster对象了,现在我们在这里可以看到真正的去更新了元数据信息,同时我们还记的this.metadata.update方法里的前面讲的一个唤醒动作吗:

    //这里最最重要的作业就是唤醒上一讲中处于watit的线程,
            //唤醒这个线程:metadata.awaitUpdate(version,remainingWaitms)
            notifyAll();
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
    

    这里唤醒谁了?还记的上一节我们讲的在doSend()方法里,两个重要的操作,一是唤醒sender线程,二是主线程在metadata.awaitUpdate()方法进入等待状态。
    这里notifyAll()就是唤醒了主线程!
    到这里我们拉取元数据的操作就算全部解析完成了,下一节我们开始进入真正的发消息阶段。

    相关文章

      网友评论

          本文标题:5.Kafka源码深入解析之拉取元数据02

          本文链接:https://www.haomeiwen.com/subject/yupdiltx.html