How Flink's startupMode takes effect

Author: shengjk1 | Published 2019-04-07 12:20

I have long wondered: if consumer.setStartFromLatest() and kafkaProperties.put("auto.offset.reset", "earliest") are both set, which one actually takes effect? The answer is consumer.setStartFromLatest(). Why? Let's walk through the source code together.
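
To make the question concrete, here is a minimal sketch of such a job; the topic name, group id and broker address are made up, and the consumer class is the universal flink-connector-kafka FlinkKafkaConsumer:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class StartupModeExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties kafkaProperties = new Properties();
            kafkaProperties.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            kafkaProperties.put("group.id", "test-group");              // assumed group id
            kafkaProperties.put("auto.offset.reset", "earliest");       // Kafka-level property

            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "test-topic",                  // assumed topic name
                new SimpleStringSchema(),
                kafkaProperties);
            consumer.setStartFromLatest();     // Flink-level startup mode

            env.addSource(consumer).print();
            env.execute("startup-mode-demo");
        }
    }

The startup mode is evaluated for the first time in the consumer's open method: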

    @Override
        public void open(Configuration configuration) throws Exception {
            // determine the offset commit mode: ON_CHECKPOINTS, DISABLED or KAFKA_PERIODIC; this article focuses on ON_CHECKPOINTS
            this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                    getIsAutoCommitEnabled(),
                    enableCommitOnCheckpoints,
                    ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
    
            // create the kafka partition discoverer
            this.partitionDiscoverer = createPartitionDiscoverer(
                    topicsDescriptor,
                    getRuntimeContext().getIndexOfThisSubtask(),
                    getRuntimeContext().getNumberOfParallelSubtasks());
            this.partitionDiscoverer.open();
    
            subscribedPartitionsToStartOffsets = new HashMap<>();
            // fetch all partitions of the fixed topics or of the topic pattern
            final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
            // restore from checkpoint
            if (restoredState != null) {
                for (KafkaTopicPartition partition : allPartitions) {
                    // new partitions (those not present in the checkpoint) will start from the earliest offset;
                    // old partitions have already been restored from the checkpoint and are kept in subscribedPartitionsToStartOffsets
                    if (!restoredState.containsKey(partition)) {
                        restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
                    }
                }
    
                for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
                    if (!restoredFromOldState) {
                        // seed the partition discoverer with the union state while filtering out
                        // restored partitions that should not be subscribed by this subtask
                        if (KafkaTopicPartitionAssigner.assign(
                            restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
                                == getRuntimeContext().getIndexOfThisSubtask()){
                            subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                        }
                    } else {
                        // when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
                        // in this case, just use the restored state as the subscribed partitions
                        subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                    }
                }
    
                if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
                    subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
                        if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
                            LOG.warn(
                                "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
                                entry.getKey());
                            return true;
                        }
                        return false;
                    });
                }
    
                LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                    getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
            } else {
                // use the partition discoverer to fetch the initial seed partitions,
                // and set their initial offsets depending on the startup mode.
                // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
                // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
                // when the partition is actually read.
                switch (startupMode) {
                    case SPECIFIC_OFFSETS:
                        if (specificStartupOffsets == null) {
                            throw new IllegalStateException(
                                "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
                                    ", but no specific offsets were specified.");
                        }
    
                        for (KafkaTopicPartition seedPartition : allPartitions) {
                            // partitions with a user-specified offset start from that offset; unspecified partitions fall back to the group offset
                            Long specificOffset = specificStartupOffsets.get(seedPartition);
                            if (specificOffset != null) {
                                // since the specified offsets represent the next record to read, we subtract
                                // it by one so that the initial state of the consumer will be correct
                                subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
                            } else {
                                // default to group offset behaviour if the user-provided specific offsets
                                // do not contain a value for this partition
                                // the sentinel corresponding to the startup mode is also stored in subscribedPartitionsToStartOffsets
                                subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                            }
                        }
    
                        break;
                    case TIMESTAMP:
                        if (startupOffsetsTimestamp == null) {
                            throw new IllegalStateException(
                                "Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
                                    ", but no startup timestamp was specified.");
                        }
    
                        for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
                                : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
                            subscribedPartitionsToStartOffsets.put(
                                partitionToOffset.getKey(),
                                (partitionToOffset.getValue() == null)
                                        // if an offset cannot be retrieved for a partition with the given timestamp,
                                        // we default to using the latest offset for the partition
                                        ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                                        // since the specified offsets represent the next record to read, we subtract
                                        // it by one so that the initial state of the consumer will be correct
                                        : partitionToOffset.getValue() - 1);
                        }
    
                        break;
                    default:
                        // default branch: EARLIEST, LATEST and GROUP_OFFSETS simply store the corresponding sentinel
                        for (KafkaTopicPartition seedPartition : allPartitions) {
                            subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
                        }
                }
    
                if (!subscribedPartitionsToStartOffsets.isEmpty()) {
                    switch (startupMode) {
                        case EARLIEST:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                            break;
                        case LATEST:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                            break;
                        case TIMESTAMP:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                startupOffsetsTimestamp,
                                subscribedPartitionsToStartOffsets.keySet());
                            break;
                        case SPECIFIC_OFFSETS:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                specificStartupOffsets,
                                subscribedPartitionsToStartOffsets.keySet());
    
                            List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                                if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                                    partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                                }
                            }
    
                            if (partitionsDefaultedToGroupOffsets.size() > 0) {
                                LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
                                        "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                                    getRuntimeContext().getIndexOfThisSubtask(),
                                    partitionsDefaultedToGroupOffsets.size(),
                                    partitionsDefaultedToGroupOffsets);
                            }
                            break;
                        case GROUP_OFFSETS:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                    }
                } else {
                    LOG.info("Consumer subtask {} initially has no partitions to read from.",
                        getRuntimeContext().getIndexOfThisSubtask());
                }
            }
    

The open method mainly stores the user-specified topics, together with their partitions and starting offsets, into Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets; a rough sketch of what that map can look like is shown right below. After that, let's look at run(), the entry method through which Flink consumes Kafka.
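
As a sketch only (the partition numbers and offsets are made up, and the classes are the flink-connector-kafka internal ones already used in the code above), subscribedPartitionsToStartOffsets may contain entries like these after open() returns:

    // sentinel values are negative "magic" longs defined in KafkaTopicPartitionStateSentinel;
    // concrete offsets only appear for SPECIFIC_OFFSETS / TIMESTAMP or when restoring from a checkpoint
    Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();

    // startupMode = LATEST: every seed partition gets the LATEST_OFFSET sentinel, which is only
    // resolved to a real offset later, in KafkaConsumerThread#reassignPartitions
    subscribedPartitionsToStartOffsets.put(
        new KafkaTopicPartition("test-topic", 0), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);

    // startupMode = SPECIFIC_OFFSETS: a user-specified offset is stored as offset - 1, because the
    // stored value represents the last record that was already read
    subscribedPartitionsToStartOffsets.put(
        new KafkaTopicPartition("test-topic", 1), 42L - 1);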

    @Override
        // entry method: start the source
        public void run(SourceContext<T> sourceContext) throws Exception {
            if (subscribedPartitionsToStartOffsets == null) {
                throw new Exception("The partitions were not set for the consumer");
            }
    
            // initialize commit metrics and default offset callback method
            this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
            this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
    
            this.offsetCommitCallback = new KafkaCommitCallback() {
                @Override
                public void onSuccess() {
                    successfulCommits.inc();
                }
    
                @Override
                public void onException(Throwable cause) {
                    LOG.warn("Async Kafka commit failed.", cause);
                    failedCommits.inc();
                }
            };
    
            // mark the subtask as temporarily idle if there are no initial seed partitions;
            // once this subtask discovers some partitions and starts collecting records, the subtask's
            // status will automatically be triggered back to be active.
            if (subscribedPartitionsToStartOffsets.isEmpty()) {
                sourceContext.markAsTemporarilyIdle();
            }
    
            // from this point forward:
            //   - 'snapshotState' will draw offsets from the fetcher,
            //     instead of being built from `subscribedPartitionsToStartOffsets`
            //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
            //     Kafka through the fetcher, if configured to do so)
            this.kafkaFetcher = createFetcher(
                    sourceContext,
                    subscribedPartitionsToStartOffsets,
                    periodicWatermarkAssigner,
                    punctuatedWatermarkAssigner,
                    (StreamingRuntimeContext) getRuntimeContext(),
                    offsetCommitMode,
                    getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                    useMetrics);
    
            if (!running) {
                return;
            }
    
            // depending on whether we were restored with the current state version (1.3),
            // remaining logic branches off into 2 paths:
            //  1) New state - partition discovery loop executed as separate thread, with this
            //                 thread running the main fetcher loop
            //  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
            if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
                kafkaFetcher.runFetchLoop();
            } else {
                runWithPartitionDiscovery();
            }
        }
    
    

createFetcher receives the subscribedPartitionsToStartOffsets map we just built. Going further, when the KafkaFetcher object is created, the map is passed in as a constructor argument and finally reaches the AbstractFetcher constructor.

        protected AbstractFetcher(
                SourceContext<T> sourceContext,
                Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
                SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
                SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
                ProcessingTimeService processingTimeProvider,
                long autoWatermarkInterval,
                ClassLoader userCodeClassLoader,
                MetricGroup consumerMetricGroup,
                boolean useMetrics) throws Exception {
            this.sourceContext = checkNotNull(sourceContext);
            this.checkpointLock = sourceContext.getCheckpointLock();
            this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
    
            this.useMetrics = useMetrics;
            this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
            this.legacyCurrentOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
            this.legacyCommittedOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);
    
            // figure out what watermark mode we will be using
            this.watermarksPeriodic = watermarksPeriodic;
            this.watermarksPunctuated = watermarksPunctuated;
    
            if (watermarksPeriodic == null) {
                if (watermarksPunctuated == null) {
                    // simple case, no watermarks involved
                    timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
                } else {
                    timestampWatermarkMode = PUNCTUATED_WATERMARKS;
                }
            } else {
                if (watermarksPunctuated == null) {
                    timestampWatermarkMode = PERIODIC_WATERMARKS;
                } else {
                    throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
                }
            }
    
            this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
    
            // initialize subscribed partition states with seed partitions, depending on whether a timestamp / watermark assigner is configured;
            // subscribedPartitionStates holds a List<KafkaTopicPartitionState<KPH>>, and each KafkaTopicPartitionState carries the KafkaTopicPartition, its offset, etc.
            this.subscribedPartitionStates = createPartitionStateHolders(
                    seedPartitionsWithInitialOffsets,
                    timestampWatermarkMode,
                    watermarksPeriodic,
                    watermarksPunctuated,
                    userCodeClassLoader);
    
            // check that all seed partition states have a defined offset
            // whether restored from a checkpoint or set via consumer.setStartFrom..., every seed partition must have an initial offset
            for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
                if (!partitionState.isOffsetDefined()) {
                    throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
                }
            }
    
            // all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
            // up to this point no partitions have been assigned to the KafkaConsumer yet
            for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
                unassignedPartitionsQueue.add(partition);
            }
    
            // register metrics for the initial seed partitions
            if (useMetrics) {
                registerOffsetMetrics(consumerMetricGroup, subscribedPartitionStates);
            }
    
            // if we have periodic watermarks, kick off the interval scheduler
            if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
                @SuppressWarnings("unchecked")
                PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
                        subscribedPartitionStates,
                        sourceContext,
                        processingTimeProvider,
                        autoWatermarkInterval);
    
                periodicEmitter.start();
            }
        }
    

Then, from the constructor of KafkaFetcher, the subclass of AbstractFetcher, we can see that unassignedPartitionsQueue is in turn passed to the KafkaConsumerThread.

    public KafkaFetcher(
            SourceFunction.SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            String taskNameWithSubtasks,
            KafkaDeserializationSchema<T> deserializer,
            Properties kafkaProperties,
            long pollTimeout,
            MetricGroup subtaskMetricGroup,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
            super(
                sourceContext,
                assignedPartitionsWithInitialOffsets,
                watermarksPeriodic,
                watermarksPunctuated,
                processingTimeProvider,
                autoWatermarkInterval,
                userCodeClassLoader,
                consumerMetricGroup,
                useMetrics);
    
            this.deserializer = deserializer;
            this.handover = new Handover();
    
            this.consumerThread = new KafkaConsumerThread(
                LOG,
                handover,
                kafkaProperties,
                unassignedPartitionsQueue,
                getFetcherName() + " for " + taskNameWithSubtasks,
                pollTimeout,
                useMetrics,
                consumerMetricGroup,
                subtaskMetricGroup);
        }
    
    

When the KafkaConsumerThread is started, i.e. when its run method executes:

    ......
    try {
                        // hasAssignedPartitions defaults to false
                        // when new partitions are discovered, they are added to unassignedPartitionsQueue and subscribedPartitionsToStartOffsets
                        if (hasAssignedPartitions) {
                            newPartitions = unassignedPartitionsQueue.pollBatch();
                        }
                        else {
                            // if no assigned partitions block until we get at least one
                            // instead of hot spinning this loop. We rely on a fact that
                            // unassignedPartitionsQueue will be closed on a shutdown, so
                            // we don't block indefinitely
                            newPartitions = unassignedPartitionsQueue.getBatchBlocking();
                        }
                        // since unassignedPartitionsQueue already contains the seed partitions, newPartitions != null is true and reassignPartitions is executed
                        if (newPartitions != null) {
                            reassignPartitions(newPartitions);
                        }
                    } catch (AbortedReassignmentException e) {
                        continue;
                    }
    ......
    
    void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
           if (newPartitions.size() == 0) {
               return;
           }
           hasAssignedPartitions = true;
           boolean reassignmentStarted = false;
    
           // since the reassignment may introduce several Kafka blocking calls that cannot be interrupted,
           // the consumer needs to be isolated from external wakeup calls in setOffsetsToCommit() and shutdown()
           // until the reassignment is complete.
           final KafkaConsumer<byte[], byte[]> consumerTmp;
           synchronized (consumerReassignmentLock) {
               // take the consumer reference into the local consumerTmp (this.consumer is set to null to isolate it from external wakeup calls)
               consumerTmp = this.consumer;
               this.consumer = null;
           }
    
           final Map<TopicPartition, Long> oldPartitionAssignmentsToPosition = new HashMap<>();
           try {
               // There are both new and old partitions because, when KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS is configured,
               // the consumer periodically checks whether new partitions have appeared; newly discovered partitions are
               // added to unassignedPartitionsQueue.
               for (TopicPartition oldPartition : consumerTmp.assignment()) {
                   oldPartitionAssignmentsToPosition.put(oldPartition, consumerTmp.position(oldPartition));
               }
    
               final List<TopicPartition> newPartitionAssignments =
                   new ArrayList<>(newPartitions.size() + oldPartitionAssignmentsToPosition.size());
               newPartitionAssignments.addAll(oldPartitionAssignmentsToPosition.keySet());
               newPartitionAssignments.addAll(convertKafkaPartitions(newPartitions));
    
               // reassign with the new partitions
               consumerTmp.assign(newPartitionAssignments);
               reassignmentStarted = true;
    
               // old partitions should be seeked to their previous position
               for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
                   consumerTmp.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
               }
    
               // offsets in the state of new partitions may still be placeholder sentinel values if we are:
               //   (1) starting fresh,
               //   (2) checkpoint / savepoint state we were restored with had not completely
               //       been replaced with actual offset values yet, or
               //   (3) the partition was newly discovered after startup;
               // replace those with actual offsets, according to what the sentinel value represent.
               
               // the Kafka "auto.offset.reset" property has no effect here; the starting position still depends on the startupMode.
               // based on the sentinel returned by getOffset(), the consumer seeks to the position it should start consuming from,
               // and as we saw above, that sentinel comes from the startupMode
               for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
                   if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
                       consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
                       newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
                   } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
                       consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
                       newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
                   } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                       // the KafkaConsumer by default will automatically seek the consumer position
                       // to the committed group offset, so we do not need to do it.
    
                       newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
                   } else {
                       consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
                   }
               }
           } catch (WakeupException e) {
               // a WakeupException may be thrown if the consumer was invoked wakeup()
               // before it was isolated for the reassignment. In this case, we abort the
               // reassignment and just re-expose the original consumer.
    
               synchronized (consumerReassignmentLock) {
                   this.consumer = consumerTmp;
    
                   // if reassignment had already started and affected the consumer,
                   // we do a full roll back so that it is as if it was left untouched
                   if (reassignmentStarted) {
                       this.consumer.assign(new ArrayList<>(oldPartitionAssignmentsToPosition.keySet()));
    
                       for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
                           this.consumer.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
                       }
                   }
    
                   // no need to restore the wakeup state in this case,
                   // since only the last wakeup call is effective anyways
                   hasBufferedWakeup = false;
    
                   // re-add all new partitions back to the unassigned partitions queue to be picked up again
                   for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
                       unassignedPartitionsQueue.add(newPartition);
                   }
    
                   // this signals the main fetch loop to continue through the loop
                   throw new AbortedReassignmentException();
               }
           }
    
           // reassignment complete; expose the reassigned consumer
           synchronized (consumerReassignmentLock) {
               this.consumer = consumerTmp;
    
               // restore wakeup state for the consumer if necessary
               if (hasBufferedWakeup) {
                   this.consumer.wakeup();
                   hasBufferedWakeup = false;
               }
           }
       }
    
    

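This is also why consumer.setStartFromLatest() wins over "auto.offset.reset": reassignPartitions explicitly calls seekToBeginning / seekToEnd / seek according to the sentinel derived from the startupMode, and Kafka's auto.offset.reset only kicks in when the consumer has no valid starting position at all (no explicit seek and no committed offset). The standalone sketch below, with a made-up topic, group id and broker address, illustrates the same effect with a plain KafkaConsumer:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class SeekOverridesAutoOffsetReset {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
            props.put("group.id", "test-group");                // assumed group id
            props.put("auto.offset.reset", "earliest");         // only consulted when there is no position and no committed offset
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("test-topic", 0); // assumed topic
                consumer.assign(Collections.singletonList(tp));

                // explicit seek, analogous to what Flink does for the LATEST_OFFSET sentinel
                consumer.seekToEnd(Collections.singletonList(tp));

                // this poll starts from the end of the partition, regardless of auto.offset.reset
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                System.out.println("records fetched after seekToEnd: " + records.count());
            }
        }
    }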