美文网首页
一条消息是如何写入的

一条消息是如何写入的

作者: 米兰的小铁匠xxm | 来源:发表于2022-06-12 23:13 被阅读0次

消息结构

public class ProducerRecord<K, V> {
    private final String topic; //消息发往的主题
    private final Integer partition;//消息要发往的分区号
    private final Headers headers; //消息头部
    private final K key; //键
    private final V value; //值,与业务相关的消息体
    private final Long timestamp;  //消息的时间戳
    //省略其他成员方法和构造方法
}

其中:

  • key是指定消息的键,可以用来计算分区号以发往特定的分区;有key的消息还能支持日志压缩,压缩后会保留最后一条数据。
  • value:是指发送的具体消息
    topic和value必填,其他字段选填。

生产者整体架构

整体架构

整个生产者客户端包括主线程和 Sender 线程(发送线程)两部分。其中,在主线程中由 KatkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator 中获取消息并将其发送到Kafka 中。下面将详细说明各部分的作用。

拦截器

拦截器包括生产者拦截器和消费者拦截器,可以根据某个规则过滤不符合要求的消息、修改消息的内容等。

序列化器

生产者需要用序列化器把对象转换成字节数组才能通过网络发送给 Kafka。而
消费者需要用反序列化器把从Katka中收到的字节数组转换成相应的对象,序列化是必须的,且序列化和反序列化方法必须一一对应。

分区器

如果消息中没有指定分区字段,则分区器需要根据Key来计算分区值,目的就是为了消息分配。
如果指定了key,那么会对Key进行哈希计算,根据得到的哈希值来计算分区号,在分区数不变的情况下,可保证相同Key的消息写入同一分区;如果没有指定key,则是以轮询的方式发送到各个可用的分区。

消息累加器

RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,目的是为了减少网络传输的资源消耗以提升性能。
在Recordaccumulator的内部为每个分区都维护了一个双端队列,主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列中,队列中的内容就是批量消息(ProducerBatch),包含多条生产者创建的消息(producerRecord),批量消息的大小可由参数控制。消息写入缓存时,追加到双端队列的尾部;sender在读消息时,是从头部开始读取。
当一条新的消息到达累加器时,会先根据分区找到对应的双端队列,然后在双端队列尾部寻找producerBatch,如果不存在则新建;如果存在,则判断producerBatch是否还能容纳新的消息,如果能则写入,如果不能在新建producerBatch。

Sender线程

Sender从累加器中获取一个批次的消息之后,跟进行数据的转换,这种主要做了两件事:一是根据partiton得到要发送到的Broker的地址;二是将按照kafka内置的请求协议将消息转换成对应的request,进而发往各个broker。
请求在发送到broker之前,会被写入InFlightRequest中(连接维度,即broker维度的缓存队列),主要目的是记录已发送但是没收到响应的请求,最多缓存的请求数量可以通过配置参数设置,当缓存的请求数达到最大数量限制时,便不再向对应的broker发送更多请求。

消息存储

todo这部分主要介绍:

  1. 消息是如何存储的?
  2. kafka为什么使用磁盘作为存储介质?如何实现高吞吐和低延迟?
  3. 如何检索指定消息?
  4. 消息不可能无线存储,清理规则是什么?

参考书籍:《深入理解kafka核心设计与实践原理》

相关文章

网友评论

      本文标题:一条消息是如何写入的

      本文链接:https://www.haomeiwen.com/subject/dbjnmrtx.html