美文网首页Kafka
Kafka源码分析(四)高吞吐核心——RecordAccumul

Kafka源码分析(四)高吞吐核心——RecordAccumul

作者: 81e2cd2747f1 | 来源:发表于2020-02-28 16:24 被阅读0次

    上一篇文章讲的是在主线程,消息在调用了send后,消息内容和该消息关联的future对象被一起放入了RecordAccumulator中,future对象最终被send方法返回。对于客户端来说,send方法返回了,但是send方法返回并不代表消息已经被成功发送到Broker了,如果接下去的任意行为都是需要确保消息成功发送的情况下进行,客户端需要调用future.get()等待future的完成。

    这一节继续接下去的工作。消息被主线程放入RecordAccumulator后,主线程早就撒手不管了,这时一个叫做Sender线程会从RecordAccumulator把消息拉出来,并且发送给Broker。Sender线程早在构造KafkaProducer的时候,已经被创建和启动。

    KafkaProducer(ProducerConfig config,
                    Serializer<K> keySerializer,
                    Serializer<V> valueSerializer,
                    Metadata metadata,
                    KafkaClient kafkaClient) {
        try {
            // ...
            this.sender = new Sender(logContext,
                    client,
                    this.metadata,
                    this.accumulator,
                    maxInflightRequests == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    acks,
                    retries,
                    metricsRegistry.senderMetrics,
                    Time.SYSTEM,
                    this.requestTimeoutMs,
                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                    this.transactionManager,
                    apiVersions);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            // ...
        } catch (Throwable t) {
            // ...
        }
    }
    

    Sender线程是一个事件循环,总是在while循环中做一些事情,接下来主要分析这两个事情

    // org.apache.kafka.clients.producer.internals.Sender#run(long)
    void run(long now) {
        // ..
    
        long pollTimeout = sendProducerData(now);
        client.poll(pollTimeout, now);
    }
    

    sendProducerData方法中,简化下它的主要逻辑

    private long sendProducerData(long now) {
        Cluster cluster = metadata.fetch();
    
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    
        // code
    
        // remove any nodes we aren't ready to send to
        // code
    
        // create produce requests
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);
        // code
    
        // code
        sendProduceRequests(batches, now);
    
        return pollTimeout;
    }
    

    首先调用this.accumulator.ready(cluster, now)找到哪一些Broker是已经准备好的。然后再调用this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now)将那些已经准备的Broker上的Batch进行重新整理后,全部从RecordAccumulator的Deque中取出来,发送出去。

    相关文章

      网友评论

        本文标题:Kafka源码分析(四)高吞吐核心——RecordAccumul

        本文链接:https://www.haomeiwen.com/subject/judlhhtx.html