美文网首页
Kafka消费者 组件源码 Fetcher

Kafka消费者 组件源码 Fetcher

作者: 不存在的里皮 | 来源:发表于2020-06-29 11:24 被阅读0次

序言

Fetcher是与KafkaConsumer交互的各大组件之一。在各大博客上,比如某csdn博客中提到,Fetcher的作用是:

Fetcher负责组织拉取消息的请求,以及处理返回。不过需要注意它并不做网络IO,网络IO还是由ConsumerNetworkClient完成。它其实对应生产者中的Sender。

Fetcher负责拉取什么消息?如何处理消息?它到底有什么功能,我们需要查阅源码。然而在类声明上的注释只有可怜兮兮的一句话:


所以我们要转换角度去观察。
首先,Fetcher没有继承Runnable或Thread,那么它只是一个API组件,而不是单独运行的线程

然后要观察一个类的作用,可以从两个角度入手:

  1. 与上游组件的交互。也就是它暴露的public方法。
  2. 与下游组件的交互。也就是它是如何调用下游组件的接口的。

与上游组件的交互

与上游组件的交互,就是指它所暴露出的public方法,因为只有public方法能被其它组件调用,这就是它提供的功能。所以我们要研究下这些public方法。
从Idea左侧栏->Structure,点击"Show non-public"按钮,隐藏非公有方法

从方法栏可以看到,Fetcher主要提供了四块功能:

  1. 拉取消息,如红框所示。从fetchedRecords可知,这些方法作用都与从服务器拉取消息有关,能够向服务器发送消息。
  2. 获取topic元数据,如黄框所示。
    • getTopicMetadata用于获取某topic的元数据。以PartitionInfo形式总结。
    • getAllTopicMetadata用于获取集群上所有topic的元数据。以PartitionInfo形式总结。
  3. 获取、刷新offset,如蓝框所示。
    • resetOffsetsIfNeeded会获取offset并刷新。
    • beginningOffsets和endOffsets分别会获取起始/终止的offset。
  4. 监控测量指标相关,如白框所示。并不是主要功能,暂不分析,


与下游组件的交互

查看Fetcher的成员变量可知,Fetcher主要与ConsumerNetworkClient组件交互。后者负责请求、响应的IO[1],那么前者就负责构造请求、处理响应。


搜索client.查看与ConsumerNetworkClient发生交互的地方,总共有8处。

其中client.sendclient.poll(代表发送请求、等待响应的调用。
// ConsumerNetworkClient.java
public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder)

public boolean poll(RequestFuture<?> future, long timeout)

通过对这两种方法的使用,可以向ConsumerNetworkClient发送请求,并添加处理响应的逻辑。有两种

异步响应逻辑

Fetcher利用监听器的机制,添加异步响应的逻辑。
比如sendFetches中,先调用client.send发出请求,再调用addListener添加请求完成后的回调逻辑。

同步响应逻辑

Fetcher调用client.send发出请求,调用client.poll等待请求完成,添加同步响应的逻辑。以getTopicMetadata为例


在sendMetadataRequest内部调用了client.send发送请求

查看poll可知,内部会循环等待,直到请求完成。

拉取消息

sendFetches调用client.send发送请求,通过addListener设置请求完成后的逻辑。在onSuccess中将拉取的数据,按照TopicPartition分别添加到completedFetches

public int sendFetches() {
    Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
    for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
        ...
        final FetchRequest.Builder request = FetchRequest.Builder
                .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
                ...
        
        ...

        // 发送请求、设置回调逻辑
        client.send(fetchTarget, request)
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) {
                        FetchResponse response = (FetchResponse) resp.responseBody();
                        
                        ...
                        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                            ...
                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                    resp.requestHeader().apiVersion()));  // 添加到completedFetches
                        }

                        ...
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        ...
                    }
                });
    }
    return fetchRequestMap.size();
}
sendFetches在请求完成后,通过OnSuccess执行成功逻辑

外界调用fetchedRecords来收获已经收到的消息。fetchedRecords从completedFetches取出拉取的消息,通过while循环,将消息从CompletedFetch类型转为PartitionRecords,再转为List<ConsumerRecord<K, V>>,添加到fetched中。

public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
    int recordsRemaining = maxPollRecords;

    try {
            // 通过循环完成拉取到的消息的加工,最多拉取maxPollRecords条消息
        while (recordsRemaining > 0) {
            if (nextInLineRecords == null || nextInLineRecords.isFetched) {
                CompletedFetch completedFetch = completedFetches.peek();  // 从completedFetches查看拉取的消息
                if (completedFetch == null) break;  // 没有消息了,退出循环

                nextInLineRecords = parseCompletedFetch(completedFetch);  // 处理成PartitionRecords类型,也就是一个分区上拉到的数据
                completedFetches.poll();  // 去除队头
            } else {
                List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);  // 处理成List<ConsumerRecord<K, V>>类型
                TopicPartition partition = nextInLineRecords.partition;
                // 将拉取到的消息放入fetched
                if (!records.isEmpty()) {
                    List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
                    if (currentRecords == null) {
                        fetched.put(partition, records);
                    } else {
                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
                        // but it might conceivably happen in some rare cases (such as partition leader changes).
                        // we have to copy to a new list because the old one may be immutable
                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                        newRecords.addAll(currentRecords);
                        newRecords.addAll(records);
                        fetched.put(partition, newRecords);
                    }
                    recordsRemaining -= records.size();
                }
            }
        }
    } catch (KafkaException e) {
        if (fetched.isEmpty())
            throw e;
    }
    return fetched;
}

如图示:


在fetchedRecords的循环中,一条CompletedFetch的变化轨迹

总结

Fetcher向上游提供了拉取消息、获取topic元数据、获取/刷新offset的功能,并由ConsumerNetworkClient完成请求/响应的IO操作。


  1. 可以暂时这么认为,如果读者不放心可查阅资料

相关文章

网友评论

      本文标题:Kafka消费者 组件源码 Fetcher

      本文链接:https://www.haomeiwen.com/subject/futofktx.html