美文网首页Kafka
Kafka源码分析(四)高吞吐核心——RecordAccumul

Kafka源码分析(四)高吞吐核心——RecordAccumul

作者: 81e2cd2747f1 | 来源:发表于2020-02-28 16:24 被阅读0次

上一篇文章讲的是在主线程,消息在调用了send后,消息内容和该消息关联的future对象被一起放入了RecordAccumulator中,future对象最终被send方法返回。对于客户端来说,send方法返回了,但是send方法返回并不代表消息已经被成功发送到Broker了,如果接下去的任意行为都是需要确保消息成功发送的情况下进行,客户端需要调用future.get()等待future的完成。

这一节继续接下去的工作。消息被主线程放入RecordAccumulator后,主线程早就撒手不管了,这时一个叫做Sender线程会从RecordAccumulator把消息拉出来,并且发送给Broker。Sender线程早在构造KafkaProducer的时候,已经被创建和启动。

KafkaProducer(ProducerConfig config,
                Serializer<K> keySerializer,
                Serializer<V> valueSerializer,
                Metadata metadata,
                KafkaClient kafkaClient) {
    try {
        // ...
        this.sender = new Sender(logContext,
                client,
                this.metadata,
                this.accumulator,
                maxInflightRequests == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                acks,
                retries,
                metricsRegistry.senderMetrics,
                Time.SYSTEM,
                this.requestTimeoutMs,
                config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                this.transactionManager,
                apiVersions);
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        // ...
    } catch (Throwable t) {
        // ...
    }
}

Sender线程是一个事件循环,总是在while循环中做一些事情,接下来主要分析这两个事情

// org.apache.kafka.clients.producer.internals.Sender#run(long)
void run(long now) {
    // ..

    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}

sendProducerData方法中,简化下它的主要逻辑

private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();

    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // code

    // remove any nodes we aren't ready to send to
    // code

    // create produce requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);
    // code

    // code
    sendProduceRequests(batches, now);

    return pollTimeout;
}

首先调用this.accumulator.ready(cluster, now)找到哪一些Broker是已经准备好的。然后再调用this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now)将那些已经准备的Broker上的Batch进行重新整理后,全部从RecordAccumulator的Deque中取出来,发送出去。

相关文章

网友评论

    本文标题:Kafka源码分析(四)高吞吐核心——RecordAccumul

    本文链接:https://www.haomeiwen.com/subject/judlhhtx.html