
[Kafka] KafkaConsumer poll analysis, part 2: obtaining the fetch offsets

Author: LZhan | Published 2019-12-31 14:18

1 Consumption modes and subscription state

1.1 Consumption modes

There are two modes: subscribe and assign.

subscribe: the consumer specifies the topics to subscribe to, and the coordinators (ConsumerCoordinator and GroupCoordinator) assign partitions to the consumer.

assign: the consumer specifies the partitions to consume directly; this mode gives up the coordinator's ability to dynamically assign partitions to the consumer.

1.1.1 Related code

(The original post showed the KafkaConsumer.subscribe and KafkaConsumer.assign source here as a screenshot.)

The subscribe method takes topics as its argument, while the assign method takes partitions; both update the consumer's subscription state object (SubscriptionState).

In subscribe: this.subscriptions.subscribe(new HashSet<>(topics), listener);

In assign: this.subscriptions.assignFromUser(new HashSet<>(partitions));

The assignment field, which holds the assignment result, keeps the mapping from the partitions assigned to the consumer to their partition states.
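From the application side, the two modes look like this. A minimal sketch using only the public KafkaConsumer API, assuming a local broker at localhost:9092 and a placeholder topic demo-topic:

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SubscribeVsAssign {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // subscribe: the group coordinator assigns partitions to this consumer
        try (KafkaConsumer<String, String> subscribed = new KafkaConsumer<>(props)) {
            subscribed.subscribe(Collections.singletonList("demo-topic"));
            subscribed.poll(Duration.ofMillis(100));
        }

        // assign: the consumer picks partitions itself, no coordinator-driven rebalancing
        try (KafkaConsumer<String, String> assigned = new KafkaConsumer<>(props)) {
            assigned.assign(Arrays.asList(new TopicPartition("demo-topic", 0)));
            assigned.poll(Duration.ofMillis(100));
        }
    }
}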

1.2 Subscription state

public class SubscriptionState {
    private static final String SUBSCRIPTION_EXCEPTION_MESSAGE =
            "Subscription to topics, partitions and pattern are mutually exclusive";
    // the possible subscription types
    private enum SubscriptionType {
        NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
    }

    private SubscriptionType subscriptionType;

    private Pattern subscribedPattern;

    // topics subscribed by this consumer
    private Set<String> subscription;

    /* the list of topics the group has subscribed to (set only for the leader on join group completion) */
    // topics subscribed across the consumer group
    private final Set<String> groupSubscription;

    /* the partitions that are currently assigned, note that the order of partition matters (see FetchBuilder for more details) */
    private final PartitionStates<TopicPartitionState> assignment;

    /* Default offset reset strategy */
    private final OffsetResetStrategy defaultResetStrategy;

    /* Listeners provide a hook for internal state cleanup (e.g. metrics) on assignment changes */
    private final List<Listener> listeners = new ArrayList<>();

    /* User-provided listener to be invoked when assignment changes */
    private ConsumerRebalanceListener rebalanceListener;
    
}

How do subscribe and assign change the SubscriptionState?

subscribe

    public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
        if (listener == null)
            throw new IllegalArgumentException("RebalanceListener cannot be null");
        // set the subscription type to AUTO_TOPICS
        setSubscriptionType(SubscriptionType.AUTO_TOPICS);

        this.rebalanceListener = listener;

        changeSubscription(topics);
    }

    private void changeSubscription(Set<String> topicsToSubscribe) {
        // if the subscription has not changed, there is no need to reassign partitions to the consumer
        if (!this.subscription.equals(topicsToSubscribe)) {
            this.subscription = topicsToSubscribe;
            this.groupSubscription.addAll(topicsToSubscribe);
        }
    }

assign

    public void assignFromUser(Set<TopicPartition> partitions) {
        // set the subscription type to USER_ASSIGNED
        setSubscriptionType(SubscriptionType.USER_ASSIGNED);
        // check whether the assigned partitions have changed
        if (!this.assignment.partitionSet().equals(partitions)) {
            fireOnAssignment(partitions);

            Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
            for (TopicPartition partition : partitions) {
                TopicPartitionState state = assignment.stateValue(partition);
                if (state == null)
                    state = new TopicPartitionState();
                partitionToState.put(partition, state);
            }
            this.assignment.set(partitionToState);
        }
    }
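Both subscribe and assignFromUser first go through setSubscriptionType, which is why subscribe and assign are mutually exclusive on a single consumer. A rough sketch of that check, inferred from the SUBSCRIPTION_EXCEPTION_MESSAGE field above (the exact code may differ between Kafka versions):

    private void setSubscriptionType(SubscriptionType type) {
        if (this.subscriptionType == SubscriptionType.NONE)
            // the first call wins and records the chosen mode
            this.subscriptionType = type;
        else if (this.subscriptionType != type)
            // mixing subscribe/assign/pattern on one consumer is rejected
            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
    }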

SubscriptionState also provides methods that query the assignment result for "some or all" partitions (the original post showed their source as screenshots; a rough sketch follows below):

hasAllFetchPositions(): checks whether every assigned partition has a valid fetch offset (its source is shown in section 2.3.1 below)

missingFetchPositions(): when not every partition has a fetch offset, finds the partitions that are missing one

fetchablePartitions(): returns the partitions that are allowed to fetch, i.e. that have a valid fetch offset, and is used to build fetch requests
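A minimal sketch of what missingFetchPositions and fetchablePartitions roughly do, written against the fields shown above; the real implementations differ in detail across Kafka versions (for example they use an internal collectPartitions helper and an isFetchable() accessor):

    // sketch: partitions in the assignment that still lack a valid fetch position
    public Set<TopicPartition> missingFetchPositions() {
        Set<TopicPartition> missing = new HashSet<>();
        assignment.stream().forEach(state -> {
            if (!state.value().hasValidPosition())
                missing.add(state.topicPartition());
        });
        return missing;
    }

    // sketch: partitions that may be fetched from, i.e. not paused and with a valid position
    public List<TopicPartition> fetchablePartitions() {
        List<TopicPartition> fetchable = new ArrayList<>();
        assignment.stream().forEach(state -> {
            if (!state.value().paused && state.value().hasValidPosition())
                fetchable.add(state.topicPartition());
        });
        return fetchable;
    }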

1.3 Partition state

The older TopicPartitionState:

private static class TopicPartitionState {
    private Long position; // fetch offset (position)
    private OffsetAndMetadata committed; // committed (consumed) offset
    private boolean paused; // whether fetching from this partition has been paused
    private OffsetResetStrategy resetStrategy; // offset reset strategy
}

The newer TopicPartitionState:

    private static class TopicPartitionState {
        private Long position; // last consumed position, i.e. the fetch offset
        private Long highWatermark; // the high watermark from the last fetch; the consumer can only read messages before this offset
        private Long logStartOffset; // the log start offset
        private Long lastStableOffset; // used for transactions: the largest offset up to which transactions have been committed
        private boolean paused;  // whether this partition has been paused by the user
        private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset needs resetting
        private Long nextAllowedRetryTimeMs;
    }

The newer version moves OffsetAndMetadata out of TopicPartitionState:

public class OffsetAndMetadata implements Serializable {
    private static final long serialVersionUID = 2019555404968089681L;

    private final long offset;
    private final String metadata;

    // We use null to represent the absence of a leader epoch to simplify serialization.
    // I.e., older serializations of this class which do not have this field will automatically
    // initialize its value to null.
    private final Integer leaderEpoch;
}
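OffsetAndMetadata is also what applications pass when committing offsets themselves. A minimal, hypothetical snippet (the topic name, offset, epoch and metadata string are placeholders) using the public constructor that carries an optional leader epoch:

import java.util.Collections;
import java.util.Optional;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitWithLeaderEpoch {
    // sketch: commit offset 42 for partition 0 of "demo-topic", attaching leader epoch 5
    static void commit(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("demo-topic", 0);
        OffsetAndMetadata committed = new OffsetAndMetadata(42L, Optional.of(5), "processed by worker-1");
        consumer.commitSync(Collections.singletonMap(tp, committed));
    }
}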

TODO: further analysis

2. How the consumer obtains each partition's fetch offset before fetching messages

This splits into the first poll and subsequent polls; a sketch of the flow follows the step list below.

(1) After subscribing to topics, the client polls via KafkaConsumer, preparing to fetch messages.

(2) If every partition already has a fetch offset, go to step 6; otherwise go to step 3.

(3) From the assignment result in the subscription state, find all partitions without a fetch offset.

(4) Update the fetch offsets of the partitions from step 3 via updateFetchPositions().

(5) Whether a partition arrived here directly from step 2 or was updated in step 4, the consumer is now allowed to fetch from it.

(6) For all partitions that have a fetch offset and are allowed to fetch, build fetch requests and start fetching messages.

(The original post illustrated this flow with a diagram.)
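As a rough sketch, this is how the flow looks inside KafkaConsumer; the method names follow the Kafka 2.x source, but the exact shape varies between versions:

    // sketch of KafkaConsumer.updateFetchPositions(...), matching the steps above
    private boolean updateFetchPositions(final Timer timer) {
        // step 2: if every assigned partition already has a valid fetch position, we are done
        if (subscriptions.hasAllFetchPositions())
            return true;

        // steps 3-4: ask the coordinator for committed offsets of the partitions that lack a position
        if (!coordinator.refreshCommittedOffsetsIfNeeded(timer))
            return false;

        // partitions still without a position fall back to the auto.offset.reset strategy ...
        subscriptions.resetMissingPositions();

        // ... and the fetcher resolves those positions via ListOffset requests to the partition leaders
        fetcher.resetOffsetsIfNeeded();

        return true;
    }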

2.1 About offsets

Updating the fetch offsets in KafkaConsumer involves two steps:

<1> The "consumer coordinator" (ConsumerCoordinator) updates the committed offset (OffsetAndMetadata) in the partition state.

<2> The "fetcher" (Fetcher) updates the fetch offset (the position in TopicPartitionState) in the partition state.

Fetch offset: used when sending a fetch request to specify where in the partition to start fetching messages.

Committed offset: represents the consumer's progress in processing the partition's messages.

The consumer updates the fetch offset when it fetches messages and updates the committed offset when it processes them.
The committed offset is usually assigned to the fetch offset, in particular during a rebalance when a partition is assigned to a new consumer. The new consumer has no local record of this partition's consumption progress, so to obtain a fetch offset it asks the coordinator for the partition's committed offset and uses that as the partition's initial fetch offset.

Obtaining partition offsets: (1) an OFFSET_FETCH request (OffsetFetchRequest) is sent by the consumer's coordinator to the coordinator node on the server side; (2) a LIST_OFFSET request (ListOffsetRequest) is sent by the fetcher to the partition's leader replica. The coordinator node stores consumer-group data, i.e. the partitions' committed offsets, whereas the partition leader only stores the partition's log files, whose only offset information is the message offsets inside the log. The new API combines both when obtaining the fetch offset: only when the coordinator node has no recorded committed offset for a partition does the consumer fetch an offset from the partition's leader.
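From the client's point of view the same two lookups are visible through the public API: committed() goes through the group coordinator (OFFSET_FETCH), while beginningOffsets()/endOffsets() go to the partition leaders (LIST_OFFSET). A minimal, hypothetical snippet; the topic and partition are placeholders:

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetLookups {
    static void show(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("demo-topic", 0);
        Set<TopicPartition> tps = Collections.singleton(tp);

        // OFFSET_FETCH: the committed offset lives on the group coordinator
        OffsetAndMetadata committed = consumer.committed(tp);

        // LIST_OFFSET: log start/end offsets come from the partition leaders
        Map<TopicPartition, Long> earliest = consumer.beginningOffsets(tps);
        Map<TopicPartition, Long> latest = consumer.endOffsets(tps);

        System.out.println("committed=" + committed + ", earliest=" + earliest + ", latest=" + latest);
    }
}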

2.2 Walking through the flow

Inside updateFetchPositions in the flow above:

<1> First call hasAllFetchPositions, which checks against the assignment result (this assignment was produced when ConsumerCoordinator.poll performed the assignment) whether every partition has a fetch offset; if all do, it returns true immediately.

<2> If some partitions have no fetch offset, call ConsumerCoordinator's refreshCommittedOffsetsIfNeeded, which:

(1) calls missingFetchPositions to find the partitions without a fetch offset;

(2) calls fetchCommittedOffsets, which goes through the coordinator to obtain each partition's committed offset;

(3) updates the leaderEpoch of each partition's OffsetAndMetadata, since the partition leader may have changed;

(4) assigns the committed offset to the fetch offset.
Note: partitions subscribed via assign can be given an initial position directly, avoiding the coordinator's involvement.

<3> If after step 2 some partitions still have no position, i.e. the server-side coordinator node has no recorded committed offset for them, call resetMissingPositions, which applies the configured default reset strategy to the partition state and sets position and nextAllowedRetryTimeMs to null.

<4> Call the fetcher's resetOffsetsIfNeeded, which obtains the partitions that need resetting via partitionsNeedingReset (a partition qualifies when resetStrategy != null and nextAllowedRetryTimeMs == null || nowMs >= nextAllowedRetryTimeMs), then calls resetOffsetsAsync.

<5> In resetOffsetsAsync, groupListOffsetRequests builds the request data as a Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>>, where Node is the node hosting the partition's leader replica (the code is shown in section 2.3.6 below).

Constructing PartitionData: the timestamp passed in is the one derived earlier from the reset strategy, and currentLeaderEpoch is passed as empty (see the PartitionData construction inside groupListOffsetRequests in section 2.3.6).

The requests are then sent asynchronously per Node via sendListOffsetRequest, and the result comes back as a ListOffsetResult:

    private static class ListOffsetResult {
        private final Map<TopicPartition, OffsetData> fetchedOffsets; // map of topic partition to the fetched offset data
        private final Set<TopicPartition> partitionsToRetry; // set of topic partitions that need to be retried
    }

2.3 Source code analysis

2.3.1 The hasAllFetchPositions method

    public boolean hasAllFetchPositions() {
        return assignment.stream().allMatch(state -> state.value().hasValidPosition());
    }

It iterates over the partition assignment result that the GroupCoordinator returned during ConsumerCoordinator.poll and checks which partitions have a valid fetch offset, i.e. the position in TopicPartitionState.
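hasValidPosition (and the related isMissingPosition used later) are simple checks on TopicPartitionState; roughly, as a sketch (the exact conditions differ slightly between Kafka versions):

        // sketch: a position is valid once it has been set
        private boolean hasValidPosition() {
            return position != null;
        }

        // sketch: "missing" means no valid position and no reset strategy already pending
        private boolean isMissingPosition() {
            return !hasValidPosition() && resetStrategy == null;
        }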

2.3.2 The refreshCommittedOffsetsIfNeeded method

    public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
        // find the partitions that have no valid position
        final Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();

        final Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(missingFetchPositions, timer);
        if (offsets == null) return false;
        // once the committed offsets are fetched, assign each partition's committed offset to its fetch position
        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            final TopicPartition tp = entry.getKey();
            final long offset = entry.getValue().offset();
            log.debug("Setting offset for partition {} to the committed offset {}", tp, offset);
            // update the partition's last seen leader epoch
            entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
            // assign the committed offset to the partition's fetch position
            this.subscriptions.seek(tp, offset);
        }
        return true;
    }

2.3.3 The resetMissingPositions method

    public void resetMissingPositions() {
        final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
        // iterate over the assigned partitions
        assignment.stream().forEach(state -> {
            TopicPartition tp = state.topicPartition();
            TopicPartitionState partitionState = state.value();
            // if this partition still has no fetch position
            if (partitionState.isMissingPosition()) {
                // if no reset strategy has been configured
                if (defaultResetStrategy == OffsetResetStrategy.NONE)
                    partitionsWithNoOffsets.add(tp);
                else
                    // apply the reset strategy, clear the position and set nextAllowedRetryTimeMs to null
                    partitionState.reset(defaultResetStrategy);
            }
        });
        // if some partitions have no position and no reset strategy is configured, throw an exception
        if (!partitionsWithNoOffsets.isEmpty())
            throw new NoOffsetForPartitionException(partitionsWithNoOffsets);
    }

The offset reset strategies are listed below; they correspond to the familiar auto.offset.reset configuration property (a configuration example follows the enum):

earliest: when a partition has a committed offset, consume from that offset; when there is no committed offset, consume from the beginning of the partition.

latest: when a partition has a committed offset, consume from that offset; when there is no committed offset, consume only data newly produced to the partition.

none: when every partition of the topic has a committed offset, consume from those offsets; if any partition has no committed offset, throw an exception.

public enum OffsetResetStrategy {
    LATEST, EARLIEST, NONE
}
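For example, a consumer selects a strategy through its configuration. A minimal, hypothetical snippet:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ResetStrategyConfig {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        // maps to OffsetResetStrategy.EARLIEST; "latest" and "none" are the other options
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}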

2.3.4 The resetOffsetsIfNeeded method

    public void resetOffsetsIfNeeded() {
        // Raise exception from previous offset fetch if there is one
        RuntimeException exception = cachedListOffsetsException.getAndSet(null);
        if (exception != null)
            throw exception;
        // get the partitions whose position needs to be reset
        Set<TopicPartition> partitions = subscriptions.partitionsNeedingReset(time.milliseconds());
        if (partitions.isEmpty())
            return;

        final Map<TopicPartition, Long> offsetResetTimestamps = new HashMap<>();
        for (final TopicPartition partition : partitions) {
            // derive a timestamp from the auto.offset.reset strategy
            Long timestamp = offsetResetStrategyTimestamp(partition);
            if (timestamp != null)
                // maps each partition to a timestamp (earliest or latest, depending on the reset strategy)
                offsetResetTimestamps.put(partition, timestamp);
        }
        // submit the reset requests asynchronously
        resetOffsetsAsync(offsetResetTimestamps);
    }
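offsetResetStrategyTimestamp maps the reset strategy to one of the special ListOffset timestamps. Roughly, as a sketch along the lines of the Kafka 2.x source:

    private Long offsetResetStrategyTimestamp(final TopicPartition partition) {
        OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
        if (strategy == OffsetResetStrategy.EARLIEST)
            return ListOffsetRequest.EARLIEST_TIMESTAMP; // -2: resolve to the log start offset
        else if (strategy == OffsetResetStrategy.LATEST)
            return ListOffsetRequest.LATEST_TIMESTAMP;   // -1: resolve to the log end offset
        else
            return null;                                 // NONE: nothing to reset to
    }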

2.3.5 The resetOffsetsAsync method

    // asynchronously reset the fetch position of each partition
    private void resetOffsetsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
        // Add the topics to the metadata to do a single metadata fetch.
        for (TopicPartition tp : partitionResetTimestamps.keySet())
            metadata.add(tp.topic());

        // group the partitions by the node hosting their leader via groupListOffsetRequests
        Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> timestampsToSearchByNode =
                groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());

        for (Map.Entry<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> entry : timestampsToSearchByNode.entrySet()) {
            Node node = entry.getKey();
            final Map<TopicPartition, ListOffsetRequest.PartitionData> resetTimestamps = entry.getValue();

            subscriptions.setResetPending(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs);

            RequestFuture<ListOffsetResult> future = sendListOffsetRequest(node, resetTimestamps, false);
            future.addListener(new RequestFutureListener<ListOffsetResult>() {
                @Override
                public void onSuccess(ListOffsetResult result) {
                    if (!result.partitionsToRetry.isEmpty()) {
                        subscriptions.resetFailed(result.partitionsToRetry, time.milliseconds() + retryBackoffMs);
                        metadata.requestUpdate();
                    }

                    for (Map.Entry<TopicPartition, OffsetData> fetchedOffset : result.fetchedOffsets.entrySet()) {
                        TopicPartition partition = fetchedOffset.getKey();
                        OffsetData offsetData = fetchedOffset.getValue();
                        ListOffsetRequest.PartitionData requestedReset = resetTimestamps.get(partition);
                        resetOffsetIfNeeded(partition, requestedReset.timestamp, offsetData);
                    }
                }

                @Override
                public void onFailure(RuntimeException e) {
                    subscriptions.resetFailed(resetTimestamps.keySet(), time.milliseconds() + retryBackoffMs);
                    metadata.requestUpdate();

                    if (!(e instanceof RetriableException) && !cachedListOffsetsException.compareAndSet(null, e))
                        log.error("Discarding error in ListOffsetResponse because another error is pending", e);
                }
            });
        }
    }

2.3.6 The groupListOffsetRequests method

    private Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> groupListOffsetRequests(
            Map<TopicPartition, Long> timestampsToSearch, Set<TopicPartition> partitionsToRetry) {
        final Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> timestampsToSearchByNode = new HashMap<>();

        for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
            // the partition
            TopicPartition tp  = entry.getKey();
            // the partition's current metadata
            Optional<PartitionInfo> currentInfo = metadata.partitionInfoIfCurrent(tp);
            // the partition metadata is not available
            if (!currentInfo.isPresent()) {
                metadata.add(tp.topic());
                log.debug("Leader for partition {} is unknown for fetching offset", tp);
                metadata.requestUpdate();
                partitionsToRetry.add(tp);
            } else if (currentInfo.get().leader() == null) {
                log.debug("Leader for partition {} is unavailable for fetching offset", tp);
                metadata.requestUpdate();
                partitionsToRetry.add(tp);
            } else if (client.isUnavailable(currentInfo.get().leader())) {
                client.maybeThrowAuthFailure(currentInfo.get().leader());

                // The connection has failed and we need to await the blackout period before we can
                // try again. No need to request a metadata update since the disconnect will have
                // done so already.
                log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires",
                        currentInfo.get().leader(), tp);
                partitionsToRetry.add(tp);
            } else {
                // Node here is the node hosting the partition's leader replica
                Node node = currentInfo.get().leader();
                Map<TopicPartition, ListOffsetRequest.PartitionData> topicData =
                        timestampsToSearchByNode.computeIfAbsent(node, n -> new HashMap<>());
                ListOffsetRequest.PartitionData partitionData = new ListOffsetRequest.PartitionData(
                        entry.getValue(), Optional.empty());
                topicData.put(entry.getKey(), partitionData);
            }
        }
        return timestampsToSearchByNode;
    }
