[TOC]
Kafka 消费者拉取消息流程分析
我先给大家模拟一下消息拉取的实际现象,这里 max.poll.records = 500。
1、消息没有堆积时:
image.png可以发现,在消息没有堆积时,消费者拉取时,如果某个分区没有的消息不足 500 条,会从其他分区凑够 500 条后再返回。
2、多个分区都有堆积时:
image.png在消息有堆积时,可以发现每次返回的都是同一个分区的消息,但经过不断 debug,消费者在拉取过程中并不是等某个分区消费完没有堆积了,再拉取下一个分区的消息,而是不断循环的拉取各个分区的消息,但是这个循环并不是说分区 p0 拉取完 500 条,后面一定会拉取分区 p1 的消息,很有可能后面还会拉取 p0 分区的消息,为了弄明白这种现象,我仔细阅读了相关源码。
org.apache.kafka.clients.consumer.KafkaConsumer#poll
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
try {
// poll for new data until the timeout expires
do {
// 客户端拉取消息核心逻辑
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// 在返回数据之前, 发送下次的 fetch 请求, 避免用户在下次获取数据时线程阻塞
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
// 调用 ConsumerNetworkClient#poll 方法将 FetchRequest 发送出去。
client.pollNoWakeup();
}
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());
return ConsumerRecords.empty();
} finally {
release();
}
}
我们使用 Kafka consumer 进行消费的时候通常会给一个时间,比如:
consumer.poll(Duration.ofMillis(3000));
从以上代码逻辑可以看出来,用户给定的这个时间,目的是为了等待消息凑够 max.poll.records 条消息后再返回,即使消息条数不够 max.poll.records 消息,时间到了用户给定的等待时间后,也会返回。
pollForFetches 方法是客户端拉取消息核心逻辑,但并不是真正去 broker 中拉取,而是从缓存中去获取消息。在 pollForFetches 拉取消息后,如果消息不为零,还会调用 fetcher.sendFetches() 与 client.pollNoWakeup(),调用这两个方法究竟有什么用呢?
fetcher.sendFetches() 经过源码阅读后,得知该方法目的是为了构建拉取请求 FetchRequest 并进行发送,但是这里的发送并不是真正的发送,而是将 FetchRequest 请求对象存放在 unsend 缓存当中,然后会在 ConsumerNetworkClient#poll 方法调用时才会被真正地执行发送。
fetcher.sendFetches() 在构建 FetchRequest 前,会对当前可拉取分区进行筛选,而这个也是决定多分区拉取消息规律的核心,后面我会讲到。
pollForFetches 方法会调用 Fetcher#fetchedRecords 方法从缓存中获取并解析消息:
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
int recordsRemaining = maxPollRecords;
try {
while (recordsRemaining > 0) {
// 如果当前获取消息的 PartitionRecords 为空,或者已经拉取完毕
// 则需要从 completedFetches 重新获取 completedFetch 并解析成 PartitionRecords
if (nextInLineRecords == null || nextInLineRecords.isFetched) {
// 如果上一个分区缓存中的数据已经拉取完了,直接中断本次循环拉取,并返回空的消息列表
// 直至有缓存数据为止
CompletedFetch completedFetch = completedFetches.peek();
if (completedFetch == null) break;
try {
// CompletedFetch 即拉取消息的本地缓存数据
// 缓存数据中 CompletedFetch 解析成 PartitionRecords
nextInLineRecords = parseCompletedFetch(completedFetch);
} catch (Exception e) {
// ...
}
completedFetches.poll();
} else {
// 从分区缓存中获取指定条数的消息
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
// ...
fetched.put(partition, records);
recordsRemaining -= records.size();
}
}
}
} catch (KafkaException e) {
// ...
}
return fetched;
}
completedFetches 是拉取到的消息缓存,以上代码逻辑就是围绕着如何从 completedFetches 缓存中获取消息的,从以上代码逻辑可以看出:
maxPollRecords 为本次拉取的最大消息数量,该值可通过 max.poll.records 参数配置,默认为 500 条,该方法每次从 completedFetches 中取出一个 CompletedFetch 并解析成可以拉取的 PartitionRecords 对象,即方法中的 nextInLineRecords,请注意,PartitionRecords 中的消息数量可能大与 500 条,因此可能本次可能一次性从 PartitionRecords 获取 500 条消息后即返回,如果 PartitionRecords 中消息数量不足 500 条,会从 completedFetches 缓存中取出下一个要拉取的分区消息,recordsRemaining 会记录本次剩余还有多少消息没拉取,通过循环不断地从 completedFetches 缓存中取消息,直至 recordsRemaining 为 0。
以上代码即可解释为什么消息有堆积的情况下,每次拉取的消息很大概率是同一个分区的消息,因为缓存 CompletedFetch 缓存中的消息很大概率会多余每次拉取消息数量,Kafka 客户端每次从 Broker 拉取的消息数据并不是通过 max.poll.records 决定的,该参数仅决定用户每次从本地缓存中获取多少条数据,真正决定从 Broker 拉取的消息数据量是通过 fetch.min.bytes、max.partition.fetch.bytes、fetch.max.bytes 等参数决定的。
我们再想一下,假设某个分区的消息一直都处于堆积状态,Kafka 会每次都拉取这个分区直至将该分区消费完毕吗?(根据假设,Kafka 消费者每次都会从这个分区拉取消息,并将消息存到分区关联的 CompletedFetch 缓存中,根据以上代码逻辑,nextInLineRecords 一直处于还没拉取完的状态,导致每次拉取都会从该分区中拉取消息。)
答案显然不会,不信你打开 Kafka-manager 观察每个分区的消费进度情况,每个分区都会有消费者在消费中。
那 Kafka 消费者是如何循环地拉取它监听的分区呢?我们接着往下分析。
发送拉取请求逻辑:
org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches
public synchronized int sendFetches() {
// 解析本次可拉取的分区
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();
// 构建请求对象
final FetchRequest.Builder request = FetchRequest.Builder
.forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
.isolationLevel(isolationLevel)
.setMaxBytes(this.maxBytes)
.metadata(data.metadata())
.toForget(data.toForget());
// 发送请求,但不是真的发送,而是将请求保存在 unsent 中
client.send(fetchTarget, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
synchronized (Fetcher.this) {
// ... ...
// 创建 CompletedFetch, 并缓存到 completedFetches 队列中
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion()));
}
}
}
// ... ...
});
}
return fetchRequestMap.size();
}
以上代码逻辑很好理解,在发送拉取请求前,先检查哪些分区可拉取,接着为每个分区构建一个 FetchRequest 对象,FetchRequest 中的 minBytes 和 maxBytes,分别可通过 fetch.min.bytes 和 fetch.max.bytes 参数设置。这也是每次从 Broker 中拉取的消息不一定等于 max.poll.records 的原因。
image.pngprepareFetchRequests 方法会调用 Fetcher#fetchablePartitions 筛选可拉取的分区,我们来看下 Kafka 消费者是如何进行筛选的:
org.apache.kafka.clients.consumer.internals.Fetcher#fetchablePartitions
private List<TopicPartition> fetchablePartitions() {
Set<TopicPartition> exclude = new HashSet<>();
List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
if (nextInLineRecords != null && !nextInLineRecords.isFetched) {
exclude.add(nextInLineRecords.partition);
}
for (CompletedFetch completedFetch : completedFetches) {
exclude.add(completedFetch.partition);
}
fetchable.removeAll(exclude);
return fetchable;
}
nextInLineRecords 即我们上面提到的根据某个分区缓存 CompletedFetch 解析得到的,如果 nextInLineRecords 中的缓存还没拉取完,则不从 broker 中拉取消息了,以及如果此时 completedFetches 缓存中存在该分区的缓存,也不进行拉取消息。
我们可以很清楚的得出结论:
当缓存中还存在中还存在某个分区的消息数据时,消费者不会继续对该分区进行拉取请求,直到该分区的本地缓存被消费完,才会继续发送拉取请求。
为了更加清晰的表达这段逻辑,我举个例子并将整个流程用图表达出来:
假设某消费者监听三个分区,每个分区每次从 Broker 中拉取 4 条消息,用户每次从本地缓存中获取 2 条消息:
image.png从以上流程可看出,Kafka 消费者自身已经实现了拉取限流的机制。
网友评论