How Flink's startupMode takes effect

Author: shengjk1 | Published 2019-04-07 12:20

I've long had a question: if consumer.setStartFromLatest() and kafkaProperties.put("auto.offset.reset", "earliest") are both set, which one actually takes effect? The answer is consumer.setStartFromLatest(), but why? Let's walk through the source code together.
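
To make the scenario concrete, here is a minimal sketch of the conflicting configuration. It is not taken from any particular job: the broker address, topic, and group id are placeholders, and it assumes the universal FlinkKafkaConsumer connector.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class StartupModeDemo {
    public static void main(String[] args) throws Exception {
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        kafkaProperties.put("group.id", "demo-group");              // placeholder group id
        // Kafka-level default start position -- the property we expect to be ignored
        kafkaProperties.put("auto.offset.reset", "earliest");

        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), kafkaProperties);
        // Flink-level startup mode -- this is the one that actually wins
        consumer.setStartFromLatest();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(consumer).print();
        env.execute("startup-mode-demo");
    }
}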

@Override
    public void open(Configuration configuration) throws Exception {
        // determine the offset commit mode: ON_CHECKPOINTS, DISABLED, or KAFKA_PERIODIC; this article focuses on ON_CHECKPOINTS
        this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

        // create the kafka partition discoverer
        this.partitionDiscoverer = createPartitionDiscoverer(
                topicsDescriptor,
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
        this.partitionDiscoverer.open();

        subscribedPartitionsToStartOffsets = new HashMap<>();
        // fetch all partitions of the fixed topics or the topic pattern
        final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
        // restore from checkpoint
        if (restoredState != null) {
            for (KafkaTopicPartition partition : allPartitions) {
                // new partitions (not present in the checkpoint) will be consumed from the earliest offset; old partitions have already been restored from the checkpoint and stored in subscribedPartitionsToStartOffsets
                if (!restoredState.containsKey(partition)) {
                    restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
                }
            }

            for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
                if (!restoredFromOldState) {
                    // seed the partition discoverer with the union state while filtering out
                    // restored partitions that should not be subscribed by this subtask
                    if (KafkaTopicPartitionAssigner.assign(
                        restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
                            == getRuntimeContext().getIndexOfThisSubtask()){
                        subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                    }
                } else {
                    // when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
                    // in this case, just use the restored state as the subscribed partitions
                    subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                }
            }

            if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
                subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
                    if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
                        LOG.warn(
                            "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
                            entry.getKey());
                        return true;
                    }
                    return false;
                });
            }

            LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
        } else {
            // use the partition discoverer to fetch the initial seed partitions,
            // and set their initial offsets depending on the startup mode.
            // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
            // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
            // when the partition is actually read.
            switch (startupMode) {
                case SPECIFIC_OFFSETS:
                    if (specificStartupOffsets == null) {
                        throw new IllegalStateException(
                            "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
                                ", but no specific offsets were specified.");
                    }

                    for (KafkaTopicPartition seedPartition : allPartitions) {
                        // partitions with a user-specified offset start from that offset; partitions without one fall back to the group offset
                        Long specificOffset = specificStartupOffsets.get(seedPartition);
                        if (specificOffset != null) {
                            // since the specified offsets represent the next record to read, we subtract
                            // it by one so that the initial state of the consumer will be correct
                            subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
                        } else {
                            // default to group offset behaviour if the user-provided specific offsets
                            // do not contain a value for this partition
                            // the sentinel for the corresponding startup mode is also stored in subscribedPartitionsToStartOffsets
                            subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                        }
                    }

                    break;
                case TIMESTAMP:
                    if (startupOffsetsTimestamp == null) {
                        throw new IllegalStateException(
                            "Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
                                ", but no startup timestamp was specified.");
                    }

                    for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
                            : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
                        subscribedPartitionsToStartOffsets.put(
                            partitionToOffset.getKey(),
                            (partitionToOffset.getValue() == null)
                                    // if an offset cannot be retrieved for a partition with the given timestamp,
                                    // we default to using the latest offset for the partition
                                    ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                                    // since the specified offsets represent the next record to read, we subtract
                                    // it by one so that the initial state of the consumer will be correct
                                    : partitionToOffset.getValue() - 1);
                    }

                    break;
                default:
                    // default branch: EARLIEST, LATEST, and GROUP_OFFSETS (the default mode); each partition stores the mode's sentinel value
                    for (KafkaTopicPartition seedPartition : allPartitions) {
                        subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
                    }
            }

            if (!subscribedPartitionsToStartOffsets.isEmpty()) {
                switch (startupMode) {
                    case EARLIEST:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                        break;
                    case LATEST:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                        break;
                    case TIMESTAMP:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            startupOffsetsTimestamp,
                            subscribedPartitionsToStartOffsets.keySet());
                        break;
                    case SPECIFIC_OFFSETS:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            specificStartupOffsets,
                            subscribedPartitionsToStartOffsets.keySet());

                        List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                        for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                            if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                                partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                            }
                        }

                        if (partitionsDefaultedToGroupOffsets.size() > 0) {
                            LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
                                    "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                partitionsDefaultedToGroupOffsets.size(),
                                partitionsDefaultedToGroupOffsets);
                        }
                        break;
                    case GROUP_OFFSETS:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                }
            } else {
                LOG.info("Consumer subtask {} initially has no partitions to read from.",
                    getRuntimeContext().getIndexOfThisSubtask());
            }
        }

The open() method mainly fills Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets with the user-specified topics, their partitions, and the corresponding starting offsets. Next, let's look at the entry method through which Flink consumes Kafka.
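
Before moving on, it helps to picture what this map holds. The following is only an illustrative sketch: the -2L constant is a stand-in for KafkaTopicPartitionStateSentinel.LATEST_OFFSET, not its real value. With startupMode = LATEST and no restored state, every discovered partition is mapped to the LATEST sentinel, and the concrete offset is resolved only later by the consumer thread.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class StartOffsetsSketch {
    // illustrative stand-in; the real constant is KafkaTopicPartitionStateSentinel.LATEST_OFFSET
    private static final long LATEST_OFFSET_SENTINEL = -2L;

    public static void main(String[] args) {
        Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
        // startupMode = LATEST, no checkpoint state: every seed partition gets the sentinel;
        // the concrete offset is determined later, when the consumer thread seeks to the end
        subscribedPartitionsToStartOffsets.put(new KafkaTopicPartition("demo-topic", 0), LATEST_OFFSET_SENTINEL);
        subscribedPartitionsToStartOffsets.put(new KafkaTopicPartition("demo-topic", 1), LATEST_OFFSET_SENTINEL);
        System.out.println(subscribedPartitionsToStartOffsets);
    }
}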

@Override
    // entry method that starts the source
    public void run(SourceContext<T> sourceContext) throws Exception {
        if (subscribedPartitionsToStartOffsets == null) {
            throw new Exception("The partitions were not set for the consumer");
        }

        // initialize commit metrics and default offset callback method
        this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
        this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);

        this.offsetCommitCallback = new KafkaCommitCallback() {
            @Override
            public void onSuccess() {
                successfulCommits.inc();
            }

            @Override
            public void onException(Throwable cause) {
                LOG.warn("Async Kafka commit failed.", cause);
                failedCommits.inc();
            }
        };

        // mark the subtask as temporarily idle if there are no initial seed partitions;
        // once this subtask discovers some partitions and starts collecting records, the subtask's
        // status will automatically be triggered back to be active.
        if (subscribedPartitionsToStartOffsets.isEmpty()) {
            sourceContext.markAsTemporarilyIdle();
        }

        // from this point forward:
        //   - 'snapshotState' will draw offsets from the fetcher,
        //     instead of being built from `subscribedPartitionsToStartOffsets`
        //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
        //     Kafka through the fetcher, if configured to do so)
        this.kafkaFetcher = createFetcher(
                sourceContext,
                subscribedPartitionsToStartOffsets,
                periodicWatermarkAssigner,
                punctuatedWatermarkAssigner,
                (StreamingRuntimeContext) getRuntimeContext(),
                offsetCommitMode,
                getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                useMetrics);

        if (!running) {
            return;
        }

        // depending on whether we were restored with the current state version (1.3),
        // remaining logic branches off into 2 paths:
        //  1) New state - partition discovery loop executed as separate thread, with this
        //                 thread running the main fetcher loop
        //  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
        if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
            kafkaFetcher.runFetchLoop();
        } else {
            runWithPartitionDiscovery();
        }
    }
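
The partition-discovery branch is not the focus here, but the idea behind runWithPartitionDiscovery is roughly the following. This is a simplified sketch of the mechanism, not the actual Flink implementation; partitionDiscoverer, kafkaFetcher, discoveryIntervalMillis, and running are the FlinkKafkaConsumerBase fields that already appeared above.

// Simplified sketch -- not the actual runWithPartitionDiscovery body.
Thread discoveryLoopThread = new Thread(() -> {
    try {
        while (running) {
            // discoverPartitions() only returns partitions this subtask has not seen before
            List<KafkaTopicPartition> discovered = partitionDiscoverer.discoverPartitions();
            if (!discovered.isEmpty()) {
                // newly discovered partitions are consumed from the earliest offset;
                // the fetcher pushes them onto unassignedPartitionsQueue for the consumer thread
                kafkaFetcher.addDiscoveredPartitions(discovered);
            }
            Thread.sleep(discoveryIntervalMillis);
        }
    } catch (Exception e) {
        // the real implementation propagates this error back to the main thread
    }
}, "Kafka Partition Discovery");
discoveryLoopThread.start();

kafkaFetcher.runFetchLoop(); // the main thread keeps running the fetch loop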

createFetcher is passed the subscribedPartitionsToStartOffsets map we just built. Following the call chain, the map becomes a constructor argument when the KafkaFetcher object is created and finally reaches the AbstractFetcher constructor.

    protected AbstractFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
        this.sourceContext = checkNotNull(sourceContext);
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.userCodeClassLoader = checkNotNull(userCodeClassLoader);

        this.useMetrics = useMetrics;
        this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
        this.legacyCurrentOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
        this.legacyCommittedOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);

        // figure out what watermark mode we will be using
        this.watermarksPeriodic = watermarksPeriodic;
        this.watermarksPunctuated = watermarksPunctuated;

        if (watermarksPeriodic == null) {
            if (watermarksPunctuated == null) {
                // simple case, no watermarks involved
                timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
            } else {
                timestampWatermarkMode = PUNCTUATED_WATERMARKS;
            }
        } else {
            if (watermarksPunctuated == null) {
                timestampWatermarkMode = PERIODIC_WATERMARKS;
            } else {
                throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
            }
        }

        this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();

        // initialize subscribed partition states with seed partitions, depending on whether timestamp / watermark assigners are present;
        // subscribedPartitionStates holds a List<KafkaTopicPartitionState<KPH>>, and each KafkaTopicPartitionState carries the KafkaTopicPartition, its offset, and so on
        this.subscribedPartitionStates = createPartitionStateHolders(
                seedPartitionsWithInitialOffsets,
                timestampWatermarkMode,
                watermarksPeriodic,
                watermarksPunctuated,
                userCodeClassLoader);

        // check that all seed partition states have a defined offset
        // whether the offsets come from a checkpoint restore or from consumer.setStartFrom..., every seed partition must have an initial offset
        for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
            if (!partitionState.isOffsetDefined()) {
                throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
            }
        }

        // all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
        // up to this point, the consumer has not been assigned any partitions yet
        for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
            unassignedPartitionsQueue.add(partition);
        }

        // register metrics for the initial seed partitions
        if (useMetrics) {
            registerOffsetMetrics(consumerMetricGroup, subscribedPartitionStates);
        }

        // if we have periodic watermarks, kick off the interval scheduler
        if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
            @SuppressWarnings("unchecked")
            PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
                    subscribedPartitionStates,
                    sourceContext,
                    processingTimeProvider,
                    autoWatermarkInterval);

            periodicEmitter.start();
        }
    }
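
As an aside, the KafkaTopicPartitionState objects built by createPartitionStateHolders are essentially small mutable holders. Below is a stripped-down sketch reconstructed only from the calls visible in this article; the real class has more responsibilities (for example watermark tracking), and the OFFSET_NOT_SET placeholder here is not the real sentinel value.

// Stripped-down sketch of KafkaTopicPartitionState, based only on the calls used above.
class KafkaTopicPartitionStateSketch<KPH> {
    private static final long OFFSET_NOT_SET = Long.MIN_VALUE; // placeholder sentinel

    private final KafkaTopicPartition partition; // Flink-side partition descriptor
    private final KPH kafkaPartitionHandle;      // e.g. Kafka's TopicPartition
    private volatile long offset;                // starts as a sentinel, later the real position

    KafkaTopicPartitionStateSketch(KafkaTopicPartition partition, KPH handle, long initialOffset) {
        this.partition = partition;
        this.kafkaPartitionHandle = handle;
        this.offset = initialOffset;
    }

    long getOffset() { return offset; }
    void setOffset(long offset) { this.offset = offset; }
    boolean isOffsetDefined() { return offset != OFFSET_NOT_SET; }
    KPH getKafkaPartitionHandle() { return kafkaPartitionHandle; }
    KafkaTopicPartition getKafkaTopicPartition() { return partition; }
}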

Then, from the constructor of KafkaFetcher (a subclass of AbstractFetcher), we can see that unassignedPartitionsQueue is in turn handed to the KafkaConsumerThread.

public KafkaFetcher(
        SourceFunction.SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        ProcessingTimeService processingTimeProvider,
        long autoWatermarkInterval,
        ClassLoader userCodeClassLoader,
        String taskNameWithSubtasks,
        KafkaDeserializationSchema<T> deserializer,
        Properties kafkaProperties,
        long pollTimeout,
        MetricGroup subtaskMetricGroup,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
        super(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            processingTimeProvider,
            autoWatermarkInterval,
            userCodeClassLoader,
            consumerMetricGroup,
            useMetrics);

        this.deserializer = deserializer;
        this.handover = new Handover();

        this.consumerThread = new KafkaConsumerThread(
            LOG,
            handover,
            kafkaProperties,
            unassignedPartitionsQueue,
            getFetcherName() + " for " + taskNameWithSubtasks,
            pollTimeout,
            useMetrics,
            consumerMetricGroup,
            subtaskMetricGroup);
    }

When the KafkaConsumerThread is started, i.e. in KafkaConsumerThread's run method:

......
try {
                    // hasAssignedPartitions defaults to false
                    // when new partitions are discovered, they are added to unassignedPartitionsQueue and subscribedPartitionsToStartOffsets
                    if (hasAssignedPartitions) {
                        newPartitions = unassignedPartitionsQueue.pollBatch();
                    }
                    else {
                        // if no assigned partitions block until we get at least one
                        // instead of hot spinning this loop. We rely on a fact that
                        // unassignedPartitionsQueue will be closed on a shutdown, so
                        // we don't block indefinitely
                        newPartitions = unassignedPartitionsQueue.getBatchBlocking();
                    }
                    // since unassignedPartitionsQueue already holds the seed partitions, newPartitions != null is true and reassignPartitions is executed
                    if (newPartitions != null) {
                        reassignPartitions(newPartitions);
                    }
                } catch (AbortedReassignmentException e) {
                    continue;
                }
......

reassignPartitions is where the sentinel offsets stored in subscribedPartitionsToStartOffsets are finally turned into actual consumer positions:

void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
       if (newPartitions.size() == 0) {
           return;
       }
       hasAssignedPartitions = true;
       boolean reassignmentStarted = false;

       // since the reassignment may introduce several Kafka blocking calls that cannot be interrupted,
       // the consumer needs to be isolated from external wakeup calls in setOffsetsToCommit() and shutdown()
       // until the reassignment is complete.
       final KafkaConsumer<byte[], byte[]> consumerTmp;
       synchronized (consumerReassignmentLock) {
           // hand the consumer reference over to consumerTmp
           consumerTmp = this.consumer;
           this.consumer = null;
       }

       final Map<TopicPartition, Long> oldPartitionAssignmentsToPosition = new HashMap<>();
       try {
           /* The reason there are both new and old partitions is that, when KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS is configured,
              the consumer periodically checks whether new partitions have been added; if so, they are put into unassignedPartitionsQueue. */
           for (TopicPartition oldPartition : consumerTmp.assignment()) {
               oldPartitionAssignmentsToPosition.put(oldPartition, consumerTmp.position(oldPartition));
           }

           final List<TopicPartition> newPartitionAssignments =
               new ArrayList<>(newPartitions.size() + oldPartitionAssignmentsToPosition.size());
           newPartitionAssignments.addAll(oldPartitionAssignmentsToPosition.keySet());
           newPartitionAssignments.addAll(convertKafkaPartitions(newPartitions));

           // reassign with the new partitions
           consumerTmp.assign(newPartitionAssignments);
           reassignmentStarted = true;

           // old partitions should be seeked to their previous position
           for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
               consumerTmp.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
           }

           // offsets in the state of new partitions may still be placeholder sentinel values if we are:
           //   (1) starting fresh,
           //   (2) checkpoint / savepoint state we were restored with had not completely
           //       been replaced with actual offset values yet, or
           //   (3) the partition was newly discovered after startup;
           // replace those with actual offsets, according to what the sentinel value represent.
           
           // the offset-related properties configured for Kafka do not take effect here; the starting position still depends on startupMode
           // based on the sentinel returned by getOffset(), the consumer seeks to the offset to start consuming from, and that sentinel, as we saw, comes from startupMode
           for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
               if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
                   consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
                   newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
               } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
                   consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
                   newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
               } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                   // the KafkaConsumer by default will automatically seek the consumer position
                   // to the committed group offset, so we do not need to do it.

                   newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
               } else {
                   consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
               }
           }
       } catch (WakeupException e) {
           // a WakeupException may be thrown if the consumer was invoked wakeup()
           // before it was isolated for the reassignment. In this case, we abort the
           // reassignment and just re-expose the original consumer.

           synchronized (consumerReassignmentLock) {
               this.consumer = consumerTmp;

               // if reassignment had already started and affected the consumer,
               // we do a full roll back so that it is as if it was left untouched
               if (reassignmentStarted) {
                   this.consumer.assign(new ArrayList<>(oldPartitionAssignmentsToPosition.keySet()));

                   for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
                       this.consumer.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
                   }
               }

               // no need to restore the wakeup state in this case,
               // since only the last wakeup call is effective anyways
               hasBufferedWakeup = false;

               // re-add all new partitions back to the unassigned partitions queue to be picked up again
               for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
                   unassignedPartitionsQueue.add(newPartition);
               }

               // this signals the main fetch loop to continue through the loop
               throw new AbortedReassignmentException();
           }
       }

       // reassignment complete; expose the reassigned consumer
       synchronized (consumerReassignmentLock) {
           this.consumer = consumerTmp;

           // restore wakeup state for the consumer if necessary
           if (hasBufferedWakeup) {
               this.consumer.wakeup();
               hasBufferedWakeup = false;
           }
       }
   }
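
This last loop answers the original question. For EARLIEST and LATEST, Flink itself calls seekToBeginning / seekToEnd; for SPECIFIC_OFFSETS and TIMESTAMP it calls seek; only for GROUP_OFFSETS does it leave the position to the KafkaConsumer (the committed group offset). Kafka's auto.offset.reset only kicks in when the consumer has neither an explicit position nor a committed offset, so once reassignPartitions has run, consumer.setStartFromLatest() wins over auto.offset.reset = earliest. The standalone sketch below, using the plain Kafka client with placeholder broker, topic, and group id, demonstrates the same behaviour outside Flink.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekOverridesAutoOffsetReset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "demo-group");              // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("auto.offset.reset", "earliest");        // only applies when there is no explicit position

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("demo-topic", 0); // placeholder topic
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToEnd(Collections.singletonList(tp));       // what Flink does for LATEST
            // from here on, polling starts at the end of the partition,
            // regardless of auto.offset.reset = earliest
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            System.out.println("records fetched from the latest position: " + records.count());
        }
    }
}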
