Kafka Source Code Analysis - Producer (7) - Sender Analysis (4)

Author: 陈阳001 | Published: 2018-10-04 22:15

    1. NetworkClient

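    All connections in NetworkClient are managed by ClusterConnectionStates, which is backed by a Map<String, NodeConnectionState>. The key is the node ID and the value is a NodeConnectionState object, which holds the connection state as a ConnectionState enum value and records the timestamp of the most recent connection attempt.
    Now that the components NetworkClient depends on have all been introduced, we can focus on the implementation of NetworkClient itself. NetworkClient is a general-purpose network client: it is used not only by producers to send messages, but also by consumers to fetch messages and for communication between brokers on the server side.
    The core methods of NetworkClient are described below: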

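    NetworkClient.ready() and related methods

    This method checks whether a Node is ready to receive data. It first calls NetworkClient.isReady() to determine whether a request can be sent to the Node. The Node is considered ready when all three of the following conditions hold:

    • Metadata is neither being updated nor in need of an update.
    • The connection has been established and is healthy, i.e. connectionStates.isConnected(node) returns true.
    • InFlightRequests.canSendMore() returns true.
      If NetworkClient.isReady() returns false and the following two conditions are met, initiateConnect() is called to reconnect:
    • The connection must not be in the CONNECTING state; it must be DISCONNECTED.
    • To avoid network congestion, reconnects must not be too frequent: the time between two attempts must be greater than the retry backoff, which is specified by reconnectBackoffMs.
      NetworkClient.initiateConnect() updates the connection state in ClusterConnectionStates and calls Selector.connect() to initiate the connection. Later, when Selector.pollSelectionKeys() is called, it determines whether the connection has been established; if so, the ConnectionState is set to CONNECTED.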
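
    The reconnect rules described here can be sketched in a few lines of Java. This is a minimal model under stated assumptions, not the real ClusterConnectionStates: the class ConnectionStatesSketch, the NodeState holder and all method names are made up for illustration, and treating the backoff boundary as inclusive and measuring the backoff from the moment a disconnect is recorded are assumptions of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a per-node connection state table.
class ConnectionStatesSketch {
    enum ConnectionState { DISCONNECTED, CONNECTING, CONNECTED }

    // Per-node record: current state plus the time of the last connection attempt.
    static final class NodeState {
        ConnectionState state;
        long lastConnectAttemptMs;

        NodeState(ConnectionState state, long lastConnectAttemptMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
        }
    }

    private final Map<String, NodeState> nodeStates = new HashMap<>();
    private final long reconnectBackoffMs;

    ConnectionStatesSketch(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    // A connection may be initiated if the node is unknown, or if it is
    // DISCONNECTED and the backoff period has elapsed (boundary inclusive: assumption).
    boolean canConnect(String nodeId, long nowMs) {
        NodeState s = nodeStates.get(nodeId);
        if (s == null)
            return true;
        return s.state == ConnectionState.DISCONNECTED
                && nowMs - s.lastConnectAttemptMs >= reconnectBackoffMs;
    }

    // Called when a connection attempt starts.
    void connecting(String nodeId, long nowMs) {
        nodeStates.put(nodeId, new NodeState(ConnectionState.CONNECTING, nowMs));
    }

    // Called once the connection is established.
    void connected(String nodeId) {
        NodeState s = nodeStates.get(nodeId);
        if (s != null)
            s.state = ConnectionState.CONNECTED;
    }

    // Called when the connection is lost; the backoff restarts here (assumption).
    void disconnected(String nodeId, long nowMs) {
        NodeState s = nodeStates.get(nodeId);
        if (s != null) {
            s.state = ConnectionState.DISCONNECTED;
            s.lastConnectAttemptMs = nowMs;
        }
    }

    boolean isConnected(String nodeId) {
        NodeState s = nodeStates.get(nodeId);
        return s != null && s.state == ConnectionState.CONNECTED;
    }
}
```

    With a backoff of 100 ms, a node that is CONNECTING or CONNECTED is never eligible for a new attempt, and a node that was marked disconnected at time t becomes eligible again once the clock reaches t + 100.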

    NetworkClient.send() and related methods

    This method sets the request on the KafkaChannel.send field and adds it to the InFlightRequests queue to wait for the response.

    /**
         * Queue up the given request for sending. Requests can only be sent out to ready nodes.
         *
         * @param request The request
         * @param now The current timestamp
         */
        @Override
        public void send(ClientRequest request, long now) {
            String nodeId = request.request().destination();
            if (!canSendRequest(nodeId)) // check whether a request can be sent to the given Node
                throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
            // set the KafkaChannel.send field and put the request into InFlightRequests to wait for the response
            doSend(request, now);
        }
    
    /**
         * Are we connected and ready and able to send more requests to the given connection?
         *
         * @param node The node
         */
        private boolean canSendRequest(String node) {
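            return connectionStates.isConnected(node) // check the connection state
                    && selector.isChannelReady(node) // check that the network protocol is healthy and authentication has passed
                    && inFlightRequests.canSendMore(node); // check whether more in-flight requests may be sent to this node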
        }
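
    The third check in canSendRequest() limits how many unanswered requests a connection may carry. The following is a minimal sketch of such a limit, not the real InFlightRequests class: InFlightLimitSketch, PendingRequest and the sendCompleted flag are hypothetical names, and the exact rule (the newest request must be fully written and the queue must be below a per-connection maximum) is an assumption modeled on the behavior described in this article.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a per-node in-flight request limit.
class InFlightLimitSketch {
    // Minimal request record: only tracks whether its bytes were fully written.
    static final class PendingRequest {
        boolean sendCompleted;
    }

    private final Map<String, Deque<PendingRequest>> requests = new HashMap<>();
    private final int maxInFlightPerConnection;

    InFlightLimitSketch(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // Register a new in-flight request; the newest request sits at the head.
    PendingRequest add(String nodeId) {
        PendingRequest request = new PendingRequest();
        requests.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addFirst(request);
        return request;
    }

    // More can be sent if nothing is in flight, or if the newest request has been
    // fully written and the queue is still below the configured maximum.
    boolean canSendMore(String nodeId) {
        Deque<PendingRequest> queue = requests.get(nodeId);
        return queue == null || queue.isEmpty()
                || (queue.peekFirst().sendCompleted && queue.size() < maxInFlightPerConnection);
    }
}
```

    Requiring the newest request to be fully written before accepting another keeps a single pending send per channel, which matches the single KafkaChannel.send field mentioned above.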
    

    NetworkClient.poll()

    This method calls KSelector.poll() to perform network I/O (see the KSelector section), then uses the handle*() methods to process the data and queues produced by KSelector.poll().

    /**
         * Do actual reads and writes to sockets.
         *
         * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
         *                must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
         *                metadata timeout
         * @param now The current time in milliseconds
         * @return The list of responses received
         */
        @Override
        public List<ClientResponse> poll(long timeout, long now) {
            // update Metadata if necessary
            long metadataTimeout = metadataUpdater.maybeUpdate(now);
            try {
                this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); // let KSelector perform the I/O
            } catch (IOException e) {
                log.error("Unexpected error during I/O", e);
            }
    
            // process completed actions
            long updatedNow = this.time.milliseconds();
            List<ClientResponse> responses = new ArrayList<>(); // list of responses
            handleCompletedSends(responses, updatedNow); // process the completedSends queue
            handleCompletedReceives(responses, updatedNow); // process the completedReceives queue
            handleDisconnections(responses, updatedNow); // process the disconnected list
            handleConnections(); // process the connected list
            // process the timed-out requests in InFlightRequests
            handleTimedOutRequests(responses, updatedNow);
    
            // invoke callbacks
            // invoke the callback of each ClientRequest in turn
            for (ClientResponse response : responses) {
                if (response.request().hasCallback()) {
                    try {
                        response.request().callback().onComplete(response);
                    } catch (Exception e) {
                        log.error("Uncaught error in request completion:", e);
                    }
                }
            }
            return responses;
        }
    

    The handle*() methods

    handleCompletedSends()

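    InFlightRequests holds requests that have been sent but have not yet received a response, while completedSends holds the requests that were sent successfully in the most recent poll(). Each entry in completedSends therefore corresponds to the most recently sent request in the matching InFlightRequests queue.
    handleCompletedSends() iterates over completedSends. When it finds a request that does not expect a response, it removes that request from InFlightRequests and adds a corresponding ClientResponse to the responses list; the ClientResponse holds a reference to its ClientRequest. The code is as follows: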


    /**
         * Handle any completed request send. In particular if no response is expected consider the request complete.
         *
         * @param responses The list of responses to update
         * @param now The current time
         */
        private void handleCompletedSends(List<ClientResponse> responses, long now) {
            // if no response is expected then when the send is completed, return it
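            // iterate over the completedSends collection
            for (Send send : this.selector.completedSends()) {
                ClientRequest request = this.inFlightRequests.lastSent(send.destination()); // the most recently sent request for this node
                if (!request.expectResponse()) { // check whether the request expects a response
                    // remove that most recently sent request from the corresponding queue in inFlightRequests
                    this.inFlightRequests.completeLastSent(send.destination());
                    // create a ClientResponse and add it to the responses collection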
                    responses.add(new ClientResponse(request, now, false, null));
                }
            }
        }
    

    handleCompletedReceives()

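    This method iterates over the completedReceives queue, removes the corresponding ClientRequest from InFlightRequests, and adds a corresponding ClientResponse to the responses list. If the response answers a Metadata update request, MetadataUpdater.maybeHandleCompletedReceive() is called to update the Kafka cluster metadata recorded in Metadata.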

    /**
         * Handle any completed receives and update the response list with the responses received.
         *
         * @param responses The list of responses to update
         * @param now The current time
         */
        private void handleCompletedReceives(List<ClientResponse> responses, long now) {
            for (NetworkReceive receive : this.selector.completedReceives()) {
                String source = receive.source(); // ID of the node that sent the response
                // take the corresponding ClientRequest out of inFlightRequests
                ClientRequest req = inFlightRequests.completeNext(source);
                // parse the response
                Struct body = parseResponse(receive.payload(), req.request().header());
                // Let MetadataUpdater.maybeHandleCompletedReceive() handle a
                // MetadataResponse. It updates the cluster metadata recorded in Metadata
                // and wakes up all threads waiting for the Metadata update to finish.
                if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                    // not a MetadataResponse: create a ClientResponse and add it to responses
                    responses.add(new ClientResponse(req, now, false, body));
            }
        }
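
    Both handleCompletedSends() and handleCompletedReceives() depend on the order in which requests are kept per node. The sketch below illustrates one way this ordering can work, assuming a double-ended queue where new requests are added at the head: a request that expects no response is completed from the head right after it is sent, while a response from the broker completes the oldest request at the tail. InFlightOrderSketch and the use of plain strings as requests are hypothetical simplifications, not the real InFlightRequests implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the request ordering for a single node.
class InFlightOrderSketch {
    private final Deque<String> queue = new ArrayDeque<>();

    // A newly sent request goes to the head of the queue.
    void add(String requestId) {
        queue.addFirst(requestId);
    }

    // The most recently sent request (head), without removing it.
    String lastSent() {
        return queue.peekFirst();
    }

    // Complete the most recently sent request, e.g. when no response is expected.
    String completeLastSent() {
        return queue.pollFirst();
    }

    // Complete the oldest request (tail) when a response arrives, assuming the
    // broker answers requests on one connection in the order they were sent.
    String completeNext() {
        return queue.pollLast();
    }

    int size() {
        return queue.size();
    }
}
```

    For example, after adding r1, r2 and then r3 (which expects no response), lastSent() returns r3; completing it from the head leaves r1 and r2, and the next two responses are matched to r1 and then r2.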
    

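    handleDisconnections()

    This method iterates over the disconnected list, clears the ClientRequests of the affected node from InFlightRequests, and creates a ClientResponse for each of those requests, adding it to the responses list. A ClientResponse created here is flagged to show that it is not a normal response from the server but the result of a disconnection. If the request is a Metadata update request, it is handled by MetadataUpdater.maybeHandleDisconnection() instead. Finally, Metadata.needUpdate is set to true to indicate that the cluster metadata must be refreshed.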

    /**
         * Handle any disconnected connections
         *
         * @param responses The list of responses that completed with the disconnection
         * @param now The current time
         */
        private void handleDisconnections(List<ClientResponse> responses, long now) {
            // update the connection state and clear the ClientRequests of each disconnected Node from InFlightRequests
            for (String node : this.selector.disconnected()) {
                log.debug("Node {} disconnected.", node);
                processDisconnection(responses, node, now);
            }
            // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
            if (this.selector.disconnected().size() > 0)
                metadataUpdater.requestUpdate(); // mark the cluster metadata as needing an update
        }
    
    /**
         * Post process disconnection of a node
         *
         * @param responses The list of responses to update
         * @param nodeId Id of the node to be disconnected
         * @param now The current time
         */
        private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
            connectionStates.disconnected(nodeId, now); // update the connection state
            for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {
                log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
                // MetadataUpdater.maybeHandleDisconnection() handles a MetadataRequest.
                // If the request is not a MetadataRequest, create a ClientResponse and add it to responses.
                // Note the third constructor argument: it indicates that the connection was lost.
                if (!metadataUpdater.maybeHandleDisconnection(request))
                    responses.add(new ClientResponse(request, now, true, null));
            }
        }
    

    handleConnections()

    This method iterates over the connected list and sets the connection state recorded in ConnectionStates to CONNECTED.

    handleTimedOutRequests()

    This method iterates over InFlightRequests to collect the Nodes that have timed-out requests; the subsequent handling is the same as in handleDisconnections().
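
    The article does not show the timeout check itself, so the following is only a sketch of how the set of Nodes with timed-out requests might be computed. It assumes that checking the oldest in-flight request per node is sufficient, that send timestamps are stored newest-first, and that a request times out once strictly more than requestTimeoutMs has passed. TimedOutNodesSketch, recordSend() and nodesWithTimedOutRequests() are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of detecting nodes whose in-flight requests have timed out.
class TimedOutNodesSketch {
    // nodeId -> send timestamps of in-flight requests, newest at the head
    private final Map<String, Deque<Long>> sendTimes = new LinkedHashMap<>();

    // Record that a request was sent to the node at the given time.
    void recordSend(String nodeId, long sendTimeMs) {
        sendTimes.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addFirst(sendTimeMs);
    }

    // A node is reported if its oldest in-flight request has waited longer
    // than requestTimeoutMs (strict comparison: assumption of this sketch).
    List<String> nodesWithTimedOutRequests(long nowMs, long requestTimeoutMs) {
        List<String> nodes = new ArrayList<>();
        for (Map.Entry<String, Deque<Long>> entry : sendTimes.entrySet()) {
            Long oldestSendTime = entry.getValue().peekLast();
            if (oldestSendTime != null && nowMs - oldestSendTime > requestTimeoutMs)
                nodes.add(entry.getKey());
        }
        return nodes;
    }
}
```

    Each Node returned this way would then be treated like a disconnected node: its connection is closed and its in-flight requests are completed with a "disconnected" ClientResponse.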

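    The response handler Sender.handleProduceResponse()

    After the handle*() methods have run, every ClientResponse produced by NetworkClient.poll() has been collected in the responses list. poll() then iterates over responses and invokes the callback recorded in each ClientRequest: an abnormal response triggers a resend, while a normal response invokes the user-defined Callback of each message. As mentioned in the discussion of createProduceRequests(), the callback invoked here is a RequestCompletionHandler object, whose onComplete() method ultimately calls Sender.handleProduceResponse().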

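    Responses caused by a disconnection or an error

    1. Iterate over the RecordBatches in the ClientRequest and try to put each RecordBatch back into the RecordAccumulator so that it is sent again.
    2. If the error type does not allow a retry, or the retry limit has been reached, call RecordBatch.done(). This method invokes the Callback of every message in the RecordBatch and marks the produceFuture of the RecordBatch as "completed exceptionally". Afterwards, the ByteBuffer underlying the RecordBatch is released.
    3. Finally, depending on the error type, decide whether to set the flag that requests a Metadata update.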

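    Normal server responses and requests that expect no response

    1. Parse the response.
    2. Iterate over the RecordBatches of the corresponding ClientRequest and call RecordBatch.done() on each.
    3. Release the ByteBuffer underlying each RecordBatch.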

    /**
         * Handle a produce response
         */
        private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
            int correlationId = response.request().request().header().correlationId();
            /*
             For a ClientResponse caused by a disconnection, the request is retried;
             if it cannot be retried, the callback of each message in it is invoked.
             */
            if (response.wasDisconnected()) {
                log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()
                                                                                                      .request()
                                                                                                      .destination());
                for (RecordBatch batch : batches.values())
                    completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, Record.NO_TIMESTAMP, correlationId, now);
            } else {// normal response
                log.trace("Received produce response from node {} with correlation id {}",
                          response.request().request().destination(),
                          correlationId);
                // if we have a response, parse it
                if (response.hasResponse()) {
                    ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
                    for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                        TopicPartition tp = entry.getKey();
                        ProduceResponse.PartitionResponse partResp = entry.getValue();
                        Errors error = Errors.forCode(partResp.errorCode);
                        RecordBatch batch = batches.get(tp);
                        // handle the partition result with completeBatch()
                        completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
                    }
                    this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
                    this.sensors.recordThrottleTime(response.request().request().destination(),
                                                    produceResponse.getThrottleTime());
                } else {// the request expects no response, so call completeBatch() directly
                    // this is the acks = 0 case, just complete all requests
                    for (RecordBatch batch : batches.values())
                        completeBatch(batch, Errors.NONE, -1L, Record.NO_TIMESTAMP, correlationId, now);
                }
            }
        }
    
    /**
         * Complete or retry the given batch of records.
         * 
         * @param batch The record batch
         * @param error The error (or null if none)
         * @param baseOffset The base offset assigned to the records if successful
         * @param timestamp The timestamp returned by the broker for this batch
         * @param correlationId The correlation id for the request
         * @param now The current POSIX time stamp in milliseconds
         */
        private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long timestamp, long correlationId, long now) {
            if (error != Errors.NONE && canRetry(batch, error)) {
                // retry
                log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                         correlationId,
                         batch.topicPartition,
                         this.retries - batch.attempts - 1,
                         error);
                // a retriable RecordBatch is put back into the RecordAccumulator to wait for resending
                this.accumulator.reenqueue(batch, now);
                this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
            } else {
                // no retry: complete the RecordBatch (exceptionally if there was an error) and release it
                RuntimeException exception;
                if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
                    exception = new TopicAuthorizationException(batch.topicPartition.topic());
                else
                    exception = error.exception();
                // tell the user the result of their request
                // RecordBatch.done() invokes the callbacks of the messages
                batch.done(baseOffset, timestamp, exception);
                this.accumulator.deallocate(batch); // release the buffer space
                if (error != Errors.NONE)
                    this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
            }
            if (error.exception() instanceof InvalidMetadataException)
                metadata.requestUpdate(); // mark the cluster metadata recorded in Metadata as needing an update
            // Unmute the completed partition.
            if (guaranteeMessageOrder)
                this.accumulator.unmutePartition(batch.topicPartition);
        }
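
    The branching in completeBatch() boils down to a three-way decision. The sketch below isolates that decision under simplifying assumptions: an error is reduced to a single "retriable" flag, a null error stands for success, and a batch may be retried while its attempt count is below the configured maximum. CompleteBatchSketch, ProduceError and Outcome are hypothetical names, and the body of canRetry() is not shown in this article, so the retry condition here is an assumption.

```java
// Hypothetical sketch of the retry-or-complete decision for a record batch.
class CompleteBatchSketch {
    enum Outcome { RETRY, COMPLETE_OK, COMPLETE_WITH_ERROR }

    // Simplified stand-in for a produce error; null means "no error".
    static final class ProduceError {
        final boolean retriable;

        ProduceError(boolean retriable) {
            this.retriable = retriable;
        }
    }

    // Decide what to do with a batch after its response (or disconnect) arrives.
    static Outcome decide(ProduceError error, int attempts, int maxRetries) {
        if (error == null)
            return Outcome.COMPLETE_OK;           // success: run callbacks, free the buffer
        if (error.retriable && attempts < maxRetries)
            return Outcome.RETRY;                 // put the batch back into the accumulator
        return Outcome.COMPLETE_WITH_ERROR;       // give up: fail callbacks, free the buffer
    }
}
```

    Under these assumptions, a network error on the first attempt with three retries configured leads to RETRY, the same error after three attempts leads to COMPLETE_WITH_ERROR, and a non-retriable error is failed immediately regardless of the attempt count.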
    
