先上图
![](https://img.haomeiwen.com/i14399021/6379d6d284d0e488.png)
整个producer客户端主要由两个线程组成,这两个线程分别是主线程和Sender线程。当我们调用kafkaProducer.send(KafkaRecord xxx)
时,这个record
会通过拦截器(如果我们有设置)、序列化器以及分区器,最后会被缓存到消息累加器(RecordAccumulator
)中。然后Sender
线程负责从RecordAccumulator
中获取record
并将其发送到kafka broker
上。
所以RecordAccumulator
的主要作用便是缓存待发送的消息,这样可以使Sender线程批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator
就相当于一个缓冲池,这个池子是有大小的,我们可以通过producer端参数buffer.memory
来配置,默认值是32MB 。如果producer写消息到缓冲池的速度大于了Sender线程从缓冲池取消息的速度,那么会导致producer空间不足,这时候的主线程要么被阻塞,要么抛出异常,这个取决于producer端参数max.block.ms
如何配置,这个参数的默认值是60000,也就是60秒。
RecordAccumulator
内部为每一个分区都维护了一个队列,队列中存放的元素是ProducerBatch
。这里需要注意一下,ProducerBatch
和ProducerRecord
不是一样的概念,ProducerRecord
就是我们自己构造出来的一个待发送的消息体,ProducerBatch
包含了1个或者多个ProducerRecord
,可以理解为消息的一个批次,具体ProducerBatch
是如何包含多个ProducerRecord
的,这个之后会在讲解Accumulator
的文章中详细介绍。总之,这里我们只需要知道,当一个消息被存放到RecordAccumulator
中时,首先根据主题分区到找对应的消息队列,找不到即创建一个。然后从队列的尾部取出一个ProducerBatch
,向其内部追加本条消息。
之后Sender线程在从RecordAccumulator
中获取缓存的消息后,会将原本<分区,Deque<ProducerBatch>>
的保存形式转变为<Node, List<ProducerBatch>>
的形式,其中Node
代表Kafka集群的broker节点,并进一步封装成<Node, Request>
的形式。我们可以这样理解,Sender线程处理完成之后就需要将消息发往具体的broker节点了,相当于是网络侧,这时候其实并不关心消息是哪个分区的,所以保存的key
应该是Node
才比较合适。但是RecordAccumulator
中是需要接受KafkaProducer
发过来的消息,相当于是在用户侧,那用户更关心的当然是消息应该发往哪个分区,并不关心应该发往哪个节点,所以保存的key
应该是分区才对。
上面的图中还提到了一个InFlightRequest
的概念。它的主要作用是缓存已经发出去但是还没有收到响应的请求。当然,与这个概念相关的producer参数也有一些,之后会详细介绍。
网友评论