美文网首页
2021-03-26 kafka生产者RecordAccumul

2021-03-26 kafka生产者RecordAccumul

作者: CayChan | 来源:发表于2021-03-26 12:02 被阅读0次

RecordAccumulator

This class acts as a queue that accumulates records into MemoryRecords instances to be sent to the server.
The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.
累加器,就是kafka的客户端缓存。所有send的ProducerRecord存放在这里,等待发送到远程Broker中。
缓存空间是一定的,缓存满了之后,新的ProducerRecord在send时会被阻塞。

缓存池

image.png
  • RecordAccumulator

    RecordAccumulator中用ConcurrentMap保存当前缓存的partition和每个partition的数据,每个partition数据保存在双端队列中:ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches

ConcurrentMap实际上是一个自定义的CopyOnWriteMap

  • RecordBatch

    Deque中保存着所有待发送的消息,某些消息又组成了一个RecordBatch

  • ByteBuffer

    RecordBatch中保存着实际的数据,每个数据序列化成二进制数组后,与其他kafka消息协议字段一起保存在ByteBuffer

  • Compressor

    Compressor用来把数据写入到ByteBuffer中。它像一支笔将kafka数据写到ByteBuffer里。

    private final ByteBufferOutputStream bufferStream;
    private final DataOutputStream appendStream;
    
    bufferStream = new ByteBufferOutputStream(buffer);
    appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
    
  • Trunk

    trunk用来关联每个record,用于消息发出后回调发送方当前record是否发送成功

/**
 * A callback and the associated FutureRecordMetadata argument to pass to it.
 */
final private static class Thunk {
    final Callback callback;
    //记录了record的相关信息,比如key、value、partition、exception等
    final FutureRecordMetadata future;

    public Thunk(Callback callback, FutureRecordMetadata future) {
        this.callback = callback;
        this.future = future;
    }
}

相关文章

  • 2021-03-26 kafka生产者RecordAccumul

    RecordAccumulator This class acts as a queue that accumul...

  • kafka0.8

    1、Kafka分为:生产者(producer),消费者(consumer) 2、生产者提交消息,给Kafka集群,...

  • Kafka - 生产者初步学习

    Kafka - 生产者初步学习 一、kafka生产者组件 我们从创建一个 ProducerRecord 对象开始,...

  • Kafka生产者:写消息到Kafka

    本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客...

  • [kafka系列]之producer端消息发送

    本小节我们来讨论Kafka生产者是如何发送消息到Kafka的, Kafka项目有一个生产者客户端,我们可以通过这个...

  • Kafka生产者:写消息到Kafka

    本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客...

  • Kafka生产者:写消息到Kafka

    本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客...

  • kafka使用

    框架: spring-kafka 1.2.2 生产者端代码 说明: bootstrapServers: kafka...

  • 【Kafka】Kafka 常用命令

    本篇结构: kafka topic 管理脚本 kafka 生产者控制台 kafka 消费者控制台 kafka 消费...

  • Kafka_读写流程

    kafka集群 写 生产者和kafka集群之间的流程 1.生产者将数据封装到ProducerRecord中,将Pr...

网友评论

      本文标题:2021-03-26 kafka生产者RecordAccumul

      本文链接:https://www.haomeiwen.com/subject/qujjhltx.html