Kafka Source Code Analysis - Producer (4): Sender Analysis (1)

Author: 陈阳001 | Published 2018-09-29 22:50

1. Introduction to Sender

From the previous analysis we know that the main thread puts messages into the RecordAccumulator cache via KafkaProducer.send(); no actual network I/O happens there. All network I/O is performed by the Sender thread.
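
For context, here is a condensed sketch of how that I/O thread is wired up, based on the 0.9/0.10-era KafkaProducer constructor (the Sender's own constructor arguments are elided here):

    // Inside the KafkaProducer constructor (condensed): the Sender is a Runnable,
    // and KafkaThread is Kafka's thin daemon-thread wrapper around it.
    String ioThreadName = "kafka-producer-network-thread | " + clientId;
    this.ioThread = new KafkaThread(ioThreadName, this.sender, true /* daemon */);
    this.ioThread.start(); // Sender.run() now loops until the producer is closed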

The overall flow of the Sender thread sending messages:

• Call RecordAccumulator.ready(), which, based on the accumulator's cache state, selects the Nodes that messages can be sent to.
• Filter those Nodes further based on the producer's connection state with each node (managed by NetworkClient).
• Create the requests, generating only one request per Node.
• Hand the requests to NetworkClient to be sent.


Sender implements the Runnable interface and runs in its own ioThread. Sender's run() method calls the overloaded run(long), which is the real core of the Sender thread and carries out the message-sending flow. The sequence diagram is shown below:


[Figure: Sender processing flow (sequence diagram)]
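
Before diving into run(long), note that the outer run() is just a loop that keeps invoking run(long) until the producer is closed; slightly condensed from the 0.9/0.10-era source:

    public void run() {
        log.debug("Starting Kafka producer I/O thread.");
        // Main loop: runs until close() is called on the producer.
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        // (On close, the loop continues until buffered and in-flight
        // records are drained, unless a forced close; omitted here.)
    }

Now the core method itself: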
    /**
     * Run a single iteration of sending
     *
     * @param now
     *            The current POSIX time in milliseconds
     */
    void run(long now) {
        // 1. Fetch the Kafka cluster metadata from Metadata.
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        // 2. Call RecordAccumulator.ready(): based on the accumulator's cache state,
        //    select the Nodes that messages can be sent to; returns a ReadyCheckResult.
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        // 3. If the ReadyCheckResult flags unknownLeadersExist, call Metadata.requestUpdate()
        //    to mark the cluster metadata as needing a refresh.
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        // 4. For each Node in ReadyCheckResult.readyNodes, call NetworkClient.ready() to
        //    check whether the network I/O side is in a state fit for sending; Nodes that
        //    fail the check are removed from readyNodes.
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        // 5. Collect the messages to send, converting the TopicPartition -> List<RecordBatch>
        //    mapping into a nodeId -> List<RecordBatch> mapping.
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained: while a batch for a partition is in
            // flight, no further batch for that partition will be drained, which
            // preserves per-partition ordering.
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
        // 6. Call RecordAccumulator.abortExpiredBatches() to deal with batches that
        //    have expired inside the RecordAccumulator.
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        // 7. Call Sender.createProduceRequests() to wrap the outgoing messages into ClientRequests.
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        // 8. Call NetworkClient.send() to write each ClientRequest into the send field
        //    of the corresponding KafkaChannel.
        for (ClientRequest request : requests)
            client.send(request, now);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        // 9. Call NetworkClient.poll() to send the ClientRequests stored in KafkaChannel.send.
        //    It also handles responses returned by the server, deals with timed-out requests,
        //    and invokes user-defined Callbacks.
        this.client.poll(pollTimeout, now);
    }
    

A quick summary of the run(long) flow:

1. Fetch the Kafka cluster metadata from Metadata.
2. Call RecordAccumulator.ready(), which, based on the accumulator's cache state, selects the Nodes that messages can be sent to and returns a ReadyCheckResult object.
3. If the ReadyCheckResult flags unknownLeadersExist, call Metadata.requestUpdate() to mark the cluster metadata as needing a refresh.
4. For the readyNodes set in the ReadyCheckResult, call NetworkClient.ready() on each Node to check whether the network I/O side is in a state fit for sending; Nodes that fail the check are removed from readyNodes.
5. With the readyNodes set left after step 4, call RecordAccumulator.drain() to collect the messages to be sent.
6. Call RecordAccumulator.abortExpiredBatches() to handle expired messages in the RecordAccumulator. Concretely: iterate over every RecordBatch held by the accumulator and call RecordBatch.maybeExpire() on each; if a batch has expired, call RecordBatch.done(), which triggers the user-defined Callbacks, removes the batch from its queue, and releases its ByteBuffer (see the sketch of done() right after this list).
7. Call Sender.createProduceRequests() to wrap the outgoing messages into ClientRequests.
8. Call NetworkClient.send() to write each ClientRequest into the send field of the corresponding KafkaChannel.
9. Call NetworkClient.poll() to send the ClientRequests stored in KafkaChannel.send; it also handles responses returned by the server, deals with timed-out requests, invokes user-defined Callbacks, and so on.
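
Step 6 is worth making concrete. Here is a condensed sketch of RecordBatch.done() as it looks in the 0.9-era source (the try/catch guarding each callback is omitted):

    // RecordBatch.done(), condensed: fire every stored per-record callback,
    // then complete the batch's future.
    public void done(long baseOffset, RuntimeException exception) {
        // A Thunk (user Callback + per-record FutureRecordMetadata) was stored
        // for each record appended to this batch.
        for (Thunk thunk : this.thunks) {
            if (exception == null)
                // Success path: hand the record's metadata to the user callback.
                thunk.callback.onCompletion(
                        new RecordMetadata(topicPartition, baseOffset, thunk.future.relativeOffset()),
                        null);
            else
                // Failure path (including expiration): report the exception instead.
                thunk.callback.onCompletion(null, exception);
        }
        // Wake up anyone blocked on the Future returned by KafkaProducer.send().
        this.produceFuture.done(topicPartition, baseOffset, exception);
    }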

2. Creating Requests

The org.apache.kafka.common.protocol.Protocol class lists the formats of every request and response, and requests and responses come in several versions. Let us first look at the request and response the producer uses to append messages to the server: PRODUCE_REQUEST_V2 and PRODUCE_RESPONSE_V2.

The fields of the PRODUCE_REQUEST_V2 request header and body:

Name             Type     Meaning
api_key          short    API identifier
api_version      short    API version number
correlation_id   int      Sequence number generated by the client, monotonically increasing; the server echoes it back unmodified in the response
client_id        String   Client ID; may be null
acks             short    How many replicas must have successfully replicated the message before the server answers this request; -1 means the entire ISR has completed replication
timeout          int      Timeout, in ms
topic            String   Topic name
partition        int      Partition number
record_set       bytes    The message payload
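
To make the request body concrete, here is a minimal sketch that builds one with the same classes used later in this section; the topic name and the empty buffer are made up for illustration (in the real producer the buffer comes from RecordBatch.records.buffer(), as produceRequest() below shows):

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.types.Struct;
    import org.apache.kafka.common.requests.ProduceRequest;

    public class ProduceRequestDemo {
        public static void main(String[] args) {
            // Hypothetical payload; normally this is a serialized batch of records.
            ByteBuffer recordSet = ByteBuffer.allocate(0);
            Map<TopicPartition, ByteBuffer> partitionRecords = new HashMap<TopicPartition, ByteBuffer>();
            partitionRecords.put(new TopicPartition("demo-topic", 0), recordSet);

            ProduceRequest request = new ProduceRequest((short) -1, // acks: wait for the whole ISR
                                                        30000,      // timeout, in ms
                                                        partitionRecords);
            Struct body = request.toStruct(); // laid out per the schema described above
            System.out.println(body);
        }
    }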

The fields of PRODUCE_RESPONSE_V2:

Name              Type     Meaning
correlation_id    int      Sequence number generated by the client, monotonically increasing; the server echoes it back unmodified in the response
topic             String   Topic name
partition         int      Partition number
error_code        short    Error code
base_offset       long     The offset the server assigned to the message set
timestamp         long     Timestamp produced by the server
throttle_time_ms  int      How long the request was throttled, in ms
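
Going the other way, here is a minimal sketch of pulling those fields back out of a response, assuming the 0.9/0.10-era client classes (the Struct would come from the ClientResponse body that Sender.handleProduceResponse() receives):

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.protocol.types.Struct;
    import org.apache.kafka.common.requests.ProduceResponse;

    public class ProduceResponseDemo {
        static void inspect(Struct responseBody) {
            ProduceResponse response = new ProduceResponse(responseBody);
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry
                    : response.responses().entrySet()) {
                ProduceResponse.PartitionResponse pr = entry.getValue();
                Errors error = Errors.forCode(pr.errorCode); // error_code
                long offset = pr.baseOffset;                 // base_offset
                System.out.println(entry.getKey() + " -> " + error + " @ " + offset);
            }
        }
    }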

The job of Sender.createProduceRequests() is to wrap the messages to be sent into ClientRequests. No matter how many RecordBatches a Node has pending, and no matter how many partitions those batches are destined for, exactly one ClientRequest object is generated per Node. The core logic of creating a ClientRequest:

1. Reorganize the RecordBatch collection of a Node into two maps: produceRecordsByPartition (Map<TopicPartition, ByteBuffer>) and recordsByPartition (Map<TopicPartition, RecordBatch>).
2. Create a RequestSend. RequestSend is the object actually sent through network I/O; its format follows the PRODUCE_REQUEST_V2 protocol described above, and its payload is the data in produceRecordsByPartition.
3. Create a RequestCompletionHandler as the callback object.
4. Wrap the RequestSend object and the RequestCompletionHandler object into a ClientRequest object and return it.

Let us look at the code:
    /**
     * Transfer the record batches into a list of produce requests on a per-node basis
     */
    private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
        List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
        for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
            // produceRequest() wraps all RecordBatches headed for the same Node
            // into a single ClientRequest object.
            requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
        return requests;
    }

    /**
     * Create a produce request from the given record batches
     */
    private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
        // The two maps differ only in their values: one holds the serialized
        // ByteBuffer, the other the RecordBatch itself (kept for the callback).
        Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
        final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());

        // 1. Sort the RecordBatch list by partition into the two maps above.
        for (RecordBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            produceRecordsByPartition.put(tp, batch.records.buffer());
            recordsByPartition.put(tp, batch);
        }
        // 2. Create the ProduceRequest and the RequestSend.
        ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
        RequestSend send = new RequestSend(Integer.toString(destination),
                                           this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                           request.toStruct());
        // 3. Create a RequestCompletionHandler as the callback object;
        //    its logic is explained in detail later.
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };
        // 4. Create the ClientRequest. The second argument, derived from the acks
        //    setting, indicates whether this request expects a response.
        return new ClientRequest(now, acks != 0, send, callback);
    }
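
A note on this.client.nextRequestHeader(ApiKeys.PRODUCE) above: it is what fills in the request-header fields listed in the table in section 2. Roughly, in the 0.9/0.10-era NetworkClient:

    // NetworkClient.nextRequestHeader(), condensed: builds the header carrying
    // api_key, client_id, and a monotonically increasing correlation_id
    // (api_version defaults to the latest version the client supports).
    public RequestHeader nextRequestHeader(ApiKeys key) {
        return new RequestHeader(key.id, clientId, correlation++);
    }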
    

That completes the analysis of the ProduceRequest format and its creation. In the flow that follows, what is actually transmitted is the RequestSend object, while the ClientRequest is cached in InFlightRequests; when the request receives its response or hits an exception, the cached ClientRequest's RequestCompletionHandler object is invoked. The details are covered next.
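
The caching just mentioned can be seen in NetworkClient itself; a condensed sketch of send()/doSend() from the 0.10-era source:

    // NetworkClient.send(), condensed from the 0.10-era source.
    public void send(ClientRequest request, long now) {
        String nodeId = request.request().destination();
        // canSendRequest(): connection established, channel ready, and the
        // node's in-flight queue has room for one more request.
        if (!canSendRequest(nodeId))
            throw new IllegalStateException("Attempt to send a request to node "
                    + nodeId + " which is not ready.");
        doSend(request, now);
    }

    private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        this.inFlightRequests.add(request); // cached until a response (or error) arrives
        selector.send(request.request());   // stores the RequestSend in KafkaChannel.send
    }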
