
FlinkKafkaConsumer010 Code Reading Notes

By 袁世超 | Published 2018-11-01 22:48

    0. SourceFunction

    FlinkKafkaConsumer010 is the Kafka source connector implementation shipped with Flink 1.6.1. In the Flink framework, a data source must implement the SourceFunction interface.

    @Public
    public interface SourceFunction<T> extends Function, Serializable {
    
        void run(SourceContext<T> ctx) throws Exception;
    
        void cancel();
    
        @Public
        interface SourceContext<T> {
    
            void collect(T element);
    
            @PublicEvolving
            void collectWithTimestamp(T element, long timestamp);
    
            @PublicEvolving
            void emitWatermark(Watermark mark);
    
            @PublicEvolving
            void markAsTemporarilyIdle();
    
            Object getCheckpointLock();
    
            void close();
        }
    }
    

    SourceFunction

    SourceFunction declares two interface methods:

    1. run: starts the source and keeps emitting elements, producing the data stream;
    2. cancel: cancels the source, i.e. makes the logic inside run terminate.

    SourceContext

    Flink exposes element output through the SourceContext interface:

    1. collect: emits an element whose timestamp, if one is needed, is assigned automatically (e.g. the local time under ingestion time);
    2. collectWithTimestamp: emits an element with a user-supplied timestamp;
    3. emitWatermark: manually emits a Watermark.

    The relationship between the various notions of time and watermarks is covered in the official documentation and is not repeated here.
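
    To make run and cancel concrete, here is a minimal, hypothetical source (not part of the Kafka connector) that emits an increasing counter until cancelled. Emitting under the checkpoint lock keeps the output consistent with any operator state:

        public class CounterSource implements SourceFunction<Long> {

            // volatile so that cancel(), called from another thread, is visible in run()
            private volatile boolean running = true;
            private long counter = 0L;

            @Override
            public void run(SourceContext<Long> ctx) throws Exception {
                while (running) {
                    // emit under the checkpoint lock so output and state stay consistent
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(counter++);
                    }
                    Thread.sleep(10);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }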

    1. FlinkKafkaConsumer010

    First, let's get an overall picture of FlinkKafkaConsumer010 from its class diagram.

    (Figure: FlinkKafkaConsumer010 class diagram)

    ParallelSourceFunction is a sub-interface of SourceFunction; it is effectively a marker interface telling the system that this source can be executed in parallel.

    RichFunction provides two hook methods, open and close, invoked before processing starts and after it finishes; it also provides methods to set and get the RuntimeContext.

    2. Constructing FlinkKafkaConsumer010

    Let's start with the construction logic of FlinkKafkaConsumer010; the interesting part is in FlinkKafkaConsumer09:

        private FlinkKafkaConsumer09(
                List<String> topics,
                Pattern subscriptionPattern,
                KeyedDeserializationSchema<T> deserializer,
                Properties props) {
    
            super(
                    topics,
                    subscriptionPattern,
                    deserializer,
                    getLong(
                        checkNotNull(props, "props"),
                        KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
                    !getBoolean(props, KEY_DISABLE_METRICS, false));
    
            this.properties = props;
            setDeserializer(this.properties);
    
            // configure the polling timeout
            try {
                if (properties.containsKey(KEY_POLL_TIMEOUT)) {
                    this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
                } else {
                    this.pollTimeout = DEFAULT_POLL_TIMEOUT;
                }
            }
            catch (Exception e) {
                throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
            }
        }
    
    1. Any deserializer configured by the user in the Kafka properties is ignored: the underlying KafkaConsumer's deserializer is forced to ByteArrayDeserializer, and deserialization at the FlinkKafkaConsumer level is handled through the KeyedDeserializationSchema interface.
    2. If the user does not set flink.poll-timeout, it defaults to 100 ms; this is how long each poll for Kafka data will wait.
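
    As a usage sketch (assuming Flink 1.6.x with the flink-connector-kafka-0.10 dependency; the topic name, group id and broker address are placeholders), the consumer is typically constructed and wired in like this, with flink.poll-timeout being the property discussed above:

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");
        // optional: how long each poll of the Kafka broker may wait (defaults to 100 ms)
        props.setProperty("flink.poll-timeout", "100");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

        DataStream<String> stream = env.addSource(consumer);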

    Moving up, here is the constructor of the parent class FlinkKafkaConsumerBase:

        public FlinkKafkaConsumerBase(
                List<String> topics,
                Pattern topicPattern,
                KeyedDeserializationSchema<T> deserializer,
                long discoveryIntervalMillis,
                boolean useMetrics) {
            this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
            this.deserializer = checkNotNull(deserializer, "valueDeserializer");
    
            checkArgument(
                discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED || discoveryIntervalMillis >= 0,
                "Cannot define a negative value for the topic / partition discovery interval.");
            this.discoveryIntervalMillis = discoveryIntervalMillis;
    
            this.useMetrics = useMetrics;
        }
    
    1. KafkaTopicsDescriptor encapsulates how the topics to consume are resolved (a fixed list or a pattern);
    2. discoveryIntervalMillis sets the interval for automatic partition discovery; it defaults to Long.MIN_VALUE, which means discovery is disabled.
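
    To turn discovery on, the interval has to be set explicitly in the consumer properties. A sketch, assuming the KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS constant exposed by FlinkKafkaConsumerBase:

        // check for new partitions (and, with a topic pattern, new topics) every 30 seconds
        props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");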

    3. Initializing FlinkKafkaConsumer010

    Initialization consists of two main parts: CheckpointedFunction.initializeState and RichFunction.open.

    a. CheckpointedFunction.initializeState

    In FlinkKafkaConsumerBase:

        public final void initializeState(FunctionInitializationContext context) throws Exception {
    
            OperatorStateStore stateStore = context.getOperatorStateStore();
    
            ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
                stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
    
            this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
                    OFFSETS_STATE_NAME,
                    TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
    
            if (context.isRestored() && !restoredFromOldState) {
                restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
    
                // migrate from 1.2 state, if there is any
                for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
                    restoredFromOldState = true;
                    unionOffsetStates.add(kafkaOffset);
                }
                oldRoundRobinListState.clear();
    
                if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
                    throw new IllegalArgumentException(
                        "Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
                }
    
                // populate actual holder for restored state
                for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
                    restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
                }
    
                LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
            } else {
                LOG.info("No restore state for FlinkKafkaConsumer.");
            }
        }
    

    Offsets are restored from the checkpoint into restoredState.

    State written by Flink 1.2 is also migrated for compatibility; we skip those details here.
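
    For context, the counterpart on the checkpointing side is snapshotState, which writes the current offsets back into unionOffsetStates. A simplified sketch of that flow (not the exact Flink source, which also tracks pending commits):

        public final void snapshotState(FunctionSnapshotContext context) throws Exception {
            unionOffsetStates.clear();

            if (kafkaFetcher == null) {
                // the fetcher has not started yet: snapshot the configured start offsets
                for (Map.Entry<KafkaTopicPartition, Long> e : subscribedPartitionsToStartOffsets.entrySet()) {
                    unionOffsetStates.add(Tuple2.of(e.getKey(), e.getValue()));
                }
            } else {
                // the fetcher is running: snapshot the offsets it is currently at
                for (Map.Entry<KafkaTopicPartition, Long> e : kafkaFetcher.snapshotCurrentState().entrySet()) {
                    unionOffsetStates.add(Tuple2.of(e.getKey(), e.getValue()));
                }
            }
        }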

    b. RichFunction.open

    In FlinkKafkaConsumerBase:

        public void open(Configuration configuration) throws Exception {
            // determine the offset commit mode
            this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                    getIsAutoCommitEnabled(),
                    enableCommitOnCheckpoints,
                    ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
    
            // create the partition discoverer
            this.partitionDiscoverer = createPartitionDiscoverer(
                    topicsDescriptor,
                    getRuntimeContext().getIndexOfThisSubtask(),
                    getRuntimeContext().getNumberOfParallelSubtasks());
            this.partitionDiscoverer.open();
    
            subscribedPartitionsToStartOffsets = new HashMap<>();
    
            List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
    
            if (restoredState != null) {
                for (KafkaTopicPartition partition : allPartitions) {
                    if (!restoredState.containsKey(partition)) {
                        restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
                    }
                }
    
                for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
                    if (!restoredFromOldState) {
                        // seed the partition discoverer with the union state while filtering out
                        // restored partitions that should not be subscribed by this subtask
                        if (KafkaTopicPartitionAssigner.assign(
                            restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
                                == getRuntimeContext().getIndexOfThisSubtask()){
                            subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                        }
                    } else {
                        // when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
                        // in this case, just use the restored state as the subscribed partitions
                        subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                    }
                }
    
                LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                    getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
            } else {
                // use the partition discoverer to fetch the initial seed partitions,
                // and set their initial offsets depending on the startup mode.
                // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
                // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
                // when the partition is actually read.
                switch (startupMode) {
                    case SPECIFIC_OFFSETS:
                        if (specificStartupOffsets == null) {
                            throw new IllegalStateException(
                                "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
                                    ", but no specific offsets were specified.");
                        }
    
                        for (KafkaTopicPartition seedPartition : allPartitions) {
                            Long specificOffset = specificStartupOffsets.get(seedPartition);
                            if (specificOffset != null) {
                                // since the specified offsets represent the next record to read, we subtract
                                // it by one so that the initial state of the consumer will be correct
                                subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
                            } else {
                                // default to group offset behaviour if the user-provided specific offsets
                                // do not contain a value for this partition
                                subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                            }
                        }
    
                        break;
                    case TIMESTAMP:
                        if (startupOffsetsTimestamp == null) {
                            throw new IllegalStateException(
                                "Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
                                    ", but no startup timestamp was specified.");
                        }
    
                        for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
                                : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
                            subscribedPartitionsToStartOffsets.put(
                                partitionToOffset.getKey(),
                                (partitionToOffset.getValue() == null)
                                        // if an offset cannot be retrieved for a partition with the given timestamp,
                                        // we default to using the latest offset for the partition
                                        ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                                        // since the specified offsets represent the next record to read, we subtract
                                        // it by one so that the initial state of the consumer will be correct
                                        : partitionToOffset.getValue() - 1);
                        }
    
                        break;
                    default:
                        for (KafkaTopicPartition seedPartition : allPartitions) {
                            subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
                        }
                }
    
                if (!subscribedPartitionsToStartOffsets.isEmpty()) {
                    switch (startupMode) {
                        case EARLIEST:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                            break;
                        case LATEST:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                            break;
                        case TIMESTAMP:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                startupOffsetsTimestamp,
                                subscribedPartitionsToStartOffsets.keySet());
                            break;
                        case SPECIFIC_OFFSETS:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                specificStartupOffsets,
                                subscribedPartitionsToStartOffsets.keySet());
    
                            List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                                if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                                    partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                                }
                            }
    
                            if (partitionsDefaultedToGroupOffsets.size() > 0) {
                                LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
                                        "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                                    getRuntimeContext().getIndexOfThisSubtask(),
                                    partitionsDefaultedToGroupOffsets.size(),
                                    partitionsDefaultedToGroupOffsets);
                            }
                            break;
                        default:
                        case GROUP_OFFSETS:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                    }
                } else {
                    LOG.info("Consumer subtask {} initially has no partitions to read from.",
                        getRuntimeContext().getIndexOfThisSubtask());
                }
            }
        }
    

    OffsetCommitMode

    1. If getRuntimeContext().isCheckpointingEnabled() is true and enableCommitOnCheckpoints is true, the mode is ON_CHECKPOINTS, i.e. offsets are committed when a Flink checkpoint completes;
    2. If checkpointing is disabled but enableAutoCommit is true, the mode is KAFKA_PERIODIC, i.e. the KafkaConsumer's own periodic auto-commit mechanism is used;
    3. Otherwise the mode is DISABLED, i.e. no offsets are committed at all.
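
    Condensed into code, the decision roughly amounts to this (a sketch of the rules above, not the literal OffsetCommitModes source):

        static OffsetCommitMode fromConfiguration(
                boolean enableAutoCommit, boolean commitOnCheckpoints, boolean checkpointingEnabled) {
            if (checkpointingEnabled) {
                // commit on completed checkpoints, or not at all
                return commitOnCheckpoints ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
            }
            // no checkpointing: fall back to Kafka's own periodic auto-commit if it is enabled
            return enableAutoCommit ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
        }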

    Kafka010PartitionDiscoverer

    1. Uses the KafkaTopicsDescriptor and the Kafka API to find the TopicPartitions that should be consumed;
    2. Uses numParallelSubtasks and indexOfThisSubtask to select the TopicPartitions this subtask is responsible for, via setAndCheckDiscoveredPartition:

        public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
            if (isUndiscoveredPartition(partition)) {
                discoveredPartitions.add(partition);
    
                return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
            }
    
            return false;
        }
    

    Remember that this is a ParallelSourceFunction, so multiple instances run in parallel and the TopicPartitions have to be divided among them.
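
    The assignment itself is deterministic: each partition maps to exactly one subtask index. Roughly (a sketch of KafkaTopicPartitionAssigner.assign; details may differ between Flink versions), the topic name picks a start index and the partitions are then distributed round-robin:

        public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
            // the topic hash only shifts the starting point, so partitions of one topic
            // end up spread round-robin across the parallel subtasks
            int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
            return (startIndex + partition.getPartition()) % numParallelSubtasks;
        }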

    StartOffsets

    If state was restored from a checkpoint, subscribedPartitionsToStartOffsets is populated from the checkpointed offsets; partitions that are new since the checkpoint are set to EARLIEST_OFFSET.

    If no state was restored, subscribedPartitionsToStartOffsets is populated according to startupMode.

    startupMode defaults to GROUP_OFFSETS, i.e. consumption continues from the offsets committed by the Kafka consumer group.
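
    The startup mode is chosen on the consumer before it is added to the environment. A usage sketch (the setter names are from FlinkKafkaConsumerBase; the specific-offsets map and its values are hypothetical):

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

        consumer.setStartFromGroupOffsets();   // default: resume from the group's committed offsets
        // consumer.setStartFromEarliest();    // read every partition from the beginning
        // consumer.setStartFromLatest();      // read only records produced after the job starts

        // or start from explicit per-partition offsets
        Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
        specificOffsets.put(new KafkaTopicPartition("my-topic", 0), 23L);
        // consumer.setStartFromSpecificOffsets(specificOffsets);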

    4. Running FlinkKafkaConsumer010

    The entry point for execution is SourceFunction's run method.

    The concrete implementation is in FlinkKafkaConsumerBase:

        public void run(SourceContext<T> sourceContext) throws Exception {
            if (subscribedPartitionsToStartOffsets == null) {
                throw new Exception("The partitions were not set for the consumer");
            }
    
            // initialize commit metrics and default offset callback method
            this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
            this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
    
            this.offsetCommitCallback = new KafkaCommitCallback() {
                @Override
                public void onSuccess() {
                    successfulCommits.inc();
                }
    
                @Override
                public void onException(Throwable cause) {
                    LOG.warn("Async Kafka commit failed.", cause);
                    failedCommits.inc();
                }
            };
    
            // mark the subtask as temporarily idle if there are no initial seed partitions;
            // once this subtask discovers some partitions and starts collecting records, the subtask's
            // status will automatically be triggered back to be active.
            if (subscribedPartitionsToStartOffsets.isEmpty()) {
                sourceContext.markAsTemporarilyIdle();
            }
    
            // from this point forward:
            //   - 'snapshotState' will draw offsets from the fetcher,
            //     instead of being built from `subscribedPartitionsToStartOffsets`
            //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
            //     Kafka through the fetcher, if configured to do so)
            this.kafkaFetcher = createFetcher(
                    sourceContext,
                    subscribedPartitionsToStartOffsets,
                    periodicWatermarkAssigner,
                    punctuatedWatermarkAssigner,
                    (StreamingRuntimeContext) getRuntimeContext(),
                    offsetCommitMode,
                    getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                    useMetrics);
    
            if (!running) {
                return;
            }
    
            // depending on whether we were restored with the current state version (1.3),
            // remaining logic branches off into 2 paths:
            //  1) New state - partition discovery loop executed as separate thread, with this
            //                 thread running the main fetcher loop
            //  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
    
            if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
                final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
                this.discoveryLoopThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // --------------------- partition discovery loop ---------------------
    
                            List<KafkaTopicPartition> discoveredPartitions;
    
                            // throughout the loop, we always eagerly check if we are still running before
                            // performing the next operation, so that we can escape the loop as soon as possible
    
                            while (running) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
                                }
    
                                try {
                                    discoveredPartitions = partitionDiscoverer.discoverPartitions();
                                } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
                                    // the partition discoverer may have been closed or woken up before or during the discovery;
                                    // this would only happen if the consumer was canceled; simply escape the loop
                                    break;
                                }
    
                                // no need to add the discovered partitions if we were closed during the meantime
                                if (running && !discoveredPartitions.isEmpty()) {
                                    kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                                }
    
                                // do not waste any time sleeping if we're not running anymore
                                if (running && discoveryIntervalMillis != 0) {
                                    try {
                                        Thread.sleep(discoveryIntervalMillis);
                                    } catch (InterruptedException iex) {
                                        // may be interrupted if the consumer was canceled midway; simply escape the loop
                                        break;
                                    }
                                }
                            }
                        } catch (Exception e) {
                            discoveryLoopErrorRef.set(e);
                        } finally {
                            // calling cancel will also let the fetcher loop escape
                            // (if not running, cancel() was already called)
                            if (running) {
                                cancel();
                            }
                        }
                    }
                }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
    
                discoveryLoopThread.start();
                kafkaFetcher.runFetchLoop();
    
                // --------------------------------------------------------------------
    
                // make sure that the partition discoverer is properly closed
                partitionDiscoverer.close();
                discoveryLoopThread.join();
    
                // rethrow any fetcher errors
                final Exception discoveryLoopError = discoveryLoopErrorRef.get();
                if (discoveryLoopError != null) {
                    throw new RuntimeException(discoveryLoopError);
                }
            } else {
                // won't be using the discoverer
                partitionDiscoverer.close();
    
                kafkaFetcher.runFetchLoop();
            }
        }
    

    a. Constructing Kafka010Fetcher

    The AbstractFetcher is created through the abstract createFetcher method; the concrete implementation lives in the subclass FlinkKafkaConsumer010.

    Three components are key here: unassignedPartitionsQueue, Handover, and consumerThread.

    unassignedPartitionsQueue

    This is a ClosableBlockingQueue<KafkaTopicPartitionState<KPH>>. The TopicPartitions to consume are put into this queue at initialization, and if periodic partition discovery is enabled, newly discovered TopicPartitions are added to it later as well.

    Handover

    It can be thought of as a blocking queue of length one: it hands the records fetched by the consumerThread, or any exception that thread throws, over to the thread running Flink's fetch loop.
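
    Conceptually (a much simplified sketch, not the real Handover class, which additionally handles wake-ups and closing), it behaves like a single-slot exchange between the two threads:

        final class SimpleHandover {
            private final Object lock = new Object();
            private ConsumerRecords<byte[], byte[]> next;
            private Throwable error;

            // called by the consumer thread after each poll()
            void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
                synchronized (lock) {
                    while (next != null) {
                        lock.wait();          // only one batch may be in flight at a time
                    }
                    next = records;
                    lock.notifyAll();
                }
            }

            // called by the fetch loop; blocks until a batch or an error arrives
            ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
                synchronized (lock) {
                    while (next == null && error == null) {
                        lock.wait();
                    }
                    if (error != null) {
                        throw new Exception(error); // re-throw consumer thread failures here
                    }
                    ConsumerRecords<byte[], byte[]> records = next;
                    next = null;
                    lock.notifyAll();
                    return records;
                }
            }

            // called by the consumer thread if poll() fails
            void reportError(Throwable t) {
                synchronized (lock) {
                    error = t;
                    lock.notifyAll();
                }
            }
        }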

    KafkaConsumerThread

    1. Encapsulates the Kafka consumption logic; with the help of unassignedPartitionsQueue it can pick up newly added TopicPartitions dynamically.
    2. Encapsulates the offset commit logic: if the commit mode is OffsetCommitMode.ON_CHECKPOINTS, offsets are committed from the CheckpointListener callback, and the nextOffsetsToCommit data structure is used to pass them between threads, as sketched below.
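
    A simplified sketch of that handoff (not the actual KafkaConsumerThread code): the checkpoint callback publishes the offsets into an AtomicReference, and the consumer thread picks them up on its next loop iteration and commits them asynchronously; the real code additionally handles the WakeupException raised in poll():

        final class OffsetCommitter {
            private final KafkaConsumer<byte[], byte[]> consumer;
            private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit =
                    new AtomicReference<>();

            OffsetCommitter(KafkaConsumer<byte[], byte[]> consumer) {
                this.consumer = consumer;
            }

            // checkpoint side: called from notifyCheckpointComplete
            void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
                nextOffsetsToCommit.set(offsets);
                consumer.wakeup(); // break out of a blocking poll so the commit happens promptly
            }

            // consumer thread side: called once per poll-loop iteration
            void maybeCommit(OffsetCommitCallback callback) {
                Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
                if (toCommit != null) {
                    consumer.commitAsync(toCommit, callback);
                }
            }
        }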

    b. Running Kafka010Fetcher

    It executes the runFetchLoop method:

        public void runFetchLoop() throws Exception {
            try {
                final Handover handover = this.handover;
    
                // kick off the actual Kafka consumer
                consumerThread.start();
    
                while (running) {
                    // this blocks until we get the next records
                    // it automatically re-throws exceptions encountered in the consumer thread
                    final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
    
                    // get the records for each topic partition
                    for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
    
                        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                                records.records(partition.getKafkaPartitionHandle());
    
                        for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                            final T value = deserializer.deserialize(
                                    record.key(), record.value(),
                                    record.topic(), record.partition(), record.offset());
    
                            if (deserializer.isEndOfStream(value)) {
                                // end of stream signaled
                                running = false;
                                break;
                            }
    
                            // emit the actual record. this also updates offset state atomically
                            // and deals with timestamps and watermark generation
                            emitRecord(value, partition, record.offset(), record);
                        }
                    }
                }
            }
            finally {
                // this signals the consumer thread that no more work is to be done
                consumerThread.shutdown();
            }
    
            // on a clean exit, wait for the runner thread
            try {
                consumerThread.join();
            }
            catch (InterruptedException e) {
                // may be the result of a wake-up interruption after an exception.
                // we ignore this here and only restore the interruption state
                Thread.currentThread().interrupt();
            }
        }
    

    ConsumerRecords are taken from the handover, run through the deserializer, and then emitted.
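
    The deserializer used here is the user-supplied KeyedDeserializationSchema. A minimal, hypothetical implementation (not connector code) that ignores the key and decodes the value as a UTF-8 string might look like this:

        public class StringValueSchema implements KeyedDeserializationSchema<String> {

            @Override
            public String deserialize(byte[] messageKey, byte[] message,
                                      String topic, int partition, long offset) throws IOException {
                // the key, topic, partition and offset are all available here if needed
                return message == null ? null : new String(message, StandardCharsets.UTF_8);
            }

            @Override
            public boolean isEndOfStream(String nextElement) {
                // returning true here would stop the fetch loop (see the check above)
                return false;
            }

            @Override
            public TypeInformation<String> getProducedType() {
                return TypeInformation.of(String.class);
            }
        }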
