Kafka Source Code Analysis - Producer (4) - Sender Analysis (1)

Author: 陈阳001 | Published 2018-09-29 22:50

1. Introduction to Sender

From the previous analysis we know that the main thread puts messages into the RecordAccumulator cache via KafkaProducer.send(); no actual network I/O happens there. All network I/O is performed by the Sender thread.

The overall flow of the Sender thread when sending messages:

  • Call RecordAccumulator.ready() to select, based on the state of the RecordAccumulator cache, which Nodes messages can be sent to.
  • Filter the Nodes according to the producer's connection state with each node (managed by NetworkClient).
  • Create requests, generating only one request per Node.
  • Call NetworkClient to send the requests.



Sender implements the Runnable interface and runs in a separate ioThread. Sender's run() method calls the overloaded run(long), which is the core method of the Sender thread and implements the message-sending flow. The sequence diagram is as follows:


[Figure: Sender processing flow (sequence diagram)]
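Before diving into run(long), here is a minimal sketch of the outer run() loop (simplified from the actual Sender code of this era; error handling details and the graceful-shutdown drain logic are omitted):

    // Simplified sketch of Sender.run(): loop until close() flips the flag,
    // delegating each iteration to run(long now).
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                // a single failed iteration must not kill the I/O thread
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        // the real code then drains any remaining requests before exiting (omitted here)
    }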
/**
 * Run a single iteration of sending
 *
 * @param now The current POSIX time in milliseconds
 */
void run(long now) {
    // 1. Fetch the Kafka cluster metadata from Metadata.
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    // 2. Call RecordAccumulator.ready() to select, based on the state of the
    //    RecordAccumulator cache, which Nodes can be sent to; it returns a
    //    ReadyCheckResult object.
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    // 3. If ReadyCheckResult indicates unknownLeadersExist, call
    //    Metadata.requestUpdate() to mark the cluster metadata for refresh.
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    /*
     * 4. For each Node in ReadyCheckResult.readyNodes, call NetworkClient.ready()
     *    to check whether the network I/O side meets the conditions for sending;
     *    Nodes that fail the check are removed from readyNodes.
     */
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests
    // 5. Drain the messages to send, converting the TopicPartition -> List<RecordBatch>
    //    mapping into a nodeId -> List<RecordBatch> mapping.
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    // 6. Call RecordAccumulator.abortExpiredBatches() to handle expired messages
    //    in the RecordAccumulator.
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);
    // 7. Call Sender.createProduceRequests() to wrap the messages to be sent
    //    into ClientRequest objects.
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    // 8. Call NetworkClient.send() to write each ClientRequest into the send
    //    field of the corresponding KafkaChannel.
    for (ClientRequest request : requests)
        client.send(request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    /*
     * 9. Call NetworkClient.poll() to actually send the ClientRequest stored in
     *    KafkaChannel.send. This also handles responses returned by the server,
     *    handles timed-out requests, invokes user-defined Callbacks, and so on.
     */
    this.client.poll(pollTimeout, now);
}

A summary of the run(long) flow:

  1. Fetch the Kafka cluster metadata from Metadata.
  2. Call RecordAccumulator.ready() to select, based on the state of the RecordAccumulator cache, which Nodes can be sent to; it returns a ReadyCheckResult object.
  3. If ReadyCheckResult indicates unknownLeadersExist, call Metadata.requestUpdate() to mark the Kafka cluster metadata for refresh.
  4. For each Node in ReadyCheckResult.readyNodes, call NetworkClient.ready() to check whether the network I/O side meets the conditions for sending; Nodes that fail the check are removed from readyNodes.
  5. With the readyNodes set filtered in step 4, call RecordAccumulator.drain() to obtain the batches to send. If guaranteeMessageOrder is set, each drained partition is also muted, as seen in the code above; see the config sketch after this list.
  6. Call RecordAccumulator.abortExpiredBatches() to handle expired messages in the RecordAccumulator. Concretely: it iterates over all RecordBatches held by the RecordAccumulator and calls RecordBatch.maybeExpire() on each. If a batch has expired, RecordBatch.done() is called, which triggers the user-defined Callbacks, removes the RecordBatch from its queue, and releases its ByteBuffer.
  7. Call Sender.createProduceRequests() to wrap the messages to be sent into ClientRequest objects.
  8. Call NetworkClient.send() to write each ClientRequest into the send field of the corresponding KafkaChannel.
  9. Call NetworkClient.poll() to actually send the ClientRequest stored in KafkaChannel.send; this also handles responses returned by the server, handles timed-out requests, invokes user-defined Callbacks, and so on.
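On the mute step above: in this code version, guaranteeMessageOrder is derived from the max.in.flight.requests.per.connection producer setting (it is true when that setting is 1). Below is a minimal sketch of a producer configured this way; the broker address, topic name, and serializers are placeholder assumptions:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class OrderedProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // With at most one in-flight request per connection, the Sender sets
            // guaranteeMessageOrder and mutes each drained partition until its
            // batch completes, preserving per-partition ordering even on retries.
            props.put("max.in.flight.requests.per.connection", "1");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            producer.close();
        }
    }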

2. Creating Requests

The org.apache.kafka.common.protocol.Protocol class lists the formats of all requests and responses; requests and responses come in different versions. First, let's look at the request and response the producer uses to append messages to the server: PRODUCE_REQUEST_V2 and PRODUCE_RESPONSE_V2.
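To inspect these schemas directly, one can print them from the Protocol class. A small sketch, assuming the 0.9/0.10-era client where Protocol exposes its schemas as REQUESTS/RESPONSES arrays indexed by API key and version:

    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.protocol.Protocol;
    import org.apache.kafka.common.protocol.types.Schema;

    public class PrintProduceSchema {
        public static void main(String[] args) {
            // Protocol.REQUESTS / RESPONSES are indexed by [api key id][api version]
            Schema requestV2 = Protocol.REQUESTS[ApiKeys.PRODUCE.id][2];
            Schema responseV2 = Protocol.RESPONSES[ApiKeys.PRODUCE.id][2];
            System.out.println("PRODUCE_REQUEST_V2: " + requestV2);
            System.out.println("PRODUCE_RESPONSE_V2: " + responseV2);
        }
    }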

Fields of the PRODUCE_REQUEST_V2 header and body:

| Name           | Type       | Meaning |
|----------------|------------|---------|
| api_key        | short      | API identifier |
| api_version    | short      | API version number |
| correlation_id | int        | Sequence number generated by the client, monotonically increasing; the server echoes it back unchanged in the response |
| client_id      | String     | Client ID; may be null |
| acks           | short      | How many replicas must have successfully replicated the messages before the server responds; -1 means the entire ISR has replicated them |
| timeout        | int        | Timeout, in ms |
| topic          | String     | Topic name |
| partition      | int        | Partition number |
| record_set     | byte array | The message payload |

Fields of PRODUCE_RESPONSE_V2:

| Name             | Type   | Meaning |
|------------------|--------|---------|
| correlation_id   | int    | Sequence number generated by the client, monotonically increasing; the server echoes it back unchanged in the response |
| topic            | String | Topic name |
| partition        | int    | Partition number |
| error_code       | short  | Error code |
| base_offset      | long   | Offset assigned to the messages by the server |
| timestamp        | long   | Timestamp generated by the server |
| throttle_time_ms | int    | Throttling time, in ms |

Sender.createProduceRequests() wraps the messages to be sent into ClientRequests. No matter how many RecordBatches correspond to a Node, and no matter how many partitions those RecordBatches target, exactly one ClientRequest object is generated per Node. The core logic of creating a ClientRequest:

  1. Reorganize the RecordBatch collection for a Node into two maps: produceRecordsByPartition (Map<TopicPartition, ByteBuffer>) and recordsByPartition (Map<TopicPartition, RecordBatch>).
  2. Create a RequestSend, the object actually sent over the network I/O. Its format follows the PRODUCE_REQUEST_V2 protocol described above, with produceRecordsByPartition as the payload.
  3. Create a RequestCompletionHandler as the callback object.
  4. Wrap the RequestSend and RequestCompletionHandler into a ClientRequest object and return it.

Let's look at the code:
/**
 * Transfer the record batches into a list of produce requests on a per-node basis
 */
private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
    List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
    for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
        // Call produceRequest() to wrap the RecordBatches destined for the same
        // Node into a single ClientRequest object.
        requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
    return requests;
}

/**
 * Create a produce request from the given record batches
 */
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    // produceRecordsByPartition and recordsByPartition differ in their values:
    // one holds ByteBuffers, the other holds RecordBatches.
    Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());

    // 1. Group the RecordBatch list by partition into the two maps above.
    for (RecordBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        produceRecordsByPartition.put(tp, batch.records.buffer());
        recordsByPartition.put(tp, batch);
    }
    // 2. Create the ProduceRequest and the RequestSend.
    ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
    RequestSend send = new RequestSend(Integer.toString(destination),
                                       this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                       request.toStruct());
    // 3. Create a RequestCompletionHandler as the callback object;
    //    its logic is explained in detail later.
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    // 4. Create the ClientRequest. The second argument, derived from the acks
    //    setting, decides whether the request expects a response.
    return new ClientRequest(now, acks != 0, send, callback);
}

This completes the analysis of the ProduceRequest format and its creation. In the flow that follows, what is actually sent is the RequestSend object; the ClientRequest is cached in InFlightRequests, and when the request receives a response or hits an error, the cached ClientRequest is used to invoke its RequestCompletionHandler. The details are covered next.
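For orientation before that discussion: when NetworkClient.poll() completes a response, it pairs the response with the cached in-flight ClientRequest and fires the callback built above. Roughly (a hedged paraphrase of the NetworkClient code of this era, not an exact quote):

    // Paraphrased from the tail of NetworkClient.poll(): each completed
    // ClientResponse carries the original ClientRequest, whose callback
    // (the RequestCompletionHandler built above) is invoked here.
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }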
