1. Introduction to Sender
From the preceding analysis we know that the main thread only places messages into the RecordAccumulator cache via KafkaProducer.send(); no actual network I/O happens there. All network I/O is performed by the Sender thread.
The overall flow of the Sender thread when sending messages:
- Call RecordAccumulator.ready() to select, based on the accumulator's cache state, which Node(s) can be sent messages.
- Filter those Nodes according to the producer's connection state with each of them (managed by NetworkClient).
- Create requests, generating only one request per Node.
- Call NetworkClient to send the requests out.
Sender implements the Runnable interface and runs in a dedicated ioThread. Sender's run() method delegates to the overloaded run(long), which is the core method of the Sender thread and implements the send flow. The sequence diagram is shown below:
(Figure: Sender processing flow)
```java
/**
 * Run a single iteration of sending
 *
 * @param now The current POSIX time in milliseconds
 */
void run(long now) {
    // 1. Get the Kafka cluster metadata from Metadata
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    // 2. Call RecordAccumulator.ready() to select, based on the accumulator's
    //    cache state, which Nodes can be sent messages; returns a ReadyCheckResult
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // if there are any partitions whose leaders are not known yet, force metadata update
    // 3. If ReadyCheckResult has unknownLeadersExist set, call Metadata.requestUpdate()
    //    to mark the cluster metadata as needing a refresh
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();
    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    // 4. For each Node in readyNodes, call NetworkClient.ready() to check whether
    //    the network I/O side is ready for sending; Nodes that fail the check are
    //    removed from readyNodes
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }
    // create produce requests
    // 5. Collect the messages to send, converting the TopicPartition -> List<RecordBatch>
    //    mapping into a node id -> List<RecordBatch> mapping
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    // 6. Call RecordAccumulator.abortExpiredBatches() to handle expired messages in the accumulator
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    sensors.updateProduceRequestMetrics(batches);
    // 7. Call Sender.createProduceRequests() to wrap the messages to send into ClientRequests
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    // 8. Call NetworkClient.send() to write each ClientRequest into its KafkaChannel's send field
    for (ClientRequest request : requests)
        client.send(request, now);
    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    // 9. Call NetworkClient.poll() to actually send the ClientRequests stored in the
    //    KafkaChannel.send fields, handle responses returned by the server, handle
    //    timed-out requests, invoke user-defined Callbacks, and so on
    this.client.poll(pollTimeout, now);
}
```
A summary of the run(long) processing flow:
- Get the Kafka cluster metadata from Metadata.
- Call RecordAccumulator.ready() to select, based on the accumulator's cache state, which Nodes can be sent messages; it returns a ReadyCheckResult.
- If ReadyCheckResult has unknownLeadersExist set, call Metadata.requestUpdate() to mark the cluster metadata as needing a refresh.
- For each Node in ReadyCheckResult's readyNodes set, call NetworkClient.ready() to check whether the network I/O side is ready for sending; Nodes that fail the check are removed from readyNodes.
- Using the readyNodes set filtered in step 4, call RecordAccumulator.drain() to collect the messages to send.
- Call RecordAccumulator.abortExpiredBatches() to handle expired messages in the accumulator. Concretely: iterate over all RecordBatches held by the RecordAccumulator and call RecordBatch.maybeExpire() on each; if a batch has expired, RecordBatch.done() is called, which triggers the user-defined Callback, removes the RecordBatch from its queue, and releases its ByteBuffer.
- Call Sender.createProduceRequests() to wrap the messages to send into ClientRequests.
- Call NetworkClient.send() to write each ClientRequest into its KafkaChannel's send field.
- Call NetworkClient.poll() to actually send the ClientRequests stored in the KafkaChannel.send fields, handle responses returned by the server, handle timed-out requests, invoke user-defined Callbacks, and so on. (A sketch of the loop that drives these nine steps follows this list.)
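For context, the overloaded run(long) above is driven by the Sender's public run() method, which loops until the producer is closed. A minimal, self-contained sketch of that driver loop (the class name and runOnce() stub are illustrative, not Kafka's actual code):
```java
// Simplified sketch of the Sender's outer event loop; class name and
// runOnce() stub are illustrative, not Kafka's actual code.
public class SenderLoopSketch implements Runnable {
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            try {
                // corresponds to the run(long now) walked through above
                runOnce(System.currentTimeMillis());
            } catch (Exception e) {
                // the real Sender logs and keeps looping: one failed
                // iteration must not kill the I/O thread
                e.printStackTrace();
            }
        }
    }

    // placeholder for steps 1-9 of the walkthrough
    private void runOnce(long now) {
    }

    // called from the producer's close() path to stop the loop
    public void shutdown() {
        running = false;
    }
}
```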
2. Creating Requests
The org.apache.kafka.common.protocol.Protocol class lists the formats of all requests and responses; each request and response comes in several versions. We first look at the request and response a producer uses to append messages to the server: Produce_Request_V2 and Produce_Response_V2.
Fields of the Produce_Request_V2 header and body:
Name | Type | Description |
---|---|---|
api_key | short | API identifier |
api_version | short | API version number |
correlation_id | int | Sequence number generated by the client, monotonically increasing; the server passes it back unchanged in the Response |
client_id | String | Client ID, may be null |
acks | short | How many replicas must have successfully replicated the message before the server responds to this request; -1 means the entire ISR has replicated it |
timeout | int | Timeout in ms |
topic | String | Topic name |
partition | int | Partition number |
record_set | byte array | Message payload |
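To connect these fields to user-facing configuration: the acks and timeout fields of the request come straight from the producer's acks and request.timeout.ms settings. A minimal sketch, with the broker address and topic name as placeholders:
```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceRequestConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // becomes the acks field of the request; "all" (-1) waits for the full ISR
        props.put("acks", "all");
        // becomes the timeout field of the request, in ms
        props.put("request.timeout.ms", "30000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // "demo-topic" is a placeholder topic name
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}
```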
Fields of Produce_Response_V2:
Name | Type | Description |
---|---|---|
correlation_id | int | Sequence number generated by the client, monotonically increasing; the server passes it back unchanged in the Response |
topic | String | Topic name |
partition | int | Partition number |
error_code | short | Error code |
base_offset | long | Offset the server assigned to the message set |
timestamp | long | Timestamp generated by the server |
throttle | int | Throttle (delay) time in ms |
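These response fields eventually surface in the user-defined Callback: base_offset and timestamp are exposed through RecordMetadata, and a non-zero error_code is translated into the exception argument. A minimal sketch of such a callback (the class name is illustrative):
```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProduceResponseCallbackSketch {
    // Passed as the second argument to KafkaProducer.send(record, callback)
    static final Callback DEMO_CALLBACK = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                // a non-zero error_code in the response surfaces here as an exception
                exception.printStackTrace();
            } else {
                // base_offset and timestamp from the response, exposed via RecordMetadata
                System.out.printf("topic=%s partition=%d offset=%d timestamp=%d%n",
                        metadata.topic(), metadata.partition(),
                        metadata.offset(), metadata.timestamp());
            }
        }
    };
}
```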
Sender.createProduceRequests() wraps the messages to be sent into ClientRequests. No matter how many RecordBatches a Node has pending, and no matter how many partitions those batches target, exactly one ClientRequest object is generated per Node. The core logic of creating a ClientRequest is:
- Reorganize the Node's RecordBatch collection into two maps: produceRecordsByPartition (Map<TopicPartition, ByteBuffer>) and recordsByPartition (Map<TopicPartition, RecordBatch>).
- Create a RequestSend, the object actually sent over network I/O; its format follows the Produce_Request_V2 protocol described above, with the payload being the data in produceRecordsByPartition.
- Create a RequestCompletionHandler as the callback object.
- Wrap the RequestSend and RequestCompletionHandler into a ClientRequest object and return it.
Here is the code:
```java
/**
 * Transfer the record batches into a list of produce requests on a per-node basis
 */
private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
    List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
    for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
        // call produceRequest() to wrap the RecordBatches bound for the same Node
        // into a single ClientRequest object
        requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
    return requests;
}
```
```java
/**
 * Create a produce request from the given record batches
 */
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    // produceRecordsByPartition and recordsByPartition differ in their values:
    // one maps to ByteBuffer, the other to RecordBatch
    Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
    // 1. Group the RecordBatch list by partition into the two maps above
    for (RecordBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        produceRecordsByPartition.put(tp, batch.records.buffer());
        recordsByPartition.put(tp, batch);
    }
    // 2. Create the ProduceRequest and RequestSend
    ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
    RequestSend send = new RequestSend(Integer.toString(destination),
                                       this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                       request.toStruct());
    // 3. Create a RequestCompletionHandler as the callback object; its logic is explained in detail later
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    // Create the ClientRequest; the second argument, derived from the acks setting,
    // indicates whether the request expects a response
    return new ClientRequest(now, acks != 0, send, callback);
}
```
That completes the analysis of the ProduceRequest format and its creation. In the flow that follows, what is actually sent is the RequestSend object; the ClientRequest is cached in InFlightRequests, and when a response arrives or an exception occurs, the cached ClientRequest's RequestCompletionHandler is invoked. The details are introduced below.
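To make the InFlightRequests idea concrete, here is a hypothetical, much-simplified sketch of caching one completion handler per outstanding request, keyed by node (the class and method names are illustrative, not Kafka's actual API):
```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of per-node in-flight request tracking; not Kafka's actual API.
public class InFlightSketch {
    // node id -> requests sent but not yet answered, oldest first
    private final Map<String, Deque<Consumer<String>>> inFlight = new HashMap<>();

    // called when a request is queued for a node (cf. NetworkClient.send())
    public void add(String nodeId, Consumer<String> completionHandler) {
        inFlight.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addLast(completionHandler);
    }

    // called when a response arrives from that node (cf. NetworkClient.poll());
    // responses come back in send order, so complete the oldest outstanding request
    public void completeNext(String nodeId, String response) {
        Consumer<String> handler = inFlight.get(nodeId).pollFirst();
        handler.accept(response);
    }
}
```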