今天我们来通过源码来分析一下,生产者发送一条消息的所有流程~~~
生产者客户端代码
public class SzzTestSend {
public static final String bootStrap = "xxxxxx:9090";
public static final String topic = "t_3_1";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStrap);
// 序列化协议 下面两种写法都可以
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//过滤器 可配置多个用逗号隔开
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"org.apache.kafka.clients.producer.SzzProducerInterceptorsTest");
//构造 KafkaProducer
KafkaProducer producer = new KafkaProducer(properties);
// 发送消息, 并设置 回调(回调函数也可以不要)
ProducerRecord<String,String> record = new ProducerRecord(topic,"Hello World!");
try {
producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value()));
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 发送成功回调类
*/
public static class SzzTestCallBack implements Callback{
private static final Logger log = LoggerFactory.getLogger(SzzTestCallBack.class);
private String topic;
private String key;
private String value;
public SzzTestCallBack(String topic, String key, String value) {
this.topic = topic;
this.key = key;
this.value = value;
}
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
log.error("Error when sending message to topic {} with key: {}, value: {} with error:",
topic, key,value, e);
}else {
log.info("send message to topic {} with key: {} value:{} success, partiton:{} offset:{}",
topic, key,value,metadata.partition(),metadata.offset());
}
}
}
}
构造KafkaProducer
KafkaProducer通过解析producer.propeties
文件里面的属性来构造自己。 例如 :分区器、Key和Value序列化器、拦截器、RecordAccumulator消息累加器 、元信息更新器、启动发送请求的后台线程
//构造 KafkaProducer
KafkaProducer producer = new KafkaProducer(properties);
生产者元信息更新器
我们之前有讲过. 客户端都会保存集群的元信息,例如生产者的元信息是 ProducerMetadata. 消费组的是ConsumerMetadata 。
元信息都会有自己的自动更新逻辑, 详细请看Kafka的客户端发起元信息更新请求
相关的Producer配置有:
属性 | 描述 | 默认 |
---|---|---|
metadata.max.age.ms | 即使我们没有看到任何分区领导层更改以主动发现任何新代理或分区,我们也强制刷新元数据的时间段(以毫秒为单位)。。 | 300000(5分钟) |
retry.backoff.ms | 如果上次更新失败,发起重试的间隔时间 | 100 |
虽然Producer元信息会自动更新, 但是有可能在生产者发送消息的时候,发现某个TopicPartition不存在,这个时候可能就需要立刻发起一个元信息更新了。
生产者拦截器
生产者拦截器在消息发送之前可以做一些准备工作, 比如 按照某个规则过滤某条消息, 又或者对 消息体做一些改造, 还可以用来在发送回调逻辑之前做一些定制化的需求,例如统计类的工作! 拦截器的执行时机在最前面,在消息序列化和分区计算之前
相关的Producer配置有:
属性 | 描述 | 默认 |
---|---|---|
interceptor.classes | 生产者拦截器配置,填写全路径类名,可用逗号隔开配置多个,执行顺序就是配置的顺序。 | 空 |
生产者分区器
用来设置发送的消息具体要发送到哪个分区上
相关的Producer配置有:
属性 | 描述 | 默认值 |
---|---|---|
partitioner.class | 消息的分区分配策略 | org.apache.kafka.clients.producer.internals.DefaultPartitioner |
Sender线程启动
Sender是专门负责将消息发送到Broker的I/O线程。
相关的Producer配置有:
属性 | 描述 | 默认值 |
---|---|---|
max.in.flight.requests.per.connection | 客户端能够允许的最大未完成请求(在请求中)的请求数量, 如果该值大于1, 并且请求发送失败可可能导致消息重排序的风险(如果重试启用的话) | 5 |
request.timeout.ms | 控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试次数用尽,则请求失败 | 30000(30 秒) |
connections.max.idle.ms | 在此配置指定的毫秒数后关闭空闲连接。 | 540000(9 分钟) |
reconnect.backoff.ms | 在尝试重新连接到给定主机之前等待的基本时间量。这避免了在紧密循环中重复连接到主机。此退避适用于客户端到代理的所有连接尝试 | 50 |
reconnect.backoff.max.ms | 重新连接到反复连接失败的代理时等待的最长时间(以毫秒为单位)。如果提供,每台主机的退避将在每次连续连接失败时呈指数增长,直至达到此最大值。在计算回退增加后,添加 20% 的随机抖动以避免连接风暴。 | 1000(1 秒) |
retry.backoff.ms | 在尝试重试对给定主题分区的失败请求之前等待的时间量。这避免了在某些故障情况下在紧密循环中重复发送请求。 | 100 |
send.buffer.bytes | 发送数据时使用的 TCP 发送缓冲区 (SO_SNDBUF) 的大小。如果值为 -1,将使用操作系统默认值。 | 131072(128 千字节) |
receive.buffer.bytes | 读取数据时使用的 TCP 接收缓冲区 (SO_RCVBUF) 的大小。如果值为 -1,将使用操作系统默认值。 | 32768 |
acks | 生产者要求Leader在决定是否完成请求之前收到的确认数量. 这控制了发送的记录的持久性 可配置的参数如下:1.acks=0 如果为0, 生产者不会等待服务器的任何确认, 会被立即视为已发送,这种情况下不能保证服务器是否真的已经收到了消息。这个时候retries 配置不会生效(客户端都不管服务端的返回了,所以客户端一般是不知道有故障的)2.acks=1 Leader会将消息写入到它的本地日志中,但是不会等待所有的Follower完全确认就会返回发送成功状态。 这种情况下, 当Follower成功同步数据之前Leader挂掉了会造成数据丢失。3.acks=all Leader将等待所有的ISR中的副本完成同步之后返回成功状态, 这样子数据就不会丢失,是最高级别的保证。 |
1 |
transactional.id | ||
enable.idempotence | 是否启动幂等。当设置为true时候, 生产者将确保每条消息被最多写入一个副本,如果未false,生产者由于Broker失败等原因重试,可能会写入到多个副本中。注意:启动幂等性的要求max.in.flight.requests.per.connection<=5 retries>0 并且 acks=all .如果设置了不兼容的值则会抛出异常 |
false |
max.request.size | 请求的最大大小(以字节为单位)。此设置将限制生产者在单个请求中发送的记录批次的总数据量,以避免发送大量请求。这实际上也是最大未压缩记录批量大小的上限。请注意,服务器对记录批量大小有自己的上限(如果启用压缩,则在压缩之后),这可能与此不同。 | 1048576 |
retries | 生产者重试次数,当max.in.flight.requests.per.connection>1 的情况发生重试可能会导致顺序问题. |
2147483647 |
delivery.timeout.ms | 最大交付时间, 调用send()方法后不管是成功还是失败的时间上限。例如重试太多次之后达到次配置时间的时候也会停止重试了。此配置值应该大于等于request.timeout.ms 和linger.ms 总和 |
120000 (2 minutes). 如果这个值你没有主动设置并且request.timeout.ms +linger.ms > 120000(默认值) ,那么它最终的值是request.timeout.ms +linger.ms
|
发送请求
producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value()));
生产者拦截器
发送消息的第一步就是执行拦截器
一般情况下我们可能不需要拦截器, 但是我们需要用拦截器的时候按照下面操作执行:
- 在配置文件中配置属性
interceptor.classes=拦截器1,拦截器2,拦截器3
- 实现接口
org.apache.kafka.clients.producer.ProducerInterceptor<K, V>
这个interceptor.classes
中的属性可以配置多个拦截器, 用逗号隔开,并且执行顺序就是按照配置的顺序执行的。
拦截器的执行时机在最前面,在消息序列化和分区计算之前
ProducerInterceptor
org.apache.kafka.clients.producer.ProducerInterceptor<K, V>
接口方法讲解:
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
onSend(ProducerRecord<K, V> record)方法 :
当客户端将记录发送到
KafkaProducer
时,在键和值被序列化之前调用。 该方法调用ProducerInterceptor.onSend(ProducerRecord)
方法。 从第一个拦截器的onSend()
返回的ProducerRecord
传递给第二个拦截器 onSend(),在拦截器链中依此类推。 从最后一个拦截器返回的记录就是从这个方法返回的。 此方法不会抛出异常。 任何拦截器方法抛出的异常都会被捕获并忽略。 如果链中间的拦截器(通常会修改记录)抛出异常,则链中的下一个拦截器将使用前一个未抛出异常的拦截器返回的记录调用。
调用地方
①. 拦截器执行时机在键值序列化之前 ②. 拦截器抛出异常会被捕获,并打印日志,那么也意味着这个拦截器所做的修改不会生效 ③.拦截器中修改的消息体会被传递到下一个拦截器
onAcknowledgement(RecordMetadata metadata, Exception exception)方法:
当发送到服务器的记录已被确认时,或者当发送记录在发送到服务器之前失败时,将调用此方法。 此方法通常在用户设置的Callback之前调用,此方法不会抛出异常。 任何拦截器方法抛出的异常都会被捕获并忽略。这个方法运行在Producer的I/O线程中,所以这个方法中的代码逻辑需要越简单越好。 否则,来自其他线程的消息发送可能会延迟。
参数:
metadata
– 已发送记录的元数据(即分区和偏移量)。 如果发生错误,元数据将只包含有效的主题和分区。 如果 ProducerRecord 中没有给出 partition 并且在分配 partition 之前发生错误,则 partition 将设置为 RecordMetadata.NO_PARTITION。 如果客户端将空记录传递给KafkaProducer.send(ProducerRecord)则元数据可能为空。exception
– 在处理此记录期间抛出的异常。 如果没有发生错误,则为空。
close()
主要用于在关闭拦截器时自行一些资源清理工作。
configure(Map<String, ?> configs)
ProducerInterceptor
接口中集成了一Configurable
接口,接口有个方法
void configure(Map<String, ?> configs);
也就是说在拦截器中,我们可以拿到所有的配置属性了; 这个方法在这几个方法中最早执行
生产者拦截器示例
将发送的消息加上后缀 注意这里消息value的类型是String
,如果是byte则需要处理一下
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
System.out.println("生产者拦截器 onSend() run ."+record);
return new ProducerRecord<>(
record.topic(), record.partition(), record.key(), record.value().concat("_后缀")); }
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("生产者拦截器 onAcknowledgement run ."+metadata.toString() +" exception:"+exception);
}
@Override
public void close() {
System.out.println("生产者拦截器 close() run .");
}
@Override
public void configure(Map<String, ?> configs) {
this.configs = configs;
System.out.println("生产者拦截器 configure run ."+configs);
}
更新元信息waitOnMetadata
在发送消息之前,要先获取一下将要发送的TopicPartition的元信息。这个获取元信息的请求也是通过唤醒 Sender线程进行发送的。
-
ProducerMetadata
元信息Map<String, Long> topics
中保存Topic
的有效期时间,metadata.max.idle.ms
控制,默认300000
-
ProducerMetadata
元信息Set<String> newTopics
中保存所有Topic
- 获取 Topic的
元数据集群
以及我们等待的时间(以毫秒为单位), 这个获取元数据不是这里获取的,这里只是判断当前是否已经获取到了元数据,如果没有获取到,则一直等待,最大等待时间由max.block.ms
控制,默认60000(1分钟),关于获取元数据在最上面已经分析过了, 是Sender线程获取并更新的。如果等待时间超过了max.block.ms
,很有可能网络异常,那么会抛出超时异常。 - 当你发送消息的时候指定了分区号, 但是这个分区号是不存在的, 这个时候就会一直发起
Metadata
请求(流程看最上面), 直到超时(max.block.ms
)之后 抛出异常
org.apache.kafka.common.errors.TimeoutException: Topic t_3_1 not present in metadata after 60000 ms.
相关的Producer配置有:
属性 | 描述 | 默认 |
---|---|---|
max.block.ms | 生产者发送消息过程中,获取元信息的最大超时时间 | 60000(1分钟) |
metadata.max.idle.ms | Topic的最大空闲时间. 如果一个主题在这么多毫秒内没有被访问过,它就会从缓存中删除。并且下一次对其的访问将强制执行元数据获取请求。 | 300000(5分钟) |
KafkaProducer producer = new KafkaProducer(properties);
在构建KafkaProducer
对象的时候, 有构建 producer I/O thread
, 并且启动了, Runnable
是 sender
最终调用NetworkClient.poll(long timeout, long now)
里面maybeUpdate()
方法 这个方法会获取 前Node中负载最少的节点发起网络请求, 如果所有Node都是满负载则请求不会被发起。
如何判断哪个节点负载最少?
通过每个节点的InFlightRequests(空中请求数量)
里面的最小数量判断,这个表示当前正在发起的请求,但是还没有收到回复的请求数量; 保存形式是一个HashMap,key
是Node的Id, value
是所有当前还在请求中的节点; 当请求完成,请求就会在这个队列里面移除; 如果这个队列一直是满的,说明当前负载很高或者网络连接有问题。如果所有Node都是满负载则请求不会被发起,除非等到队列数量减少。
private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
每个Node最大负载数 ?
每个客户端在发起请求还没有收到回复的时候都会被缓存到InFlightRequests(空中请求数量)
里面,但是这个数量是有限制的,这个可以通过配置max.in.flight.requests.per.connection
进行设置, 默认是: 5; 也就是每个客户端对每个Node最多也就同时发起 5 个未完成的请求; 如果超时这个数量就会等待有请求完成并释放额度了才可以发起新的请求;
相关的Producer配置有:
属性 | 描述 | 默认 |
---|---|---|
max.in.flight.requests.per.connection | 每个客户端对每个Node发起请求的最大并发数 | 5 |
KeyValue序列化
将key和Value先序列化。
自定义序列化器,需要实现org.apache.kafka.common.serialization.Serializer
接口。 我们简单看下StringSerializer
序列化器
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
}
configure(Map<String, ?> configs, boolean isKey) 这个方法是在构造 KafkaProduce
实例的时候调用的。isKey
表示是 key还是value来进行序列化 这里 serialize(String topic, String data) 方法直接将字符串转换成byte[]类型。
Kafka客户端提供了很多种序列化器供我们选择,如果这些序列化器你都不满意,你也可以选择其他一些开源的序列化工具,或者自己进行实现。
计算分区号
将序列化后的key、 value 调用合适的分区器选择将要发送的分区号。
将消息缓存进RecordAccumulator累加器中
Sender发送消息
Sender线程在构造KafkaProducer的时候就已经启动了,它的职责就是从
以下忽略部分代码省略
void runOnce() {
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);
}
private long sendProducerData(long now) {
// 获取哪些数据准备好了发送
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
}
寻找准备好发送的消息Batch,获取对应Leader所在的ReadyNode
我们都知道生产者生产的消息是暂时缓存在消息累加器RecordAccumulator中的, Sender负责从RecordAccumulator里面获取准备好的数据进行发送
那么 ,哪些属于准备好的数据呢?
我们先回顾一下 RecordAccumulator的结构。
每个TopicPartition的消息都会被暂存在ProducerBatch Deque 阻塞队列中的其中一个ProducerBatch中,每个ProducerBatch都存放着一条或者多条消息。
满足发送的条件的Batch
遍历每个TopicPartition里面的Deque, 获取队列中的第一个ProducerBatch 如果该TopicPartition不存在Leader,则忽略该Batch,如果有则进入判断流程
因为消息是要发Leader所在的Broker发送的, 所以必须要有Leader。
在满足条件
不属于重试或者属于重试并且等待的时候大于 retry.backoff.ms
的前提下,满足下面条件的均可发送
(该条件就是要排除那些是属于重试,但是还没有到达重试间隔时间的情况。)
该ProducerBatch还没有被发送过. 该Batch能否发送判断条件如下
- 如果该Batch满了或者Batch所在的Deque数量>1(数量大于1说明第一个Batch肯定就满了) 则满足发送条件
- 如果消息累加器中内存用完了,有线程阻塞等待写入消息累加器 则也满足发送条件
-
RecordAccumulator消息累加器被关闭,满足条件;(一般KafkaProducer被正常关闭的时候会先将累加器标记为已经关闭,方便让累加器里面的消息都发出去)
-
是否被强制将消息发送出去。消息累加器RecordAccumulator提供强制
flush()
方法供调用,用于该时刻的消息都满足发送的条件,一般在消息事务的地方有调用。 这里要注意的是,是调用flush()
这一时刻的所有未发送的Batch都需满足发送条件,后面新增的Batch不属于这一范畴 -
该Batch的创建时间>
linger.ms
的时间
获取可发送请求的服务端ReadyNodes
上面是讲哪些Batch属于可发送的逻辑判断,但是实际上,真正发送的时候并不是以每个Batch维度来判断发送的,而是以Node维度来发送的,上面我们知道了哪些Batch能够发送,然后我们就可以推断出Batch对应的TopicPartition所属的Broker。有了这些可发送的Broker,然后再来遍历Broker上的每个TopicPartition中的First Batch
文字不好理解,我们看看下图
上图是生产者的RecordAccumulator消息累加器, 消息累加成上图所示。
每个TopicPartition队列都有很多Batch, 我们知道了TopicPartition 是不是就能够确定它所在的Broker?
例如上图中
-
Topic1Partition-1、 Topic1Partition-2 、Topic2Partition-0 他们三个的Leader都存在于Broker-0 中 虽然 Topic2Partition-0 队列中不满足发送逻辑, 但是跟他同一个Broker中有其他的队列满足条件了,所以它最终也是满足发送条件的。
-
Topic2Partition-1 Leader在Broker-1中,但是它不满足发送条件,这个Broker中也没有其他的满足条件了,所以客户端不会向Broker-1这个Node发起请求。
-
Topic1Partition-3 Leader在Broker-2中,它满足发送条件,那么Broker-2就满足发送条件
那么最终得到的ReadyNodes就是Broker-0、Broker-2
强制更新没有Leader的Topic元信息
上面我们在获取 哪些Batch准备好发送的时候,也会找到哪些TopicPartition没有Leader。
那么这个时候就需要强制的去更新一下这些TopicPartition的元信息了,否则就发送不了。
过滤一些还未准备好连接的ReadyNodes
上面我们已经获取了ReadyNodes
那么在真正的向对应的ReadyNodes 发起请求之前, 我们还是需要判断一下 我们的生产者客户端是否准备好了跟ReadyNodes 发起请求.
那么客户端的准备条件有哪些呢?
生产者客户端在最开始的时候都没有跟任何Node建立连接的, 当我们尝试发送之前会去检验一下连接是否建立成功(就是当前这一步), 如果没有的话,则会去尝试建立连接。并且当前这次是会把这个Node过滤掉的,因为还没有建立成功链接,等到下一次循环的时候,可能已经建立成功了。
当然客户端是否准备好,不仅仅是判断 连接是否建立成功。
还需要判断 当前未完成的请求队列数量是否 < max.in.flight.requests.per.connection
遍历ReadNodes上的所有TopicPartition阻塞队列中的FirstBatch进行打包
到现在为止,我们已经得到了可以发送请求的ReadyNodes了。那么接下来就是分别解析这些ReadyNode 他们能够发送的Batch打包发送了。
这一步最重要的作用是将 ProducerBatch 跟Node映射,也就是知道当前批次想哪个Broker发送哪些Batch
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
// 遍历ReadyNodes 每个Node下的队列都获取一遍
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
batches.put(node.id(), ready);
}
return batches;
}
那么应该选择哪些Batch来发送呢?
遍历每个ReadyNode节点下面的每个TopicPartition 队列的首个Batch
- 如果FirstBatch 属于重试, 并且还没有达到重试间隔时间
retry.backoff.ms
, 则该TopicPartition队列会忽略 例如上图 Topic3Partition-1) - 如果FirstBatch 为空, 则该TopicPartition队列会忽略;如左边 Topic3Partition-0
- 如果该批次中的总Batch大小 > max.request.size 了. 则会终止此次遍历,并记录当前遍历到的位置, 等下次再次发送的时候从上一次结束的位置进行遍历 (但是这里kafka用了一个全局变量记录当前遍历到的索引,不是每个Broker一个变量, 是一个小Bug)
- 一次Request最多只会完整的遍历一遍, 就算遍历完一遍所有TopicPartition之后还没有写满
max.request.size
. 那么也不会再重新遍历。
构造Produce请求并发起接着处理Response
上面我们已经得到了
Map<Integer, List<ProducerBatch>> batches
也就是Node.id 和对应要发往该Node的Request请求携带的ProducerBatch列表。
发送成功之后,会返回Response,根据Response情况处理不同的逻辑
Response处理逻辑 每个Batch都会对应着一个PartitionResponse, 不同的PartitionResponse对应的不同处理逻辑。
- 如果Response返回RecordTooLargeException异常,并且Batch里面的消息数量>1.这种情况, 就会尝试的去拆分Batch, 如何拆分呢? 是以
batch.size
大小来拆分成多个Batch。并且重新放入到消息累加器中。 - 如果返回是其他异常则先判断一下是否能够重试,如果能够重试,则重新入队到消息累加器中。重新入队的Batch会记录重试次数和时间等等信息。是否能够重试判断逻辑:batch没有超过
delivery.timeout.ms
&& 重启次数<retiries
- 如果是DuplicateSequenceException异常的话,那么并不会做其他的处理,而是当做正常完成。
- 其他异常或者没有异常则会走正常流程, 并且调用InterceptorCallback,如果有Exception也会返回。这个InterceptorCallback里面包含在拦截器interceptors和userCallback(用户自己的回调)。调用顺序如下图:
这个usercallback呢就是我们自己设置的,例如:
producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value()));
注意: 这里的回调并不是指的一个Batch一个回调,这里是一个Batch里面有多少条消息,就有多少个回调。每个ProducerBatch里面都有一个对象专门保存所有消息的回调信息 thunks
. 在处理ProducerBatch返回信息的时候会遍历这个trunks, 来执行每个消息的回调。
假如你想确定某个消息是否发送成功, 那么你可以自己定义一个拦截器。 并重写接口onAcknowledgement(RecordMetadata metadata, Exception exception)
在这里面来判断你的消息是否发送成功。
发送流程总结
Kafka Producer 整体架构图
整个生产者客户端是由主线程和Sender线程协调运行的, 主线程创建消息, 然后通过 拦截器、元信息更新、序列化、分区器、缓存消息等等流程。
然后Sender线程在初始化的时候就已经运行了,并且是一个while循环。
Sender线程里面主要工作是:
-
寻找ReadyNodes: 去消息累加器里面获取有哪些Node是能够发送Request的。只要该Node有一个TopicPartition队列中有符合发送条件的Batch。那么这个Node就应该是ReadyNode。具体的筛选逻辑请看上文有具体分析。
-
构建Request: 过滤之后, 拿到了所有的ReadyNodes。接下来就是遍历该Node下所有的TopicPartition队列里面的FirstBatch, 组装到Request请求里面。发往一个Node的请求Request,可以包含多个ProducerBatch,能够一次发送多少个Batch是由配置
max.request.size
决定的,一个Node对应一个Request。 注意: 这个时候映射关系已经是 Map<Node,Reqeust> -
将Request放入inFightRequest中: 上面是组装好了Request, 组装好了之后要先把这个Request放到
inFightRequest
对象中, 它保存着每个Node当前已经发送的Request但是还没有收到Response的请求。每个Node最多能够存放多少个未完成的Request,是由max.in.flight.requests.per.connection
控制的。需要注意的是, 如果队列已经满了, Request是放入不了这个对象里面的,并且会抛出异常:
"Attempt to send a request to node " + nodeId + " which is not ready."
它决定着生产者针对某个Node的并发度。
-
Request通过Selector发起通信.
-
返回Response: 服务端处理完成, 返回Response信息。
-
从inFightRequest中移除完成Request
-
释放内存回消息累加器: 请求结束,清理消息累加器,将发送成功的ProducerBatch占用的内存大小加回到消息累加器中。 注意:这里纯粹的是数字的加减,不涉及内存的处理, 因为发送成功之前的Batch占用了消息累加器的剩余可用内存。发送成之后要加回来。否则消息累加器满了会导致阻塞。**
作者:石臻臻的杂货铺
链接:https://juejin.cn/post/7124563723645091876
来源:稀土掘金
网友评论