美文网首页kafka
Kafka 消费者拉取消息流程(限流)

Kafka 消费者拉取消息流程(限流)

作者: tracy_668 | 来源:发表于2020-12-06 11:57 被阅读0次

    [TOC]

    Kafka 消费者拉取消息流程分析

    我先给大家模拟一下消息拉取的实际现象,这里 max.poll.records = 500。

    1、消息没有堆积时:

    image.png

    可以发现,在消息没有堆积时,消费者拉取时,如果某个分区没有的消息不足 500 条,会从其他分区凑够 500 条后再返回。

    2、多个分区都有堆积时:

    image.png

    在消息有堆积时,可以发现每次返回的都是同一个分区的消息,但经过不断 debug,消费者在拉取过程中并不是等某个分区消费完没有堆积了,再拉取下一个分区的消息,而是不断循环的拉取各个分区的消息,但是这个循环并不是说分区 p0 拉取完 500 条,后面一定会拉取分区 p1 的消息,很有可能后面还会拉取 p0 分区的消息,为了弄明白这种现象,我仔细阅读了相关源码。

    org.apache.kafka.clients.consumer.KafkaConsumer#poll

    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
      try {
        // poll for new data until the timeout expires
        do {
          // 客户端拉取消息核心逻辑
          final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
          if (!records.isEmpty()) {
            //  在返回数据之前, 发送下次的 fetch 请求, 避免用户在下次获取数据时线程阻塞
            if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
              // 调用 ConsumerNetworkClient#poll 方法将 FetchRequest 发送出去。
              client.pollNoWakeup();
            }
            return this.interceptors.onConsume(new ConsumerRecords<>(records));
          }
        } while (timer.notExpired());
        return ConsumerRecords.empty();
      } finally {
        release();
      }
    }
    
    

    我们使用 Kafka consumer 进行消费的时候通常会给一个时间,比如:

    consumer.poll(Duration.ofMillis(3000));
    
    

    从以上代码逻辑可以看出来,用户给定的这个时间,目的是为了等待消息凑够 max.poll.records 条消息后再返回,即使消息条数不够 max.poll.records 消息,时间到了用户给定的等待时间后,也会返回。

    pollForFetches 方法是客户端拉取消息核心逻辑,但并不是真正去 broker 中拉取,而是从缓存中去获取消息。在 pollForFetches 拉取消息后,如果消息不为零,还会调用 fetcher.sendFetches() 与 client.pollNoWakeup(),调用这两个方法究竟有什么用呢?

    fetcher.sendFetches() 经过源码阅读后,得知该方法目的是为了构建拉取请求 FetchRequest 并进行发送,但是这里的发送并不是真正的发送,而是将 FetchRequest 请求对象存放在 unsend 缓存当中,然后会在 ConsumerNetworkClient#poll 方法调用时才会被真正地执行发送。

    fetcher.sendFetches() 在构建 FetchRequest 前,会对当前可拉取分区进行筛选,而这个也是决定多分区拉取消息规律的核心,后面我会讲到。

    pollForFetches 方法会调用 Fetcher#fetchedRecords 方法从缓存中获取并解析消息:

    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
      Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
      int recordsRemaining = maxPollRecords;
      try {
        while (recordsRemaining > 0) {
          // 如果当前获取消息的 PartitionRecords 为空,或者已经拉取完毕
          // 则需要从 completedFetches 重新获取 completedFetch 并解析成 PartitionRecords
          if (nextInLineRecords == null || nextInLineRecords.isFetched) {
            // 如果上一个分区缓存中的数据已经拉取完了,直接中断本次循环拉取,并返回空的消息列表
            // 直至有缓存数据为止
            CompletedFetch completedFetch = completedFetches.peek();
            if (completedFetch == null) break;
            try {
              // CompletedFetch 即拉取消息的本地缓存数据
              // 缓存数据中 CompletedFetch 解析成 PartitionRecords
              nextInLineRecords = parseCompletedFetch(completedFetch);
            } catch (Exception e) {
              // ...
            }
            completedFetches.poll();
          } else {
            // 从分区缓存中获取指定条数的消息
            List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
            // ...
            fetched.put(partition, records);
            recordsRemaining -= records.size();
          }
        }
      }
    } catch (KafkaException e) {
      // ...
    }
    return fetched;
    }
    
    

    completedFetches 是拉取到的消息缓存,以上代码逻辑就是围绕着如何从 completedFetches 缓存中获取消息的,从以上代码逻辑可以看出:

    maxPollRecords 为本次拉取的最大消息数量,该值可通过 max.poll.records 参数配置,默认为 500 条,该方法每次从 completedFetches 中取出一个 CompletedFetch 并解析成可以拉取的 PartitionRecords 对象,即方法中的 nextInLineRecords,请注意,PartitionRecords 中的消息数量可能大与 500 条,因此可能本次可能一次性从 PartitionRecords 获取 500 条消息后即返回,如果 PartitionRecords 中消息数量不足 500 条,会从 completedFetches 缓存中取出下一个要拉取的分区消息,recordsRemaining 会记录本次剩余还有多少消息没拉取,通过循环不断地从 completedFetches 缓存中取消息,直至 recordsRemaining 为 0。

    以上代码即可解释为什么消息有堆积的情况下,每次拉取的消息很大概率是同一个分区的消息,因为缓存 CompletedFetch 缓存中的消息很大概率会多余每次拉取消息数量,Kafka 客户端每次从 Broker 拉取的消息数据并不是通过 max.poll.records 决定的,该参数仅决定用户每次从本地缓存中获取多少条数据,真正决定从 Broker 拉取的消息数据量是通过 fetch.min.bytes、max.partition.fetch.bytes、fetch.max.bytes 等参数决定的。

    我们再想一下,假设某个分区的消息一直都处于堆积状态,Kafka 会每次都拉取这个分区直至将该分区消费完毕吗?(根据假设,Kafka 消费者每次都会从这个分区拉取消息,并将消息存到分区关联的 CompletedFetch 缓存中,根据以上代码逻辑,nextInLineRecords 一直处于还没拉取完的状态,导致每次拉取都会从该分区中拉取消息。)

    答案显然不会,不信你打开 Kafka-manager 观察每个分区的消费进度情况,每个分区都会有消费者在消费中。

    那 Kafka 消费者是如何循环地拉取它监听的分区呢?我们接着往下分析。

    发送拉取请求逻辑:

    org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches

    public synchronized int sendFetches() {
      // 解析本次可拉取的分区
      Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
      for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
        final Node fetchTarget = entry.getKey();
        final FetchSessionHandler.FetchRequestData data = entry.getValue();
        // 构建请求对象
        final FetchRequest.Builder request = FetchRequest.Builder
          .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
          .isolationLevel(isolationLevel)
          .setMaxBytes(this.maxBytes)
          .metadata(data.metadata())
          .toForget(data.toForget());
        // 发送请求,但不是真的发送,而是将请求保存在 unsent 中
        client.send(fetchTarget, request)
          .addListener(new RequestFutureListener<ClientResponse>() {
            @Override
            public void onSuccess(ClientResponse resp) {
              synchronized (Fetcher.this) {
    
                // ... ...
    
                // 创建 CompletedFetch, 并缓存到 completedFetches 队列中
                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                                        resp.requestHeader().apiVersion()));
              }
    
            }
          }
                       // ... ...
                       });
      }
      return fetchRequestMap.size();
    }
    
    

    以上代码逻辑很好理解,在发送拉取请求前,先检查哪些分区可拉取,接着为每个分区构建一个 FetchRequest 对象,FetchRequest 中的 minBytes 和 maxBytes,分别可通过 fetch.min.bytes 和 fetch.max.bytes 参数设置。这也是每次从 Broker 中拉取的消息不一定等于 max.poll.records 的原因。

    image.png

    prepareFetchRequests 方法会调用 Fetcher#fetchablePartitions 筛选可拉取的分区,我们来看下 Kafka 消费者是如何进行筛选的:

    org.apache.kafka.clients.consumer.internals.Fetcher#fetchablePartitions

    private List<TopicPartition> fetchablePartitions() {
      Set<TopicPartition> exclude = new HashSet<>();
      List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
      if (nextInLineRecords != null && !nextInLineRecords.isFetched) {
        exclude.add(nextInLineRecords.partition);
      }
      for (CompletedFetch completedFetch : completedFetches) {
        exclude.add(completedFetch.partition);
      }
      fetchable.removeAll(exclude);
      return fetchable;
    }
    
    

    nextInLineRecords 即我们上面提到的根据某个分区缓存 CompletedFetch 解析得到的,如果 nextInLineRecords 中的缓存还没拉取完,则不从 broker 中拉取消息了,以及如果此时 completedFetches 缓存中存在该分区的缓存,也不进行拉取消息。

    我们可以很清楚的得出结论:

    当缓存中还存在中还存在某个分区的消息数据时,消费者不会继续对该分区进行拉取请求,直到该分区的本地缓存被消费完,才会继续发送拉取请求。

    为了更加清晰的表达这段逻辑,我举个例子并将整个流程用图表达出来:

    假设某消费者监听三个分区,每个分区每次从 Broker 中拉取 4 条消息,用户每次从本地缓存中获取 2 条消息:

    image.png

    从以上流程可看出,Kafka 消费者自身已经实现了拉取限流的机制。

    相关文章

      网友评论

        本文标题:Kafka 消费者拉取消息流程(限流)

        本文链接:https://www.haomeiwen.com/subject/ckdowktx.html