美文网首页
kafka消息累加器作用

kafka消息累加器作用

作者: hello_kd | 来源:发表于2022-03-06 15:33 被阅读0次

程序中调用kafka生产者发送消息,并不是每调用一次send方法,就直接将消息通过底层网络发送给broker了,而是会将多个消息形成一个批次,然后再以批次的形式,发送给broker,当然了,消息的发送也不是由生产者线程发送的。那么,kafka的消息如何形成一个批次,以及批次的形式,这个就是消息累加器的作用。

下面从源码的角度来看下消息累加器是如何处理消息的,并且还会和分区器一起搭配使用,下面这个方法是doSend方法的实现逻辑,这里只截取和累加器相关的代码部分

//前面代码省略
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true);

if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

    result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, false);
}

if (transactionManager != null && transactionManager.isTransactional())
    transactionManager.maybeAddPartitionToTransaction(tp);

if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
return result.future;

在对消息的key、value进行序列化后,并且根据分区器选择好分区之后,会调用累加器的append方法,因此,重点关注下append方法的实现逻辑

/**
@param abortOnNewBatch,这个参数的作用是,是否放弃使用新的批次,每个分区都会对应一个双向队列,
每个队列的元素是一个批次,当有新消息时,会取出队列的最后一个元素,并将消息累加到该批次中,假如批次的容量达到上限了,那么新消息默认需要生成一个新的批次,
再重新添加到双向队列中,如果参数为true,表示在这种情况下,放弃使用新的批次
*/
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch) throws InterruptedException {
    //每调用一次append方法,都会被记录下来
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // 取出分区对应的双向队列,若没有,则生成一个新的队列,并放入到map中
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
           //试图将消息添加到队列的最后一个批次元素中,若添加成功,那么方法直接返回
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        //当添加失败时,若参数指定为true,那么方法会直接返回,不会创建新的批次。
//外层方法第一次调用append方法时传的参数为true,
//主要是因为,kafka的发送者线程(区别于生产者线程)以一个批次为发送基本单位,因此为了让消息尽量多的累加到一个批次,
//当第一次无法往分区队列的最后一个批次累加时,优先选择另一个分区的队列批次。
        if (abortOnNewBatch) {
            // Return a result that will cause another call to append.
            return new RecordAppendResult(null, false, false, true);
        }
        //计算此次消息需要的内存大小
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // 再次检查生产者线程是否关闭了
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            //这边为何又要重新尝试append,因此当有多个线程并发的往同一分区累加消息,
//可能另一个线程已经生成好一个新的批次对象,并加入到双向队列中了,
//因而这边需要再次尝试append数据,而不是直接生成新的批次对象
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }
            //若尝试append失败之后,这里才开始真正的构建新的批次对象,并加入到双向队列之中
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, time.milliseconds()));

            dq.addLast(batch);
//每个批次还未添加到一个未完成的集合中,因此这些批次还未发送和得到broker的确认
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}

append方法的返回对象RecordAppendResult包含以下几个

public final FutureRecordMetadata future;//消息记录的元数据信息
public final boolean batchIsFull;  //批次是否满了或者队列是否不为空
public final boolean newBatchCreated;//是否新创建的批次
public final boolean abortForNewBatch;//放弃使用新的批次,表示消息往分区append失败,需要重新append

其中abortForNewBatch决定doSend方法中是否再次调用append方法

if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

    result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, false);
}

上述方法体中,会调用分区器的onNewBatch方法,设置一个新的分区对应的粘性分区,然后往新的分区append数据,这里为何要使用新的分区,原因在上述append方法实现中解释过了。

当批次是满的或者是新创建时,doSend方法会唤醒发送者线程。这里有个地方需要注意的是,kafka生产者线程和发送者线程是分开的,生产者线程负责往底层的队列中添加消息的批次对象,而发送者线程不断从队列中取出消息批次来发送给broker,实现了消息的构造和发送解耦。

相关文章

  • kafka消息累加器作用

    程序中调用kafka生产者发送消息,并不是每调用一次send方法,就直接将消息通过底层网络发送给broker了,而...

  • 09-flink-Accumulator(累加器)

    09-flink-Accumulator(累加器) 概念 Accumulator(累加器):累加器主要作用在用户操...

  • kafka优化笔记

    1 mq的作用 解耦、异步、削峰填谷 2 kafka架构 1)Producer :消息生产者,就是向 kafka ...

  • Kafka面试题

    1.Kafka定义 Kafka是分布式的发布订阅消息系统,可划分的,冗余备份的持久性日志服务 2.Kafka的作用...

  • Kafka - 关于高水位和Leader Epoch

    高水位的作用 在 Kafka 中,高水位的作用主要有 2 个。 定义消息可见性,即用来标识分区下的哪些消息是可以被...

  • Spark中的共享变量---广播变量和累加器

    一.广播变量和累加器的作用累加器(集群规模之间的大变量):做Spark的全局统计使用广播变量(集群规模间的大常量)...

  • Kafka详解

    应大部分的小伙伴的要求,今天这篇咱们用大白话带你认识 Kafka。 Kafka 基础 消息系统的作用 大部分小伙伴...

  • Kafka常用命令

    启动Kafka并生产消费消息 启动ZooKeeper 启动Kafka 查看启动后kafka的版本 生产者发送消息 ...

  • Kafka/RocketMQ顺序消息对比

    一、Kafka顺序消息 Producer端:Kafka的顺序消息是通过partition key,将某类消息(例如...

  • Kafka实践

    kafka基本概念: Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为...

网友评论

      本文标题:kafka消息累加器作用

      本文链接:https://www.haomeiwen.com/subject/jszbrrtx.html