消息结构
public class ProducerRecord<K, V> {
private final String topic; //消息发往的主题
private final Integer partition;//消息要发往的分区号
private final Headers headers; //消息头部
private final K key; //键
private final V value; //值,与业务相关的消息体
private final Long timestamp; //消息的时间戳
//省略其他成员方法和构造方法
}
其中:
- key是指定消息的键,可以用来计算分区号以发往特定的分区;有key的消息还能支持日志压缩,压缩后会保留最后一条数据。
- value:是指发送的具体消息
topic和value必填,其他字段选填。
生产者整体架构
整体架构整个生产者客户端包括主线程和 Sender 线程(发送线程)两部分。其中,在主线程中由 KatkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator 中获取消息并将其发送到Kafka 中。下面将详细说明各部分的作用。
拦截器
拦截器包括生产者拦截器和消费者拦截器,可以根据某个规则过滤不符合要求的消息、修改消息的内容等。
序列化器
生产者需要用序列化器把对象转换成字节数组才能通过网络发送给 Kafka。而
消费者需要用反序列化器把从Katka中收到的字节数组转换成相应的对象,序列化是必须的,且序列化和反序列化方法必须一一对应。
分区器
如果消息中没有指定分区字段,则分区器需要根据Key来计算分区值,目的就是为了消息分配。
如果指定了key,那么会对Key进行哈希计算,根据得到的哈希值来计算分区号,在分区数不变的情况下,可保证相同Key的消息写入同一分区;如果没有指定key,则是以轮询的方式发送到各个可用的分区。
消息累加器
RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,目的是为了减少网络传输的资源消耗以提升性能。
在Recordaccumulator的内部为每个分区都维护了一个双端队列,主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列中,队列中的内容就是批量消息(ProducerBatch),包含多条生产者创建的消息(producerRecord),批量消息的大小可由参数控制。消息写入缓存时,追加到双端队列的尾部;sender在读消息时,是从头部开始读取。
当一条新的消息到达累加器时,会先根据分区找到对应的双端队列,然后在双端队列尾部寻找producerBatch,如果不存在则新建;如果存在,则判断producerBatch是否还能容纳新的消息,如果能则写入,如果不能在新建producerBatch。
Sender线程
Sender从累加器中获取一个批次的消息之后,跟进行数据的转换,这种主要做了两件事:一是根据partiton得到要发送到的Broker的地址;二是将按照kafka内置的请求协议将消息转换成对应的request,进而发往各个broker。
请求在发送到broker之前,会被写入InFlightRequest中(连接维度,即broker维度的缓存队列),主要目的是记录已发送但是没收到响应的请求,最多缓存的请求数量可以通过配置参数设置,当缓存的请求数达到最大数量限制时,便不再向对应的broker发送更多请求。
消息存储
todo这部分主要介绍:
- 消息是如何存储的?
- kafka为什么使用磁盘作为存储介质?如何实现高吞吐和低延迟?
- 如何检索指定消息?
- 消息不可能无线存储,清理规则是什么?
参考书籍:《深入理解kafka核心设计与实践原理》
网友评论