Flink Source Code: KafkaSource

Author: AlienPaul | Published 2022-09-20 15:22

    Flink Source Code Analysis Series: Table of Contents

    See: Flink Source Code Analysis Series: Table of Contents

    Preface

    FLIP-27: Refactor Source Interface - Apache Flink - Apache Software Foundation proposed a new Source architecture; for an analysis of it, see Flink Source Code: The New Source Architecture. On top of this architecture, the Flink community released a new Kafka connector, KafkaSource. The old implementation, FlinkKafkaConsumer, is now marked Deprecated and no longer recommended. This article walks through the KafkaSource source code.

    The analysis covers four parts:

    • KafkaSource creation
    • Data reading
    • Partition discovery
    • Checkpointing

    KafkaSource Creation

    As shown in the official documentation, a Flink application that consumes Kafka can create a KafkaSource as follows:

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    

    env.fromSource produces a DataStreamSource. The DataStreamSource corresponds to a SourceTransformation, which SourceTransformationTranslator translates into a Source node of the StreamGraph. At execution time this node corresponds to a SourceOperator, the operator of the new Source API. It interacts with the SourceReader directly, calling sourceReader.pollNext to pull data. This chain of logic has little to do with Kafka itself, so we will not expand on it; knowing it exists is enough.

    Finally, KafkaSourceBuilder returns a KafkaSource object matching the parameters we configured.

    public KafkaSource<OUT> build() {
        sanityCheck();                    // validate the configured options
        parseAndSetRequiredProperties();  // fill in required/derived Kafka properties
        return new KafkaSource<>(
                subscriber,
                startingOffsetsInitializer,
                stoppingOffsetsInitializer,
                boundedness,
                deserializationSchema,
                props);
    }
    

    KafkaSource's createReader method creates the KafkaSourceReader. The code is as follows:

    @Internal
    @Override
    public SourceReader<OUT, KafkaPartitionSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        return createReader(readerContext, (ignore) -> {});
    }
    
    @VisibleForTesting
    SourceReader<OUT, KafkaPartitionSplit> createReader(
            SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook)
            throws Exception {
        // elementsQueue buffers the ConsumerRecords fetched by the fetcher;
        // the reader consumes the cached Kafka messages from this queue
        FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
                elementsQueue = new FutureCompletingBlockingQueue<>();
        // Initialize the deserializationSchema
        deserializationSchema.open(
                new DeserializationSchema.InitializationContext() {
                    @Override
                    public MetricGroup getMetricGroup() {
                        return readerContext.metricGroup().addGroup("deserializer");
                    }
    
                    @Override
                    public UserCodeClassLoader getUserCodeClassLoader() {
                        return readerContext.getUserCodeClassLoader();
                    }
                });
        // Create the Kafka source reader metrics
        final KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
                new KafkaSourceReaderMetrics(readerContext.metricGroup());
    
        // Create a factory for KafkaPartitionSplitReader, which reads Kafka messages partition by partition
        Supplier<KafkaPartitionSplitReader> splitReaderSupplier =
                () -> new KafkaPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics);
        KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
    
        return new KafkaSourceReader<>(
                elementsQueue,
                new KafkaSourceFetcherManager(
                        elementsQueue, splitReaderSupplier::get, splitFinishedHook),
                recordEmitter,
                toConfiguration(props),
                readerContext,
                kafkaSourceReaderMetrics);
    }
    
    

    Data Reading Flow

    KafkaSourceFetcherManager extends SingleThreadFetcherManager. When splits are discovered, it fetches a running SplitFetcher and assigns the splits to it; if no fetcher is running, it creates a new one.

    @Override
    // Called when new splits are discovered;
    // assigns the splits to a fetcher
    public void addSplits(List<SplitT> splitsToAdd) {
        SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
        if (fetcher == null) {
            fetcher = createSplitFetcher();
            // Add the splits to the fetchers.
            fetcher.addSplits(splitsToAdd);
            startFetcher(fetcher);
        } else {
            fetcher.addSplits(splitsToAdd);
        }
    }
    

    Next, let's analyze how the fetcher pulls data. The startFetcher call above starts the SplitFetcher thread:

    protected void startFetcher(SplitFetcher<E, SplitT> fetcher) {
        executors.submit(fetcher);
    }
    

    SplitFetcher runs the tasks that pull data from the external system; it keeps executing SplitFetcherTasks in a loop (the task contract is sketched after the list below). SplitFetcherTask has several subclasses:

    • AddSplitTask: assigns splits to the reader
    • PauseOrResumeSplitsTask: pauses or resumes reading of splits
    • FetchTask: pulls data into the elementsQueue
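
    For reference, the task contract is tiny. Below is a simplified sketch of the SplitFetcherTask interface, matching how it is used later in this article (treat the details as an approximation of the actual interface): a task's run is invoked repeatedly until it reports completion, and it can be woken up.

    public interface SplitFetcherTask {
        // Return true when the task has finished; false means "call run() again".
        boolean run() throws IOException;

        // Interrupt a blocking run(), e.g. for shutdown or re-prioritization.
        void wakeUp();
    }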

    Next, let's look at SplitFetcher's run method:

    @Override
    public void run() {
        LOG.info("Starting split fetcher {}", id);
        try {
            // Execute runOnce in a loop
            while (runOnce()) {
                // nothing to do, everything is inside #runOnce.
            }
        } catch (Throwable t) {
            errorHandler.accept(t);
        } finally {
            try {
                splitReader.close();
            } catch (Exception e) {
                errorHandler.accept(e);
            } finally {
                LOG.info("Split fetcher {} exited.", id);
                // This executes after a possible errorHandler.accept(t). If these operations bear
                // a happens-before relation, then we can check the side effect of
                // errorHandler.accept(t) to know whether it happened after observing the side
                // effect of shutdownHook.run().
                shutdownHook.run();
            }
        }
    }
    
    boolean runOnce() {
        // first blocking call = get next task. blocks only if there are no active splits and queued
        // tasks.
        SplitFetcherTask task;
        lock.lock();
        try {
            if (closed) {
                return false;
            }
    
            // The key logic is here:
            // take a task from the taskQueue.
            // If the queue has backed-up tasks, run those first;
            // if the taskQueue is empty, check whether any splits are assigned and, if so,
            // return the FetchTask, which was created in the SplitFetcher constructor
            task = getNextTaskUnsafe();
            if (task == null) {
                // (spurious) wakeup, so just repeat
                return true;
            }
    
            LOG.debug("Prepare to run {}", task);
            // store task for #wakeUp
            this.runningTask = task;
        } finally {
            lock.unlock();
        }
    
        // execute the task outside of lock, so that it can be woken up
        boolean taskFinished;
        try {
            // Execute the task's run method
            taskFinished = task.run();
        } catch (Exception e) {
            throw new RuntimeException(
                    String.format(
                            "SplitFetcher thread %d received unexpected exception while polling the records",
                            id),
                    e);
        }
    
        // re-acquire lock as all post-processing steps, need it
        lock.lock();
        try {
            this.runningTask = null;
            processTaskResultUnsafe(task, taskFinished);
        } finally {
            lock.unlock();
        }
        return true;
    }
    

    The SplitFetcherTask subclass that pulls data is FetchTask. Its run method is shown below:

    @Override
    public boolean run() throws IOException {
        try {
            // Skip this round if the task has been woken up
            if (!isWakenUp() && lastRecords == null) {
                // Use the splitReader to fetch a batch of records from the splits
                lastRecords = splitReader.fetch();
            }
    
            if (!isWakenUp()) {
                // The order matters here. We must first put the last records into the queue.
                // This ensures the handling of the fetched records is atomic to wakeup.
                // Put the fetched records into the elementsQueue
                if (elementsQueue.put(fetcherIndex, lastRecords)) {
                    // If some splits have been fully consumed
                    if (!lastRecords.finishedSplits().isEmpty()) {
                        // The callback does not throw InterruptedException.
                        // Invoke the split-finished callback
                        splitFinishedCallback.accept(lastRecords.finishedSplits());
                    }
                    lastRecords = null;
                }
            }
        } catch (InterruptedException e) {
            // this should only happen on shutdown
            throw new IOException("Source fetch execution was interrupted", e);
        } finally {
            // clean up the potential wakeup effect. It is possible that the fetcher is waken up
            // after the clean up. In that case, either the wakeup flag will be set or the
            // running thread will be interrupted. The next invocation of run() will see that and
            // just skip.
            if (isWakenUp()) {
                wakeup = false;
            }
        }
        // The return value of fetch task does not matter.
        return true;
    }
    

    splitReader.fetch() in the snippet above corresponds to KafkaPartitionSplitReader's fetch method.

    @Override
    public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {
        ConsumerRecords<byte[], byte[]> consumerRecords;
        try {
            // Poll a batch of messages from the KafkaConsumer; the timeout is 10 s
            consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
        } catch (WakeupException | IllegalStateException e) {
            // IllegalStateException will be thrown if the consumer is not assigned any partitions.
            // This happens if all assigned partitions are invalid or empty (starting offset >=
            // stopping offset). We just mark empty partitions as finished and return an empty
            // record container, and this consumer will be closed by SplitFetcherManager.
            // As the comment above says, IllegalStateException is thrown if the consumer has no
            // partitions assigned. This also happens when all assigned partitions are invalid or
            // empty (starting offset >= stopping offset). Return an empty KafkaPartitionSplitRecords,
            // mark the empty partitions as finished, and let the SplitFetcherManager close this
            // consumer later.
            KafkaPartitionSplitRecords recordsBySplits =
                    new KafkaPartitionSplitRecords(
                            ConsumerRecords.empty(), kafkaSourceReaderMetrics);
            markEmptySplitsAsFinished(recordsBySplits);
            return recordsBySplits;
        }
        // Wrap the consumerRecords into a KafkaPartitionSplitRecords and return it;
        // KafkaPartitionSplitRecords holds two iterators: one over partitions, one over records
        KafkaPartitionSplitRecords recordsBySplits =
                new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics);
        List<TopicPartition> finishedPartitions = new ArrayList<>();
        // Iterate over the partitions in consumerRecords
        for (TopicPartition tp : consumerRecords.partitions()) {
            // Get the stopping offset of this partition
            long stoppingOffset = getStoppingOffset(tp);
            // Get all records read from this partition
            final List<ConsumerRecord<byte[], byte[]>> recordsFromPartition =
                    consumerRecords.records(tp);
    
            // If any records were read
            if (recordsFromPartition.size() > 0) {
                // Get the last record read from this partition
                final ConsumerRecord<byte[], byte[]> lastRecord =
                        recordsFromPartition.get(recordsFromPartition.size() - 1);
    
                // After processing a record with offset of "stoppingOffset - 1", the split reader
                // should not continue fetching because the record with stoppingOffset may not
                // exist. Keep polling will just block forever.
                // If the offset of the last record is >= stoppingOffset - 1
                // (the stopping offset is obtained e.g. via the consumer's endOffsets method),
                // record the stopping offset in recordsBySplits
                // and mark this split as finished
                if (lastRecord.offset() >= stoppingOffset - 1) {
                    recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
                    finishSplitAtRecord(
                            tp,
                            stoppingOffset,
                            lastRecord.offset(),
                            finishedPartitions,
                            recordsBySplits);
                }
            }
            // Track this partition's record lag if it never appears before
            // Register the records-lag metric for this partition
            kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
        }
    
        // Mark empty splits as finished
        markEmptySplitsAsFinished(recordsBySplits);
    
        // Unassign the partitions that has finished.
        // Stop tracking record lag for the finished partitions
        // and unassign them from the consumer
        if (!finishedPartitions.isEmpty()) {
            finishedPartitions.forEach(kafkaSourceReaderMetrics::removeRecordsLagMetric);
            unassignPartitions(finishedPartitions);
        }
    
        // Update numBytesIn
        // Update the numBytesIn metric
        kafkaSourceReaderMetrics.updateNumBytesInCounter();
    
        return recordsBySplits;
    }
    

    Up to this point we have covered the path from polling messages with the KafkaConsumer to placing them into the elementsQueue. Next comes the Flink-internal process of taking the data out of the elementsQueue and emitting it downstream.

    SourceReaderBase reads the data out of the elementsQueue and hands it to the recordEmitter.

    SourceReaderBase's getNextFetch method is as follows:

    @Nullable
    private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> output) {
        splitFetcherManager.checkErrors();
    
        LOG.trace("Getting next source data batch from queue");
        // Take a batch of records from the elementsQueue
        final RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
        // If the current fetch has no data and there is no next split, return null
        if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
            // No element available, set to available later if needed.
            return null;
        }
    
        currentFetch = recordsWithSplitId;
        return recordsWithSplitId;
    }
    

    getNextFetch is invoked from pollNext. The SourceOperator keeps calling the reader's pollNext method, which pulls data and hands it to the recordEmitter for emission downstream.

    @Override
    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
        // make sure we have a fetch we are working on, or move to the next
        RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
        if (recordsWithSplitId == null) {
            recordsWithSplitId = getNextFetch(output);
            if (recordsWithSplitId == null) {
                return trace(finishedOrAvailableLater());
            }
        }
    
        // we need to loop here, because we may have to go across splits
        while (true) {
            // Process one record.
            final E record = recordsWithSplitId.nextRecordFromSplit();
            if (record != null) {
                // emit the record.
                numRecordsInCounter.inc(1);
                recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
                LOG.trace("Emitted record: {}", record);
    
                // We always emit MORE_AVAILABLE here, even though we do not strictly know whether
                // more is available. If nothing more is available, the next invocation will find
                // this out and return the correct status.
                // That means we emit the occasional 'false positive' for availability, but this
                // saves us doing checks for every record. Ultimately, this is cheaper.
                return trace(InputStatus.MORE_AVAILABLE);
            } else if (!moveToNextSplit(recordsWithSplitId, output)) {
                // The fetch is done and we just discovered that and have not emitted anything, yet.
                // We need to move to the next fetch. As a shortcut, we call pollNext() here again,
                // rather than emitting nothing and waiting for the caller to call us again.
                return pollNext(output);
            }
        }
    }
    

    Finally we arrive at KafkaRecordEmitter's emitRecord method. It deserializes the received Kafka messages one by one and emits them to the downstream output, which passes them on to the downstream operators.

    @Override
    public void emitRecord(
            ConsumerRecord<byte[], byte[]> consumerRecord,
            SourceOutput<T> output,
            KafkaPartitionSplitState splitState)
            throws Exception {
        try {
            sourceOutputWrapper.setSourceOutput(output);
            // Use the Kafka record's timestamp as the record timestamp
            sourceOutputWrapper.setTimestamp(consumerRecord.timestamp());
            // Deserialize and emit through the wrapper
            deserializationSchema.deserialize(consumerRecord, sourceOutputWrapper);
            // Advance the split state so that recovery resumes after this record
            splitState.setCurrentOffset(consumerRecord.offset() + 1);
        } catch (Exception e) {
            throw new IOException("Failed to deserialize consumer record due to", e);
        }
    }
    

    Partition Discovery

    Flink's KafkaSource can periodically scan Kafka partitions with a scheduled task according to the configured rule (a topic list, a topic regex, or directly specified partitions), thereby discovering partitions dynamically.
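
    The discovery interval is set via the property partition.discovery.interval.ms (the key behind KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS). A hedged sketch building on the earlier builder example (brokers is assumed to be defined; a value <= 0 means discovery runs only once at startup):

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        // Topic regex subscription: newly created matching topics are discovered too
        .setTopicPattern(Pattern.compile("input-topic-.*"))
        .setGroupId("my-group")
        // Look for new partitions every 30 seconds
        .setProperty("partition.discovery.interval.ms", "30000")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();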

    KafkaSourceEnumerator's start method creates the Kafka admin client, then uses partitionDiscoveryIntervalMs (the partition discovery interval) to decide whether the discovery logic should run periodically.

    @Override
    public void start() {
        // Create the Kafka admin client
        adminClient = getKafkaAdminClient();
        // If a partition discovery interval is configured
        if (partitionDiscoveryIntervalMs > 0) {
            LOG.info(
                    "Starting the KafkaSourceEnumerator for consumer group {} "
                            + "with partition discovery interval of {} ms.",
                    consumerGroupId,
                    partitionDiscoveryIntervalMs);
            // Periodically invoke getSubscribedTopicPartitions followed by checkPartitionChanges
            context.callAsync(
                    this::getSubscribedTopicPartitions,
                    this::checkPartitionChanges,
                    0,
                    partitionDiscoveryIntervalMs);
        } else {
            // Otherwise invoke them only once, at startup
            LOG.info(
                    "Starting the KafkaSourceEnumerator for consumer group {} "
                            + "without periodic partition discovery.",
                    consumerGroupId);
            context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges);
        }
    }
    

    The getSubscribedTopicPartitions method:

    private Set<TopicPartition> getSubscribedTopicPartitions() {
        return subscriber.getSubscribedTopicPartitions(adminClient);
    }
    

    This method delegates to the KafkaSubscriber, which resolves the subscribed partitions according to the configured rule.
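
    A simplified sketch of the KafkaSubscriber contract, as inferred from the call above: given an AdminClient, return the set of partitions currently matched by the subscription rule.

    public interface KafkaSubscriber extends Serializable {
        Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient);
    }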

    KafkaSubscriber has three subclasses, each corresponding to a different discovery rule (builder usage examples follow the list):

    • PartitionSetSubscriber: created via KafkaSourceBuilder's setPartitions method; subscribes directly to the named partitions.
    • TopicListSubscriber: resolves the subscribed partitions from a list of topics; this is the subscriber used when KafkaSourceBuilder's setTopics subscribes to one or more topics.
    • TopicPatternSubscriber: resolves the subscribed partitions by matching topic names against a regular expression; created when KafkaSourceBuilder's setTopicPattern method is used.
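
    As a hedged illustration (assuming a KafkaSourceBuilder<String> builder in scope), the three subscription modes map onto the builder as follows. They are mutually exclusive: only one subscriber may be set per builder.

    // 1. Explicit partitions -> PartitionSetSubscriber
    builder.setPartitions(new HashSet<>(Arrays.asList(
            new TopicPartition("topic-a", 0),
            new TopicPartition("topic-a", 1))));

    // 2. Topic list -> TopicListSubscriber
    builder.setTopics("topic-a", "topic-b");

    // 3. Topic regex -> TopicPatternSubscriber
    builder.setTopicPattern(Pattern.compile("topic-.*"));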

    Taking TopicListSubscriber as an example, let's analyze how the subscribed partitions are obtained.

    @Override
    public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
        LOG.debug("Fetching descriptions for topics: {}", topics);
        // Use the admin client to read the Kafka topic metadata,
        // which contains the partition info of the given topics
        final Map<String, TopicDescription> topicMetadata =
                getTopicMetadata(adminClient, new HashSet<>(topics));
    
        // Collect every TopicPartition into the subscribedPartitions set, then return it
        Set<TopicPartition> subscribedPartitions = new HashSet<>();
        for (TopicDescription topic : topicMetadata.values()) {
            for (TopicPartitionInfo partition : topic.partitions()) {
                subscribedPartitions.add(new TopicPartition(topic.name(), partition.partition()));
            }
        }
    
        return subscribedPartitions;
    }
    

    The logic for obtaining the subscribed partitions is not particularly complex; we skip the other two subscribers here.

    The return value of getSubscribedTopicPartitions, along with the exception if one was thrown, is passed to the checkPartitionChanges method, which detects whether the partition set has changed. The logic is as follows:

    private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwable t) {
        if (t != null) {
            throw new FlinkRuntimeException(
                    "Failed to list subscribed topic partitions due to ", t);
        }
        // Detect partition changes
        final PartitionChange partitionChange = getPartitionChange(fetchedPartitions);
        // If nothing changed, return immediately
        if (partitionChange.isEmpty()) {
            return;
        }
        // If a change was detected, invoke initializePartitionSplits and then handlePartitionSplitChanges
        context.callAsync(
                () -> initializePartitionSplits(partitionChange),
                this::handlePartitionSplitChanges);
    }
    
    @VisibleForTesting
    PartitionChange getPartitionChange(Set<TopicPartition> fetchedPartitions) {
        // Holds the partitions that were removed
        final Set<TopicPartition> removedPartitions = new HashSet<>();
        Consumer<TopicPartition> dedupOrMarkAsRemoved =
                (tp) -> {
                    if (!fetchedPartitions.remove(tp)) {
                        removedPartitions.add(tp);
                    }
                };
        // If a partition exists in assignedPartitions (the already-assigned partitions) but not in
        // fetchedPartitions, it has been removed; add it to removedPartitions
        assignedPartitions.forEach(dedupOrMarkAsRemoved);
        // pendingPartitionSplitAssignment holds partitions discovered in an earlier round that are
        // not yet assigned to any reader; check it for removed partitions as well
        pendingPartitionSplitAssignment.forEach(
                (reader, splits) ->
                        splits.forEach(
                                split -> dedupOrMarkAsRemoved.accept(split.getTopicPartition())));
    
        // Any partitions still left in fetchedPartitions are newly discovered
        if (!fetchedPartitions.isEmpty()) {
            LOG.info("Discovered new partitions: {}", fetchedPartitions);
        }
        if (!removedPartitions.isEmpty()) {
            LOG.info("Discovered removed partitions: {}", removedPartitions);
        }
    
        // Wrap the new and the removed partitions into a PartitionChange and return it
        return new PartitionChange(fetchedPartitions, removedPartitions);
    }
    

    Having compared the newly discovered partitions against the previously subscribed ones, the next step is to react to these changes.

    The initializePartitionSplits method wraps the partition changes into a PartitionSplitChange, which records the added and the removed partitions. Unlike PartitionChange, the added partitions in PartitionSplitChange are of type KafkaPartitionSplit, which additionally stores each partition's starting and stopping offsets.
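
    A simplified sketch of KafkaPartitionSplit's shape, inferred from its usage below (the sentinel constants are assumptions based on the Flink codebase):

    public class KafkaPartitionSplit implements SourceSplit {
        // Sentinels resolved to concrete offsets later by the reader
        public static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;
        public static final long LATEST_OFFSET = -1;
        public static final long EARLIEST_OFFSET = -2;
        public static final long COMMITTED_OFFSET = -3;

        private final TopicPartition topicPartition;
        private final long startingOffset;
        private final long stoppingOffset;

        public KafkaPartitionSplit(
                TopicPartition topicPartition, long startingOffset, long stoppingOffset) {
            this.topicPartition = topicPartition;
            this.startingOffset = startingOffset;
            this.stoppingOffset = stoppingOffset;
        }

        @Override
        public String splitId() {
            return topicPartition.toString();
        }
    }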

    private PartitionSplitChange initializePartitionSplits(PartitionChange partitionChange) {
        // The newly added partitions
        Set<TopicPartition> newPartitions =
                Collections.unmodifiableSet(partitionChange.getNewPartitions());
        // Obtain the partition offsets retriever
        OffsetsInitializer.PartitionOffsetsRetriever offsetsRetriever = getOffsetsRetriever();
    
        // Resolve the starting offsets
        Map<TopicPartition, Long> startingOffsets =
                startingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever);
        // Resolve the stopping offsets
        Map<TopicPartition, Long> stoppingOffsets =
                stoppingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever);
    
        Set<KafkaPartitionSplit> partitionSplits = new HashSet<>(newPartitions.size());
        // Wrap each partition together with its starting and stopping offsets
        for (TopicPartition tp : newPartitions) {
            Long startingOffset = startingOffsets.get(tp);
            long stoppingOffset =
                    stoppingOffsets.getOrDefault(tp, KafkaPartitionSplit.NO_STOPPING_OFFSET);
            partitionSplits.add(new KafkaPartitionSplit(tp, startingOffset, stoppingOffset));
        }
        // Return the result
        return new PartitionSplitChange(partitionSplits, partitionChange.getRemovedPartitions());
    }
    

    The key logic of this method is resolving each partition's starting offset (via startingOffsetInitializer) and stopping offset (via stoppingOffsetInitializer).

    startingOffsetInitializer is created in KafkaSourceBuilder and defaults to OffsetsInitializer.earliest(). The code is as follows:

    static OffsetsInitializer earliest() {
        return new ReaderHandledOffsetsInitializer(
                KafkaPartitionSplit.EARLIEST_OFFSET, OffsetResetStrategy.EARLIEST);
    }
    

    It creates a ReaderHandledOffsetsInitializer object, meaning that every newly discovered topic is read from its very beginning.

    ReaderHandledOffsetsInitializer's getPartitionOffsets method is shown below. It sets every partition's offset to startingOffset, which in the scenario above is KafkaPartitionSplit.EARLIEST_OFFSET.

    @Override
    public Map<TopicPartition, Long> getPartitionOffsets(
            Collection<TopicPartition> partitions,
            PartitionOffsetsRetriever partitionOffsetsRetriever) {
        Map<TopicPartition, Long> initialOffsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            initialOffsets.put(tp, startingOffset);
        }
        return initialOffsets;
    }
    

    As for stoppingOffsetInitializer, KafkaSourceBuilder creates a NoStoppingOffsetsInitializer by default. It means there is no stopping offset, which suits unbounded Kafka streams. Its code is trivial, so we do not analyze it here.
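
    Both defaults can be overridden on the builder. A hedged sketch using initializers provided by OffsetsInitializer:

    // Start from the consumer group's committed offsets, falling back to
    // earliest where no committed offset exists
    builder.setStartingOffsets(
            OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));

    // Make the source bounded: stop at the latest offsets observed at startup
    builder.setBounded(OffsetsInitializer.latest());

    // Or keep the source unbounded but still stop at offsets resolved from a timestamp
    // builder.setUnbounded(OffsetsInitializer.timestamp(1663632000000L));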

    Back to handlePartitionSplitChanges, the method that reacts to partition changes. It assigns the newly discovered splits to the pending and registered readers.

    private void handlePartitionSplitChanges(
            PartitionSplitChange partitionSplitChange, Throwable t) {
        if (t != null) {
            throw new FlinkRuntimeException("Failed to initialize partition splits due to ", t);
        }
        if (partitionDiscoveryIntervalMs < 0) {
            LOG.debug("Partition discovery is disabled.");
            noMoreNewPartitionSplits = true;
        }
        // TODO: Handle removed partitions.
        addPartitionSplitChangeToPendingAssignments(partitionSplitChange.newPartitionSplits);
        assignPendingPartitionSplits(context.registeredReaders().keySet());
    }
    

    addPartitionSplitChangeToPendingAssignments adds the partitions to the pending set.

    private void addPartitionSplitChangeToPendingAssignments(
            Collection<KafkaPartitionSplit> newPartitionSplits) {
        int numReaders = context.currentParallelism();
        for (KafkaPartitionSplit split : newPartitionSplits) {
            // Distribute these partitions evenly across all readers (getSplitOwner is sketched below)
            int ownerReader = getSplitOwner(split.getTopicPartition(), numReaders);
            pendingPartitionSplitAssignment
                    .computeIfAbsent(ownerReader, r -> new HashSet<>())
                    .add(split);
        }
        LOG.debug(
                "Assigned {} to {} readers of consumer group {}.",
                newPartitionSplits,
                numReaders,
                consumerGroupId);
    }
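
    For reference, a sketch of how getSplitOwner spreads partitions across readers (reproduced from the Flink codebase from memory, so treat the details as an approximation): the owner is computed round-robin from a topic-hash-dependent start index, so the partitions of different topics do not all pile onto reader 0.

    static int getSplitOwner(TopicPartition tp, int numReaders) {
        // Topic-dependent start index
        int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        // Partitions of the same topic go to consecutive readers
        return (startIndex + tp.partition()) % numReaders;
    }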
    

    The assignPendingPartitionSplits method assigns the splits to readers. Its logic:

    private void assignPendingPartitionSplits(Set<Integer> pendingReaders) {
        Map<Integer, List<KafkaPartitionSplit>> incrementalAssignment = new HashMap<>();
    
        // Check if there's any pending splits for given readers
        for (int pendingReader : pendingReaders) {
            // Check that the reader has registered with the SourceCoordinator
            checkReaderRegistered(pendingReader);
    
            // Remove pending assignment for the reader
            // Take all splits assigned to this reader and remove them from pendingPartitionSplitAssignment
            final Set<KafkaPartitionSplit> pendingAssignmentForReader =
                    pendingPartitionSplitAssignment.remove(pendingReader);
    
            // If this reader has pending splits, add them to incrementalAssignment
            if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
                // Put pending assignment into incremental assignment
                incrementalAssignment
                        .computeIfAbsent(pendingReader, (ignored) -> new ArrayList<>())
                        .addAll(pendingAssignmentForReader);
    
                // Mark pending partitions as already assigned
                // Mark these partitions as assigned
                pendingAssignmentForReader.forEach(
                        split -> assignedPartitions.add(split.getTopicPartition()));
            }
        }
    
        // Assign pending splits to readers
        // Hand the splits over to the readers
        if (!incrementalAssignment.isEmpty()) {
            LOG.info("Assigning splits to readers {}", incrementalAssignment);
            context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
        }
    
        // If periodically partition discovery is disabled and the initializing discovery has done,
        // signal NoMoreSplitsEvent to pending readers
        // If no more new splits will arrive (partition discovery is disabled) and the source is
        // bounded, send the no-more-splits signal (signalNoMoreSplits) to the pending readers
        if (noMoreNewPartitionSplits && boundedness == Boundedness.BOUNDED) {
            LOG.debug(
                    "No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {}"
                            + " in consumer group {}.",
                    pendingReaders,
                    consumerGroupId);
            pendingReaders.forEach(context::signalNoMoreSplits);
        }
    }
    

    assignPendingPartitionSplits is called from three places:

    • addSplitsBack: a reader failed, so the splits assigned to it since the last successful checkpoint must be added back to the SplitEnumerator.
    • addReader: a new reader was added and needs splits assigned to it.
    • handlePartitionSplitChanges: as described above, newly discovered partitions must be assigned to readers.

    The next question is how the SplitEnumerator actually hands these splits out. Let's expand the context.assignSplits call; the context implementation class here is SourceCoordinatorContext. Its assignSplits method:

    @Override
    public void assignSplits(SplitsAssignment<SplitT> assignment) {
        // Ensure the split assignment is done by the coordinator executor.
        // Runs in the SourceCoordinator thread
        callInCoordinatorThread(
                () -> {
                    // Ensure all the subtasks in the assignment have registered.
                    // Check, one by one, that the readers receiving splits have registered
                    assignment
                            .assignment()
                            .forEach(
                                    (id, splits) -> {
                                        if (!registeredReaders.containsKey(id)) {
                                            throw new IllegalArgumentException(
                                                    String.format(
                                                            "Cannot assign splits %s to subtask %d because the subtask is not registered.",
                                                            splits, id));
                                        }
                                    });
    
                    // Record the assignment (added to the set of assignments not yet checkpointed)
                    assignmentTracker.recordSplitAssignment(assignment);
                    // Assign the splits
                    assignSplitsToAttempts(assignment);
                    return null;
                },
                String.format("Failed to assign splits %s due to ", assignment));
    }
    

    assignSplitsToAttempts has several overloads. Following them to the end, it creates an AddSplitEvent object and sends this event to the SourceOperator through the OperatorCoordinator. The code is shown below:

    private void assignSplitsToAttempts(SplitsAssignment<SplitT> assignment) {
        assignment.assignment().forEach((index, splits) -> assignSplitsToAttempts(index, splits));
    }
    
    private void assignSplitsToAttempts(int subtaskIndex, List<SplitT> splits) {
        getRegisteredAttempts(subtaskIndex)
                .forEach(attempt -> assignSplitsToAttempt(subtaskIndex, attempt, splits));
    }
    
    private void assignSplitsToAttempt(int subtaskIndex, int attemptNumber, List<SplitT> splits) {
        if (splits.isEmpty()) {
            return;
        }
    
        checkAttemptReaderReady(subtaskIndex, attemptNumber);
    
        final AddSplitEvent<SplitT> addSplitEvent;
        try {
            // Create the AddSplitEvent (the add-splits event)
            addSplitEvent = new AddSplitEvent<>(splits, splitSerializer);
        } catch (IOException e) {
            throw new FlinkRuntimeException("Failed to serialize splits.", e);
        }
    
        final OperatorCoordinator.SubtaskGateway gateway =
                subtaskGateways.getGatewayAndCheckReady(subtaskIndex, attemptNumber);
        // Send the event to the SourceOperator of this subtaskIndex
        gateway.sendEvent(addSplitEvent);
    }
    

    gateway.sendEvent() -> SourceOperator::handleOperatorEvent

    We will not analyze the network communication in between. Let's look at handleOperatorEvent, the method with which SourceOperator receives the event:

    public void handleOperatorEvent(OperatorEvent event) {
        if (event instanceof WatermarkAlignmentEvent) {
            updateMaxDesiredWatermark((WatermarkAlignmentEvent) event);
            checkWatermarkAlignment();
            checkSplitWatermarkAlignment();
        } else if (event instanceof AddSplitEvent) {
            handleAddSplitsEvent(((AddSplitEvent<SplitT>) event));
        } else if (event instanceof SourceEventWrapper) {
            sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());
        } else if (event instanceof NoMoreSplitsEvent) {
            sourceReader.notifyNoMoreSplits();
        } else {
            throw new IllegalStateException("Received unexpected operator event " + event);
        }
    }
    

    If the received event is of type AddSplitEvent, handleAddSplitsEvent is called. Its analysis:

    private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
        try {
            // Deserialize the splits from the event
            List<SplitT> newSplits = event.splits(splitSerializer);
            numSplits += newSplits.size();
            // If the main output is not initialized yet, buffer the splits in the pending list;
            // otherwise create outputs for the new splits right away
            if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {
                // For splits arrived before the main output is initialized, store them into the
                // pending list. Outputs of these splits will be created once the main output is
                // ready.
                outputPendingSplits.addAll(newSplits);
            } else {
                // Create output directly for new splits if the main output is already initialized.
                createOutputForSplits(newSplits);
            }
            // Hand the splits to the sourceReader
            sourceReader.addSplits(newSplits);
        } catch (IOException e) {
            throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
        }
    }
    

    Finally we trace our way to SourceReaderBase's addSplits method.

    @Override
    public void addSplits(List<SplitT> splits) {
        LOG.info("Adding split(s) to reader: {}", splits);
        // Initialize the state for each split.
        splits.forEach(
                s ->
                        splitStates.put(
                                s.splitId(), new SplitContext<>(s.splitId(), initializedState(s))));
        // Hand over the splits to the split fetcher to start fetch.
        splitFetcherManager.addSplits(splits);
    }
    

    It hands the splits to the splitFetcherManager. In this article's KafkaSource setting, the implementation class is KafkaSourceFetcherManager, whose addSplits method lives in the parent class SingleThreadFetcherManager.

    This brings us back to the addSplits method at the start of the "Data Reading Flow" section above. The analysis of KafkaSource partition discovery ends here.

    Checkpoint Logic

    KafkaSourceReader's snapshotState method returns the splits that need to be checkpointed, i.e. the splits assigned to this reader. If the user configured commit.offsets.on.checkpoint=true, it also saves each split's partition and offset into offsetsToCommit (a usage sketch follows).
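
    A hedged usage sketch (the property key corresponds to KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT and defaults to true; brokers is assumed defined). Offsets are committed only when a checkpoint completes, so checkpointing must be enabled for this code path to run at all:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Without checkpointing, notifyCheckpointComplete never fires and no offsets are committed
    env.enableCheckpointing(60_000L);

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        // Default is true; set to false to skip committing offsets on checkpoints
        .setProperty("commit.offsets.on.checkpoint", "true")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();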

    @Override
    public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
        // Get the splits assigned to this reader (the checkpointId argument is actually unused)
        List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
        // The commit.offsets.on.checkpoint option decides
        // whether offsets are committed on checkpoints
        if (!commitOffsetsOnCheckpoint) {
            return splits;
        }
    
        // The logic below only runs when commit.offsets.on.checkpoint is enabled.
        // offsetsToCommit holds the offsets that need to be committed;
        // it is a Map<checkpointId, Map<partition, offset>>.
        // If this reader has no splits and no finished splits,
        // record an empty map for this checkpoint id
        if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
            offsetsToCommit.put(checkpointId, Collections.emptyMap());
        } else {
            // Create an offsets map for this checkpoint id inside offsetsToCommit
            Map<TopicPartition, OffsetAndMetadata> offsetsMap =
                    offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
            // Put the offsets of the active splits.
            // Iterate over the splits, saving each split's partition and offset into offsetsMap
            for (KafkaPartitionSplit split : splits) {
                // If the checkpoint is triggered before the partition starting offsets
                // is retrieved, do not commit the offsets for those partitions.
                if (split.getStartingOffset() >= 0) {
                    offsetsMap.put(
                            split.getTopicPartition(),
                            new OffsetAndMetadata(split.getStartingOffset()));
                }
            }
            // Put the partitions and offsets of all the finished splits.
            offsetsMap.putAll(offsetsOfFinishedSplits);
        }
        return splits;
    }
    

    The notifyCheckpointComplete method runs when a checkpoint completes; the SourceCoordinator sends the checkpoint-complete notification. In this method the Kafka source commits the Kafka offsets.

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.debug("Committing offsets for checkpoint {}", checkpointId);
        // As above: if committing offsets on checkpoint is disabled, return and do nothing
        if (!commitOffsetsOnCheckpoint) {
            return;
        }
    
        // Get the partition offsets to commit for this checkpoint from offsetsToCommit
        Map<TopicPartition, OffsetAndMetadata> committedPartitions =
                offsetsToCommit.get(checkpointId);
        // If there is nothing to commit, return
        if (committedPartitions == null) {
            LOG.debug(
                    "Offsets for checkpoint {} either do not exist or have already been committed.",
                    checkpointId);
            return;
        }
    
        // Delegate to KafkaSourceFetcherManager to commit the offsets to Kafka
        // (analyzed below)
        ((KafkaSourceFetcherManager) splitFetcherManager)
                .commitOffsets(
                        committedPartitions,
                        (ignored, e) -> {
                            // The offset commit here is needed by the external monitoring. It won't
                            // break Flink job's correctness if we fail to commit the offset here.
                            // This is the offset-commit callback;
                            // on error, record the failed commit in the metrics
                            if (e != null) {
                                kafkaSourceReaderMetrics.recordFailedCommit();
                                LOG.warn(
                                        "Failed to commit consumer offsets for checkpoint {}",
                                        checkpointId,
                                        e);
                            } else {
                                LOG.debug(
                                        "Successfully committed offsets for checkpoint {}",
                                        checkpointId);
                                // Record the successful commit in the metrics
                                kafkaSourceReaderMetrics.recordSucceededCommit();
                                // If the finished topic partition has been committed, we remove it
                                // from the offsets of the finished splits map.
                                committedPartitions.forEach(
                                        (tp, offset) ->
                                                kafkaSourceReaderMetrics.recordCommittedOffset(
                                                        tp, offset.offset()));
                                // The offsets are committed; remove them from the finished-splits map
                                offsetsOfFinishedSplits
                                        .entrySet()
                                        .removeIf(
                                                entry ->
                                                        committedPartitions.containsKey(
                                                                entry.getKey()));
                                // Remove the entries for this and all earlier checkpoint ids; they are committed and need not be kept
                                while (!offsetsToCommit.isEmpty()
                                        && offsetsToCommit.firstKey() <= checkpointId) {
                                    offsetsToCommit.remove(offsetsToCommit.firstKey());
                                }
                            }
                        });
    }
    

    Next we turn to KafkaSourceFetcherManager. This class commits the offsets through the KafkaConsumer; the logic is in its commitOffsets method:

    public void commitOffsets(
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, OffsetCommitCallback callback) {
        LOG.debug("Committing offsets {}", offsetsToCommit);
        // If there are no offsets to commit, return
        if (offsetsToCommit.isEmpty()) {
            return;
        }
        // Get the running SplitFetcher
        SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> splitFetcher =
                fetchers.get(0);
        if (splitFetcher != null) {
            // The fetcher thread is still running. This should be the majority of the cases.
            // The fetcher is still running: enqueue an offset-commit task
            enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
        } else {
            // No SplitFetcher is running: create a new one,
            // enqueue the task as above, then start the fetcher
            splitFetcher = createSplitFetcher();
            enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
            startFetcher(splitFetcher);
        }
    }
    

    Continuing with the method that creates the offset-commit task:

    private void enqueueOffsetsCommitTask(
            SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> splitFetcher,
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
            OffsetCommitCallback callback) {
        // Get the KafkaPartitionSplitReader behind this splitFetcher
        KafkaPartitionSplitReader kafkaReader =
                (KafkaPartitionSplitReader) splitFetcher.getSplitReader();
    
        // Enqueue a SplitFetcherTask for the fetcher
        splitFetcher.enqueueTask(
                new SplitFetcherTask() {
                    @Override
                    public boolean run() throws IOException {
                        kafkaReader.notifyCheckpointComplete(offsetsToCommit, callback);
                        return true;
                    }
    
                    @Override
                    public void wakeUp() {}
                });
    }
    

    At this point a SplitFetcherTask has been added to the SplitFetcher's taskQueue. As concluded in the "Data Reading Flow" section above, SplitFetcher takes the queued tasks out one by one in runOnce and executes them. When it takes out this SplitFetcherTask, it runs its run method, which calls kafkaReader.notifyCheckpointComplete. That method invokes the KafkaConsumer's asynchronous offset commit, commitAsync:

    public void notifyCheckpointComplete(
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
            OffsetCommitCallback offsetCommitCallback) {
        consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
    }
    

    This concludes the analysis of how KafkaSource commits offsets on checkpoints.

    This post is the author's original work. Discussion and corrections are welcome; please credit the source when reposting.
