Flink Source Code: The New Source Architecture

Author: AlienPaul | Published 2022-02-17 13:51

    Flink source code analysis series: table of contents

    See: Flink source code analysis series table of contents

    Background

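    Flink data sources have traditionally been implemented with SourceFunction: you implement its run method and use the SourceContext to collect records or emit watermarks. This design has the following problems (translated from FLIP-27: Refactor Source Interface - Apache Flink - Apache Software Foundation):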

    • Batch and streaming modes of the same kind of source need two separate implementations.

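    • The logic for “work discovery” (splits, partitions, etc.) and for actually “reading” the data is intermixed in the SourceFunction interface and the DataStream API. This leads to complex implementations such as the Kafka and Kinesis sources.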

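    • Partitions/shards/splits are not explicit in the interface. This makes it hard to implement certain features in a source-independent way, such as event-time alignment, per-partition watermarks, dynamic split assignment and work stealing. For example, the Kafka and Kinesis consumers both support per-partition watermarks, but as of Flink 1.8.1 only the Kinesis consumer supports event-time alignment (selectively reading from splits so that event time advances evenly).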

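    • The checkpoint lock is “owned” by the source function. The implementation must hold the lock while emitting elements and updating state, and Flink has no way to optimize how that lock is handled.
      The lock is not fair. Under contention, some threads (such as the checkpoint thread) may fail to acquire it.
      It also stands in the way of a lock-free actor/mailbox threading model.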

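    • There are no common building blocks, so every source implements its own complex threading model. This makes implementing and testing new sources hard and raises the bar for contributing to existing sources.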

    To address these problems, Flink introduced a new Source architecture.

    The new Source architecture

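    The new Source architecture consists of two main components:

    • SplitEnumerator: discovers and assigns splits (a split can be a file, a partition, etc.).
    • Reader: reads the actual data from the splits.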

    Below we analyze the interfaces involved in the new architecture.

    Source

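    Source usually serves as a factory. It contains factory methods that create the SourceReader, the SplitEnumerator and the corresponding serializers. Its methods are explained below: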

    @Public
    public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {
    
        /**
         * Get the boundedness of this source.
         *
         * @return the boundedness of this source.
         */
        // Whether the source is bounded
        // Returns Boundedness.BOUNDED or Boundedness.CONTINUOUS_UNBOUNDED
        Boundedness getBoundedness();
    
        /**
         * Creates a new reader to read data from the splits it gets assigned. The reader starts fresh
         * and does not have any state to resume.
         *
         * @param readerContext The {@link SourceReaderContext context} for the source reader.
         * @return A new SourceReader.
         * @throws Exception The implementor is free to forward all exceptions directly. Exceptions
         *     thrown from this method cause task failure/recovery.
         */
        // Creates a reader that reads the splits assigned to it. The reader is brand new and does not restore from any state
        SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) throws Exception;
    
        /**
         * Creates a new SplitEnumerator for this source, starting a new input.
         *
         * @param enumContext The {@link SplitEnumeratorContext context} for the split enumerator.
         * @return A new SplitEnumerator.
         * @throws Exception The implementor is free to forward all exceptions directly. * Exceptions
         *     thrown from this method cause JobManager failure/recovery.
         */
        // Creates a SplitEnumerator, starting a new input
        SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext)
                throws Exception;
    
        /**
         * Restores an enumerator from a checkpoint.
         *
         * @param enumContext The {@link SplitEnumeratorContext context} for the restored split
         *     enumerator.
         * @param checkpoint The checkpoint to restore the SplitEnumerator from.
         * @return A SplitEnumerator restored from the given checkpoint.
         * @throws Exception The implementor is free to forward all exceptions directly. * Exceptions
         *     thrown from this method cause JobManager failure/recovery.
         */
        // Restores a SplitEnumerator from a checkpoint
        SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
                SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint) throws Exception;
    
        // ------------------------------------------------------------------------
        //  serializers for the metadata
        // ------------------------------------------------------------------------
    
        /**
         * Creates a serializer for the source splits. Splits are serialized when sending them from
         * enumerator to reader, and when checkpointing the reader's current state.
         *
         * @return The serializer for the split type.
         */
        // Creates the serializer for source splits. Splits are serialized when sent from the enumerator to a reader and when the reader checkpoints
        SimpleVersionedSerializer<SplitT> getSplitSerializer();
    
        /**
         * Creates the serializer for the {@link SplitEnumerator} checkpoint. The serializer is used for
         * the result of the {@link SplitEnumerator#snapshotState()} method.
         *
         * @return The serializer for the SplitEnumerator checkpoint.
         */
        // Returns the serializer for the SplitEnumerator checkpoint, applied to the result of SplitEnumerator#snapshotState()
        SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
    }
    

    SourceReader

    SourceReader reads the source splits assigned by the SplitEnumerator. The interface is defined as follows:

    @Public
    public interface SourceReader<T, SplitT extends SourceSplit>
            extends AutoCloseable, CheckpointListener {
    
        /** Start the reader. */
        // Starts the reader
        void start();
    
        /**
         * Poll the next available record into the {@link SourceOutput}.
         *
         * <p>The implementation must make sure this method is non-blocking.
         *
         * <p>Although the implementation can emit multiple records into the given SourceOutput, it is
         * recommended not doing so. Instead, emit one record into the SourceOutput and return a {@link
         * InputStatus#MORE_AVAILABLE} to let the caller thread know there are more records available.
         *
         * @return The InputStatus of the SourceReader after the method invocation.
         */
        // Polls the next available record into the SourceOutput
        // This method must be non-blocking
        // Preferably emit only one record per call
        InputStatus pollNext(ReaderOutput<T> output) throws Exception;
    
        /**
         * Checkpoint on the state of the source.
         *
         * @return the state of the source.
         */
        // Checkpoints the state of the source; the returned list of splits is the reader's state
        List<SplitT> snapshotState(long checkpointId);
    
        /**
         * Returns a future that signals that data is available from the reader.
         *
         * <p>Once the future completes, the runtime will keep calling the {@link
         * #pollNext(ReaderOutput)} method until that methods returns a status other than {@link
         * InputStatus#MORE_AVAILABLE}. After that the, the runtime will again call this method to
         * obtain the next future. Once that completes, it will again call {@link
         * #pollNext(ReaderOutput)} and so on.
         *
         * <p>The contract is the following: If the reader has data available, then all futures
         * previously returned by this method must eventually complete. Otherwise the source might stall
         * indefinitely.
         *
         * <p>It is not a problem to have occasional "false positives", meaning to complete a future
         * even if no data is available. However, one should not use an "always complete" future in
         * cases no data is available, because that will result in busy waiting loops calling {@code
         * pollNext(...)} even though no data is available.
         *
         * @return a future that will be completed once there is a record available to poll.
         */
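        // Returns a future that signals that the reader has data available
        // Once the future completes, Flink keeps calling pollNext(ReaderOutput) until it returns something other than InputStatus#MORE_AVAILABLE
        // After that it calls isAvailable again for the next future; once that completes, it calls pollNext(ReaderOutput) again, and so on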
        CompletableFuture<Void> isAvailable();
    
        /**
         * Adds a list of splits for this reader to read. This method is called when the enumerator
         * assigns a split via {@link SplitEnumeratorContext#assignSplit(SourceSplit, int)} or {@link
         * SplitEnumeratorContext#assignSplits(SplitsAssignment)}.
         *
         * @param splits The splits assigned by the split enumerator.
         */
        // Adds splits for this reader to read. Called when the enumerator assigns splits via SplitEnumeratorContext#assignSplit(SourceSplit, int) or SplitEnumeratorContext#assignSplits(SplitsAssignment)
        void addSplits(List<SplitT> splits);
    
        /**
         * This method is called when the reader is notified that it will not receive any further
         * splits.
         *
         * <p>It is triggered when the enumerator calls {@link
         * SplitEnumeratorContext#signalNoMoreSplits(int)} with the reader's parallel subtask.
         */
        // Called to notify the reader that it will not receive any more splits
        void notifyNoMoreSplits();
    
        /**
         * Handle a custom source event sent by the {@link SplitEnumerator}. This method is called when
         * the enumerator sends an event via {@link SplitEnumeratorContext#sendEventToSourceReader(int,
         * SourceEvent)}.
         *
         * <p>This method has a default implementation that does nothing, because most sources do not
         * require any custom events.
         *
         * @param sourceEvent the event sent by the {@link SplitEnumerator}.
         */
        // Handles a custom SourceEvent sent by the SplitEnumerator
        // Called when the enumerator sends an event via SplitEnumeratorContext#sendEventToSourceReader(int, SourceEvent)
        default void handleSourceEvents(SourceEvent sourceEvent) {}
    
        /**
         * We have an empty default implementation here because most source readers do not have to
         * implement the method.
         *
         * @see CheckpointListener#notifyCheckpointComplete(long)
         */
        // Notification that a checkpoint has completed
        // Most readers do not need to implement this method
        @Override
        default void notifyCheckpointComplete(long checkpointId) throws Exception {}
    }
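    The pollNext()/isAvailable() contract is easier to see with a caller attached. The following is a minimal, self-contained sketch of a caller that waits on the future and then polls until the status is no longer MORE_AVAILABLE. ToyReader, ReaderDriver and the local InputStatus enum are hypothetical stand-ins, not Flink classes. The real runtime drives readers from the task's mailbox rather than from a blocking loop like this one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical mirror of Flink's InputStatus values, for this sketch only.
enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

// Hypothetical reader: a producer pushes records in; pollNext() never blocks.
class ToyReader {
    private final Deque<String> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();
    private boolean finished;

    synchronized void push(String record) {
        buffer.add(record);
        available.complete(null); // wake up whoever waits on isAvailable()
    }

    synchronized void finish() {
        finished = true;
        available.complete(null);
    }

    // Emits at most one record per call, as the Javadoc recommends.
    synchronized InputStatus pollNext(List<String> output) {
        String record = buffer.poll();
        if (record != null) {
            output.add(record);
            return InputStatus.MORE_AVAILABLE; // may be a harmless "false positive"
        }
        if (finished) {
            return InputStatus.END_OF_INPUT;
        }
        if (available.isDone()) {
            available = new CompletableFuture<>(); // arm a fresh future for the next wait
        }
        return InputStatus.NOTHING_AVAILABLE;
    }

    synchronized CompletableFuture<Void> isAvailable() {
        return available;
    }
}

class ReaderDriver {
    // Wait on isAvailable(), then call pollNext() until it stops returning MORE_AVAILABLE.
    static List<String> drive(ToyReader reader) {
        List<String> output = new ArrayList<>();
        while (true) {
            reader.isAvailable().join();
            InputStatus status;
            do {
                status = reader.pollNext(output);
            } while (status == InputStatus.MORE_AVAILABLE);
            if (status == InputStatus.END_OF_INPUT) {
                return output;
            }
        }
    }
}
```

    Because the future is replaced only inside the synchronized pollNext(), a record pushed after NOTHING_AVAILABLE always completes the future that the caller will wait on next. This avoids the stall that the Javadoc warns about.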
    

    SplitEnumerator

    It is responsible for:

    1. Discovery: finding splits that SourceReaders can read.
    2. Assigning splits to SourceReaders.
    @Public
    public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT>
            extends AutoCloseable, CheckpointListener {
    
        /**
         * Start the split enumerator.
         *
         * <p>The default behavior does nothing.
         */
        // Starts the enumerator
        void start();
    
        /**
         * Handles the request for a split. This method is called when the reader with the given subtask
         * id calls the {@link SourceReaderContext#sendSplitRequest()} method.
         *
         * @param subtaskId the subtask id of the source reader who sent the source event.
         * @param requesterHostname Optional, the hostname where the requesting task is running. This
         *     can be used to make split assignments locality-aware.
         */
        // Handles a split request. Called when the reader with the given subtask id calls SourceReaderContext#sendSplitRequest()
        void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);
    
        /**
         * Add a split back to the split enumerator. It will only happen when a {@link SourceReader}
         * fails and there are splits assigned to it after the last successful checkpoint.
         *
         * @param splits The split to add back to the enumerator for reassignment.
         * @param subtaskId The id of the subtask to which the returned splits belong.
         */
        // Adds splits back to the split enumerator
        // Only called when a SourceReader fails and splits were assigned to it after the last successful checkpoint
        void addSplitsBack(List<SplitT> splits, int subtaskId);
    
        /**
         * Add a new source reader with the given subtask ID.
         *
         * @param subtaskId the subtask ID of the new source reader.
         */
        // Adds a new SourceReader with the given subtask ID
        void addReader(int subtaskId);
    
        /**
         * Creates a snapshot of the state of this split enumerator, to be stored in a checkpoint.
         *
         * <p>The snapshot should contain the latest state of the enumerator: It should assume that all
         * operations that happened before the snapshot have successfully completed. For example all
         * splits assigned to readers via {@link SplitEnumeratorContext#assignSplit(SourceSplit, int)}
         * and {@link SplitEnumeratorContext#assignSplits(SplitsAssignment)}) don't need to be included
         * in the snapshot anymore.
         *
         * <p>This method takes the ID of the checkpoint for which the state is snapshotted. Most
         * implementations should be able to ignore this parameter, because for the contents of the
         * snapshot, it doesn't matter for which checkpoint it gets created. This parameter can be
         * interesting for source connectors with external systems where those systems are themselves
         * aware of checkpoints; for example in cases where the enumerator notifies that system about a
         * specific checkpoint being triggered.
         *
         * @param checkpointId The ID of the checkpoint for which the snapshot is created.
         * @return an object containing the state of the split enumerator.
         * @throws Exception when the snapshot cannot be taken.
         */
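        // Snapshots the split enumerator's state for a checkpoint
        // Assume all operations before the snapshot completed successfully; e.g. splits already assigned via assignSplit need not be in the snapshot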
        CheckpointT snapshotState(long checkpointId) throws Exception;
    
        /**
         * Called to close the enumerator, in case it holds on to any resources, like threads or network
         * connections.
         */
        // Closes the enumerator
        @Override
        void close() throws IOException;
    
        /**
         * We have an empty default implementation here because most source readers do not have to
         * implement the method.
         *
         * @see CheckpointListener#notifyCheckpointComplete(long)
         */
        // Notification that a checkpoint has completed
        @Override
        default void notifyCheckpointComplete(long checkpointId) throws Exception {}
    
        /**
         * Handles a custom source event from the source reader.
         *
         * <p>This method has a default implementation that does nothing, because it is only required to
         * be implemented by some sources, which have a custom event protocol between reader and
         * enumerator. The common events for reader registration and split requests are not dispatched
         * to this method, but rather invoke the {@link #addReader(int)} and {@link
         * #handleSplitRequest(int, String)} methods.
         *
         * @param subtaskId the subtask id of the source reader who sent the source event.
         * @param sourceEvent the source event from the source reader.
         */
        // Handles a custom SourceEvent from a source reader
        default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}
    }
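    The division of labor between handleSplitRequest, addSplitsBack and snapshotState can be illustrated with a toy enumerator. In this minimal sketch, splits are plain strings, and the SplitEnumeratorContext calls (assignSplit, signalNoMoreSplits, registeredReaders) are replaced by hypothetical in-memory maps and sets. It shows the pull-based protocol only and is not a Flink implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ToySplitEnumerator {
    private final Deque<String> pending = new ArrayDeque<>();             // splits not yet assigned
    private final Map<Integer, List<String>> assigned = new HashMap<>();  // stand-in for context.assignSplit
    private final Set<Integer> registeredReaders = new HashSet<>();       // stand-in for context.registeredReaders()
    private final Set<Integer> noMoreSplits = new HashSet<>();            // stand-in for signalNoMoreSplits

    ToySplitEnumerator(List<String> initialSplits) {
        pending.addAll(initialSplits);
    }

    void addReader(int subtaskId) {
        registeredReaders.add(subtaskId);
    }

    // Pull model: a reader asks for work; hand out one split or signal "no more splits".
    void handleSplitRequest(int subtaskId) {
        if (!registeredReaders.contains(subtaskId)) {
            return; // the reader failed after sending the request; ignore it
        }
        String next = pending.poll();
        if (next != null) {
            assigned.computeIfAbsent(subtaskId, k -> new ArrayList<>()).add(next);
        } else {
            noMoreSplits.add(subtaskId);
        }
    }

    // Splits of a failed reader go back to the pool. This sketch also forgets the
    // reader until it registers again (in Flink the runtime tracks registration).
    void addSplitsBack(List<String> splits, int subtaskId) {
        registeredReaders.remove(subtaskId);
        List<String> owned = assigned.get(subtaskId);
        if (owned != null) {
            owned.removeAll(splits);
        }
        pending.addAll(splits);
    }

    // Only unassigned splits are snapshotted; assigned ones are checkpointed by the readers.
    List<String> snapshotState() {
        return new ArrayList<>(pending);
    }

    Map<Integer, List<String>> assignments() { return assigned; }

    Set<Integer> finishedReaders() { return noMoreSplits; }
}
```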
    

    SourceCoordinator

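    SourceCoordinator interacts with the Flink runtime using an event-loop threading model. All state operations run in the event-loop thread (a single-threaded pool). It also tracks the history of split assignments, which simplifies SplitEnumerator implementations.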

    The start method creates the enumerator and calls the enumerator's start method:

    @Override
    public void start() throws Exception {
        LOG.info("Starting split enumerator for source {}.", operatorName);
    
        // we mark this as started first, so that we can later distinguish the cases where
        // 'start()' wasn't called and where 'start()' failed.
        // Mark the coordinator as started
        started = true;
    
        // there are two ways the SplitEnumerator can get created:
        //  (1) Source.restoreEnumerator(), in which case the 'resetToCheckpoint()' method creates
        // it
        //  (2) Source.createEnumerator, in which case it has not been created, yet, and we create
        // it here
        if (enumerator == null) {
            final ClassLoader userCodeClassLoader =
                context.getCoordinatorContext().getUserCodeClassloader();
            try (TemporaryClassLoaderContext ignored =
                 TemporaryClassLoaderContext.of(userCodeClassLoader)) {
                // Call the Source's createEnumerator method
                enumerator = source.createEnumerator(context);
            } catch (Throwable t) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
                LOG.error("Failed to create Source Enumerator for source {}", operatorName, t);
                context.failJob(t);
                return;
            }
        }
    
        // The start sequence is the first task in the coordinator executor.
        // We rely on the single-threaded coordinator executor to guarantee
        // the other methods are invoked after the enumerator has started.
        // Run enumerator.start() in the single-threaded executor so that all other methods run after the enumerator has started
        runInEventLoop(() -> enumerator.start(), "starting the SplitEnumerator.");
    }
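    The ordering guarantee described in the comments comes entirely from the single-threaded executor. Tasks run one at a time, in the order they were submitted. Here is a minimal sketch with java.util.concurrent, in which the "enumerator" is just a log of calls. EventLoopDemo and its methods are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EventLoopDemo {
    // Every enumerator call goes through this one thread, so enumerator state needs
    // no locking and calls observe each other in submission order.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private final List<String> calls = Collections.synchronizedList(new ArrayList<>());

    void runInEventLoop(Runnable action) {
        eventLoop.execute(action);
    }

    void start() {
        // Submitted first, therefore executed before any later event handler.
        runInEventLoop(() -> calls.add("start"));
    }

    void handleEvent(int subtask) {
        runInEventLoop(() -> calls.add("event from " + subtask));
    }

    // Drains the queue and returns the calls in execution order.
    List<String> shutdownAndGetCalls() {
        eventLoop.shutdown();
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event loop did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(calls);
    }
}
```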
    

    The handleEventFromOperator method receives events sent by the operator and responds to them:

    @Override
    public void handleEventFromOperator(int subtask, OperatorEvent event) {
        runInEventLoop(
            () -> {
                if (event instanceof RequestSplitEvent) {
                    // Split request event
                    LOG.info(
                        "Source {} received split request from parallel task {}",
                        operatorName,
                        subtask);
                    enumerator.handleSplitRequest(
                        subtask, ((RequestSplitEvent) event).hostName());
                } else if (event instanceof SourceEventWrapper) {
                    // Custom SourceEvent
                    final SourceEvent sourceEvent =
                        ((SourceEventWrapper) event).getSourceEvent();
                    LOG.debug(
                        "Source {} received custom event from parallel task {}: {}",
                        operatorName,
                        subtask,
                        sourceEvent);
                    enumerator.handleSourceEvent(subtask, sourceEvent);
                } else if (event instanceof ReaderRegistrationEvent) {
                    // Reader registration event
                    final ReaderRegistrationEvent registrationEvent =
                        (ReaderRegistrationEvent) event;
                    LOG.info(
                        "Source {} registering reader for parallel task {} @ {}",
                        operatorName,
                        subtask,
                        registrationEvent.location());
                    // Register the reader and add it to the enumerator
                    handleReaderRegistrationEvent(registrationEvent);
                } else {
                    throw new FlinkException("Unrecognized Operator Event: " + event);
                }
            },
            "handling operator event %s from subtask %d",
            event,
            subtask);
    }
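    handleEventFromOperator is essentially a type switch over the events a reader can send. The following compact sketch shows the same dispatch pattern. The event classes are simplified, hypothetical stand-ins for RequestSplitEvent, SourceEventWrapper and ReaderRegistrationEvent, and the handler only records which enumerator method it would call.

```java
import java.util.ArrayList;
import java.util.List;

interface OperatorEvent {}

class RequestSplit implements OperatorEvent {
    final String hostName;
    RequestSplit(String hostName) { this.hostName = hostName; }
}

class CustomEvent implements OperatorEvent {
    final Object payload;
    CustomEvent(Object payload) { this.payload = payload; }
}

class RegisterReader implements OperatorEvent {
    final String location;
    RegisterReader(String location) { this.location = location; }
}

class EventDispatcher {
    final List<String> enumeratorCalls = new ArrayList<>();

    // Mirrors the if/else-if chain: each known event type maps to one enumerator callback.
    void handleEventFromOperator(int subtask, OperatorEvent event) {
        if (event instanceof RequestSplit) {
            String host = ((RequestSplit) event).hostName;
            enumeratorCalls.add("handleSplitRequest(" + subtask + ", " + host + ")");
        } else if (event instanceof CustomEvent) {
            Object payload = ((CustomEvent) event).payload;
            enumeratorCalls.add("handleSourceEvent(" + subtask + ", " + payload + ")");
        } else if (event instanceof RegisterReader) {
            enumeratorCalls.add("addReader(" + subtask + ")");
        } else {
            // Unknown events are treated as a programming error, as in the original code.
            throw new IllegalArgumentException("Unrecognized Operator Event: " + event);
        }
    }
}
```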
    

    An example: FileSource

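    This concludes the analysis of the new Source architecture's interfaces. From this section on, we analyze an example that uses the new architecture: FileSource.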

    FileSource

    FileSource itself only contains factory methods for creating instances. The Source interface methods are implemented in its parent class AbstractFileSource.

    Let's look at how FileSource implements the Source interface methods.

    getBoundedness works as follows: if monitorContinuously (configured through the builder in AbstractFileSource) was used, the source is unbounded; otherwise it is bounded:

    @Override
    public Boundedness getBoundedness() {
        return continuousEnumerationSettings == null
            ? Boundedness.BOUNDED
            : Boundedness.CONTINUOUS_UNBOUNDED;
    }
    

    createReader creates a FileSourceReader, which is analyzed later.

    @Override
    public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) {
        return new FileSourceReader<>(
            readerContext, readerFormat, readerContext.getConfiguration());
    }
    

    createEnumerator creates the SplitEnumerator:

    @Override
    public SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createEnumerator(
        SplitEnumeratorContext<SplitT> enumContext) {
    
        // enumeratorFactory is either DEFAULT_SPLITTABLE_FILE_ENUMERATOR or DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR from FileSource,
        // for splittable and non-splittable files respectively
        // FileEnumerator does not implement the SplitEnumerator interface;
        // createSplitEnumerator below wraps it into the required type
        final FileEnumerator enumerator = enumeratorFactory.create();
    
        // read the initial set of splits (which is also the total set of splits for bounded
        // sources)
        // Read the initial splits; for bounded sources this is also the complete set (bounded input does not grow)
        final Collection<FileSourceSplit> splits;
        try {
            // TODO - in the next cleanup pass, we should try to remove the need to "wrap unchecked"
            // here
            splits = enumerator.enumerateSplits(inputPaths, enumContext.currentParallelism());
        } catch (IOException e) {
            throw new FlinkRuntimeException("Could not enumerate file splits", e);
        }
        // Create the SplitEnumerator: a StaticFileSplitEnumerator for bounded sources,
        // a ContinuousFileSplitEnumerator otherwise
        return createSplitEnumerator(enumContext, enumerator, splits, null);
    }
    

    FileSourceReader

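    FileSourceReader implements the SourceReader interface and reads data from FileSourceSplits. A FileSourceSplit represents either a whole file (non-splittable) or a section of a file (splittable). It holds the file path, the hosts storing the data, the offset and length, and so on.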

    The pollNext method that FileSourceReader uses to read data lives in its parent class SourceReaderBase:

    @Override
    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
        // make sure we have a fetch we are working on, or move to the next
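        // Get the batch of records from the fetcher that is currently being consumed
        // RecordsWithSplitIds represents data handed from a fetcher to the SourceReader
        // A RecordsWithSplitIds may cover several splits, but for FileRecords it covers only one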
        RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
        if (recordsWithSplitId == null) {
            // None yet: get the next batch
            recordsWithSplitId = getNextFetch(output);
            if (recordsWithSplitId == null) {
                // Still nothing: check whether more data may arrive later (analyzed below)
                return trace(finishedOrAvailableLater());
            }
        }
    
        // we need to loop here, because we may have to go across splits
        while (true) {
            // Process one record.
            // Get the next record from the current split
            final E record = recordsWithSplitId.nextRecordFromSplit();
            if (record != null) {
                // emit the record.
                // A record was obtained
                // Increment the record counter
                numRecordsInCounter.inc(1);
                // Emit the record to the output
                // currentSplitOutput is the output for the current split
                // currentSplitContext.state is the split's reading state
                recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
                LOG.trace("Emitted record: {}", record);
    
                // We always emit MORE_AVAILABLE here, even though we do not strictly know whether
                // more is available. If nothing more is available, the next invocation will find
                // this out and return the correct status.
                // That means we emit the occasional 'false positive' for availability, but this
                // saves us doing checks for every record. Ultimately, this is cheaper.
                // Always return MORE_AVAILABLE
                // If nothing is actually available, the next call returns the correct status
                return trace(InputStatus.MORE_AVAILABLE);
            } else if (!moveToNextSplit(recordsWithSplitId, output)) {
                // All splits in this fetch have been consumed: move on to the next batch
                // The fetch is done and we just discovered that and have not emitted anything, yet.
                // We need to move to the next fetch. As a shortcut, we call pollNext() here again,
                // rather than emitting nothing and waiting for the caller to call us again.
                return pollNext(output);
            }
            // else fall through the loop
        }
    }
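    At its core, pollNext works as a two-level cursor. A fetch holds records grouped by split. nextSplit() advances to the next split in the fetch, and nextRecordFromSplit() yields that split's records until they run out. The sketch below shows this cursor with a hypothetical, simplified stand-in for RecordsWithSplitIds (ToyFetch, not the Flink class). It emits one record per call and returns false once the fetch is exhausted.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for RecordsWithSplitIds: the records of one fetch, grouped by split id.
class ToyFetch {
    private final Iterator<Map.Entry<String, List<String>>> splits;
    private Iterator<String> currentRecords;

    ToyFetch(LinkedHashMap<String, List<String>> recordsBySplit) {
        this.splits = recordsBySplit.entrySet().iterator();
    }

    // Returns the next split id, or null when the fetch is exhausted.
    String nextSplit() {
        if (!splits.hasNext()) {
            currentRecords = null;
            return null;
        }
        Map.Entry<String, List<String>> entry = splits.next();
        currentRecords = entry.getValue().iterator();
        return entry.getKey();
    }

    // Returns the next record of the current split, or null when that split is exhausted.
    String nextRecordFromSplit() {
        return (currentRecords != null && currentRecords.hasNext()) ? currentRecords.next() : null;
    }
}

class FetchConsumer {
    private final ToyFetch fetch;
    private String currentSplit;

    FetchConsumer(ToyFetch fetch) {
        this.fetch = fetch;
        this.currentSplit = fetch.nextSplit();
    }

    // Emits at most one record (tagged with its split) per call; false once the fetch is done.
    boolean pollNext(List<String> output) {
        while (currentSplit != null) {
            String record = fetch.nextRecordFromSplit();
            if (record != null) {
                output.add(currentSplit + ":" + record);
                return true; // corresponds to MORE_AVAILABLE
            }
            currentSplit = fetch.nextSplit(); // corresponds to moveToNextSplit
        }
        return false; // the real reader would move on to the next fetch here
    }
}
```

    Empty splits are skipped inside the loop, just as the while (true) loop in SourceReaderBase falls through to the next split.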
    

    getNextFetch gets the next batch:

    @Nullable
    private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> output) {
        // Check whether any fetcher has reported an error
        splitFetcherManager.checkErrors();
    
        LOG.trace("Getting next source data batch from queue");
        // elementsQueue buffers the batches produced by the fetcher threads
        // Take one batch from the queue
        final RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
        // Return null if the queue is empty or the batch contains no split to read
        if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
            // No element available, set to available later if needed.
            return null;
        }
    
        // Make it the current fetch
        currentFetch = recordsWithSplitId;
        return recordsWithSplitId;
    }
    

    finishedOrAvailableLater checks whether more data can still arrive and returns the corresponding status:

    private InputStatus finishedOrAvailableLater() {
        // Shut down finished fetchers; returns whether all fetchers have shut down
        final boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
        // Unless the reader will receive no more splits AND all fetchers have shut down,
        // more data may still arrive later: return NOTHING_AVAILABLE
        if (!(noMoreSplitsAssignment && allFetchersHaveShutdown)) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        if (elementsQueue.isEmpty()) {
            // The buffer queue is empty as well: return END_OF_INPUT
            // We may reach here because of exceptional split fetcher, check it.
            splitFetcherManager.checkErrors();
            return InputStatus.END_OF_INPUT;
        } else {
            // We can reach this case if we just processed all data from the queue and finished a
            // split,
            // and concurrently the fetcher finished another split, whose data is then in the queue.
            // Otherwise return MORE_AVAILABLE
            return InputStatus.MORE_AVAILABLE;
        }
    }
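    Without the Flink types, finishedOrAvailableLater() reduces to a small decision table over three booleans. The sketch below writes it as a pure function (ReaderStatus and Status are hypothetical names), which makes each case easy to check. The error check on the fetchers is omitted.

```java
enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

class ReaderStatus {
    // noMoreSplits: the enumerator has signalled that no further splits will be assigned.
    // allFetchersShutdown: every fetcher has finished its splits and exited.
    // queueEmpty: no fetched batches are left in the elements queue.
    static Status finishedOrAvailableLater(boolean noMoreSplits, boolean allFetchersShutdown, boolean queueEmpty) {
        if (!(noMoreSplits && allFetchersShutdown)) {
            // More splits may still arrive, or a fetcher is still producing data.
            return Status.NOTHING_AVAILABLE;
        }
        // Nothing new can be produced; only leftovers in the queue remain.
        return queueEmpty ? Status.END_OF_INPUT : Status.MORE_AVAILABLE;
    }
}
```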
    

    moveToNextSplit moves on to the next split:

    private boolean moveToNextSplit(
        RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> output) {
        // Get the ID of the next split
        final String nextSplitId = recordsWithSplitIds.nextSplit();
        if (nextSplitId == null) {
            // None: the current fetch is finished
            LOG.trace("Current fetch is finished.");
            finishCurrentFetch(recordsWithSplitIds, output);
            return false;
        }
    
        // Get the context of the current split,
        // which holds the split ID and the split state
        currentSplitContext = splitStates.get(nextSplitId);
        checkState(currentSplitContext != null, "Have records for a split that was not registered");
        // Get the output for the current split
        currentSplitOutput = currentSplitContext.getOrCreateSplitOutput(output);
        LOG.trace("Emitting records from fetch for split {}", nextSplitId);
        return true;
    }
    

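    We have mentioned the fetcher several times. Its job is to read data from splits and buffer it for the SourceReader. Next we look at how splits reach the fetcher, starting with the addSplits method of SourceReaderBase: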

    @Override
    public void addSplits(List<SplitT> splits) {
        LOG.info("Adding split(s) to reader: {}", splits);
        // Initialize the state for each split.
        splits.forEach(
            s ->
            splitStates.put(
                s.splitId(), new SplitContext<>(s.splitId(), initializedState(s))));
        // Hand over the splits to the split fetcher to start fetch.
        splitFetcherManager.addSplits(splits);
    }
    

    addSplits hands the fetch work to the SplitFetcherManager. For FileSourceReader this is a SingleThreadFetcherManager, whose addSplits method is:

    @Override
    public void addSplits(List<SplitT> splitsToAdd) {
        // Get the running fetcher
        SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
        if (fetcher == null) {
            // None: create a new fetcher
            fetcher = createSplitFetcher();
            // Add the splits to the fetchers.
            // Add the splits to the newly created fetcher
            fetcher.addSplits(splitsToAdd);
            // Start the fetcher
            startFetcher(fetcher);
        } else {
            // A fetcher is already running: add the splits to it
            fetcher.addSplits(splitsToAdd);
        }
    }
    

    Finally, the addSplits method of SplitFetcher:

    public void addSplits(List<SplitT> splitsToAdd) {
        // Wrap the work in an AddSplitsTask; the splitReader abstracts over how different formats are read
        // Enqueue the task
        enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
        // Wake up the fetcher so it reads data with the SplitReader
        // The logic that reads split data and buffers it in elementsQueue lives in FetchTask and is not analyzed further here
        wakeUp(true);
    }
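    The hand-off between the reader thread and the fetcher thread follows the producer/consumer pattern. The reader enqueues "add splits" tasks. A single fetcher thread reads those splits and puts the records into a shared elements queue, and the reader drains that queue. The sketch below uses java.util.concurrent queues. ToyFetcher, the string "splits" and the in-memory storage map are hypothetical simplifications, and the fetcher polls its task queue with a short timeout instead of using an explicit wakeUp(). The real SplitFetcher also handles pausing, wake-ups and shutting down idle fetchers.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ToyFetcher implements Runnable {
    // Tasks enqueued by the reader thread (stand-in for AddSplitsTask).
    private final BlockingQueue<List<String>> taskQueue = new LinkedBlockingQueue<>();
    // Records handed back to the reader thread (stand-in for elementsQueue).
    private final BlockingQueue<String> elementsQueue = new LinkedBlockingQueue<>();
    // Hypothetical "storage": split id -> records contained in that split.
    private final Map<String, List<String>> storage;
    private volatile boolean running = true;

    ToyFetcher(Map<String, List<String>> storage) {
        this.storage = storage;
    }

    // Called from the reader thread; only enqueues, never does I/O.
    void addSplits(List<String> splitIds) {
        taskQueue.add(splitIds);
    }

    void shutdown() {
        running = false;
    }

    // Reader-side helper for this demo: wait a bounded time for the next record.
    String awaitRecord(long timeoutMillis) {
        try {
            return elementsQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    @Override
    public void run() {
        while (running) {
            List<String> splitIds;
            try {
                splitIds = taskQueue.poll(10, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (splitIds == null) {
                continue; // idle: no new splits yet
            }
            for (String splitId : splitIds) {
                // "Read" the split and hand each record to the reader thread.
                for (String record : storage.getOrDefault(splitId, Collections.<String>emptyList())) {
                    elementsQueue.add(splitId + ":" + record);
                }
            }
        }
    }
}
```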
    

    FileEnumerator

    The FileEnumerator interface discovers the files to read and splits them into FileSourceSplits. It has a single method, enumerateSplits.

    Depending on whether files can be split, there are two implementations:

    • BlockSplittingRecursiveEnumerator: for splittable files
    • NonSplittingRecursiveEnumerator: for non-splittable files

    Let's look at their enumerateSplits methods.

    NonSplittingRecursiveEnumerator

    The enumerateSplits method of NonSplittingRecursiveEnumerator:

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
        throws IOException {
        final ArrayList<FileSourceSplit> splits = new ArrayList<>();
    
        for (Path path : paths) {
            final FileSystem fs = path.getFileSystem();
            final FileStatus status = fs.getFileStatus(path);
            // Walk all paths and add the files under them to splits
            addSplitsForPath(status, fs, splits);
        }
    
        return splits;
    }
    
    private void addSplitsForPath(
        FileStatus fileStatus, FileSystem fs, ArrayList<FileSourceSplit> target)
        throws IOException {
        // Check whether the file passes the file filter
        if (!fileFilter.test(fileStatus.getPath())) {
            return;
        }
    
        // Not a directory: convert it into a FileSourceSplit
        if (!fileStatus.isDir()) {
            convertToSourceSplits(fileStatus, fs, target);
            return;
        }
        // A directory: recurse into it
        final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
        for (FileStatus containedStatus : containedFiles) {
            addSplitsForPath(containedStatus, fs, target);
        }
    }
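    The same recursive traversal can be sketched with plain java.nio.file instead of Flink's FileSystem abstraction. The walker visits each path, skips entries the filter rejects, recurses into directories, and emits one whole-file split per regular file. WholeFileSplit and NonSplittingWalker are hypothetical, and the hidden-file filter in the usage test is only an example filter. Children are sorted solely to make the demo deterministic.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical, simplified split: a whole file from offset 0 to its length.
class WholeFileSplit {
    final String id;
    final Path path;
    final long offset;
    final long length;

    WholeFileSplit(String id, Path path, long offset, long length) {
        this.id = id;
        this.path = path;
        this.offset = offset;
        this.length = length;
    }
}

class NonSplittingWalker {
    private final Predicate<Path> fileFilter;
    private int nextId = 0;

    NonSplittingWalker(Predicate<Path> fileFilter) {
        this.fileFilter = fileFilter;
    }

    List<WholeFileSplit> enumerateSplits(List<Path> paths) {
        List<WholeFileSplit> splits = new ArrayList<>();
        for (Path p : paths) {
            addSplitsForPath(p, splits);
        }
        return splits;
    }

    private void addSplitsForPath(Path path, List<WholeFileSplit> target) {
        if (!fileFilter.test(path)) {
            return; // filtered out; a rejected directory is not descended into
        }
        try {
            if (!Files.isDirectory(path)) {
                target.add(new WholeFileSplit(String.valueOf(nextId++), path, 0L, Files.size(path)));
                return;
            }
            List<Path> children;
            try (Stream<Path> listing = Files.list(path)) {
                children = listing.sorted().collect(Collectors.toList());
            }
            for (Path child : children) {
                addSplitsForPath(child, target);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```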
    

    convertToSourceSplits converts a FileStatus into a FileSourceSplit and adds it to the list:

    protected void convertToSourceSplits(
        final FileStatus file, final FileSystem fs, final List<FileSourceSplit> target)
        throws IOException {
        // Hosts that store the file
        final String[] hosts =
            getHostsFromBlockLocations(fs.getFileBlockLocations(file, 0L, file.getLen()));
        // Add the FileSourceSplit to the target list
        target.add(
            new FileSourceSplit(
                getNextId(),
                file.getPath(),
                0,
                file.getLen(),
                file.getModificationTime(),
                file.getLen(),
                hosts));
    }
    

    BlockSplittingRecursiveEnumerator

    BlockSplittingRecursiveEnumerator extends NonSplittingRecursiveEnumerator and overrides its convertToSourceSplits method:

    protected void convertToSourceSplits(
        final FileStatus file, final FileSystem fs, final List<FileSourceSplit> target)
        throws IOException {
    
        // First check whether the file is splittable (decided by its file extension); if not, fall back to the parent implementation
        if (!isFileSplittable(file.getPath())) {
            super.convertToSourceSplits(file, fs, target);
            return;
        }
    
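        // The file is splittable
        // Get the locations of the file's blocks
        final BlockLocation[] blocks = getBlockLocationsForFile(file, fs);
        if (blocks == null) {
            // No usable block information: emit the whole file as a single split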
            target.add(
                new FileSourceSplit(
                    getNextId(),
                    file.getPath(),
                    0L,
                    file.getLen(),
                    file.getModificationTime(),
                    file.getLen()));
        } else {
            for (BlockLocation block : blocks) {
                target.add(
                    new FileSourceSplit(
                        getNextId(),
                        file.getPath(),
                        block.getOffset(),
                        block.getLength(),
                        file.getModificationTime(),
                        file.getLen(),
                        block.getHosts()));
            }
        }
    }
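    In the splittable case, each block becomes one split covering the byte range [offset, offset + length). The snippet below assumes fixed-size blocks, as in HDFS. That is an assumption made only for this illustration; the enumerator above uses whatever BlockLocations the file system reports. Under this assumption the ranges can be computed directly. BlockRanges.split is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

class BlockRanges {
    // Returns {offset, length} pairs covering [0, fileLength) in blocks of blockSize bytes.
    // The last block may be shorter; an empty file yields no ranges in this sketch.
    static List<long[]> split(long fileLength, long blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        for (long offset = 0; offset < fileLength; offset += blockSize) {
            ranges.add(new long[] {offset, Math.min(blockSize, fileLength - offset)});
        }
        return ranges;
    }
}
```

    For example, a 300-byte file with 128-byte blocks yields the ranges (0, 128), (128, 128) and (256, 44), whose lengths add up to the file length.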
    

    StaticFileSplitEnumerator

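    It is used in batch mode to read a bounded FileSource. All files in the configured directories are enumerated up front and assigned to readers. Its start, close and addReader methods are empty.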

    Next we look at its handleSplitRequest method, which is called when a reader requests a split.

    @Override
    public void handleSplitRequest(int subtask, @Nullable String hostname) {
        // If the reader is not registered, ignore the request
        if (!context.registeredReaders().containsKey(subtask)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }
    
        if (LOG.isInfoEnabled()) {
            final String hostInfo =
                hostname == null ? "(no host locality info)" : "(on host '" + hostname + "')";
            LOG.info("Subtask {} {} is requesting a file source split", subtask, hostInfo);
        }
    
        // Ask the split assigner (discussed later) for a split, preferably one local to hostname
        final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
        if (nextSplit.isPresent()) {
            // A split is available: assign it to this subtask
            final FileSourceSplit split = nextSplit.get();
            context.assignSplit(split, subtask);
            LOG.info("Assigned split to subtask {} : {}", subtask, split);
        } else {
            // No split left: signal that there are no more splits
            context.signalNoMoreSplits(subtask);
            LOG.info("No more splits available for subtask {}", subtask);
        }
    }
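
    The pull-based behavior of handleSplitRequest can be modeled without the Flink runtime. The sketch below is hypothetical: StaticEnumeratorModel stands in for both the enumerator and its context, splits are plain strings handed out first-in first-out, and the return value describes the action instead of calling assignSplit or signalNoMoreSplits.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of StaticFileSplitEnumerator#handleSplitRequest: all splits are known
// up front, readers pull them one at a time, and an empty pool means "no more splits".
final class StaticEnumeratorModel {

    private final Set<Integer> registeredReaders = new HashSet<>();
    private final Deque<String> splits;

    StaticEnumeratorModel(List<String> initialSplits) {
        this.splits = new ArrayDeque<>(initialSplits);
    }

    void registerReader(int subtask) {
        registeredReaders.add(subtask);
    }

    void readerFailed(int subtask) {
        registeredReaders.remove(subtask);
    }

    // Returns a description of the action the enumerator would take for this request.
    String handleSplitRequest(int subtask) {
        if (!registeredReaders.contains(subtask)) {
            return "IGNORED"; // reader failed between sending the request and now
        }
        final String next = splits.poll(); // FIFO here; the real order depends on the assigner
        return next != null ? "ASSIGN " + next : "NO_MORE_SPLITS";
    }
}
```

    Because the source is bounded, an empty pool is final: once a reader receives the no-more-splits signal, it can finish after completing its current splits.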
    

    ContinuousFileSplitEnumerator

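    ContinuousFileSplitEnumerator is used in streaming mode to read an unbounded FileSource. It keeps discovery running, periodically enumerating file splits and assigning them to readers.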

    Its start method launches a periodic discovery task:

    @Override
    public void start() {
        // Periodically call enumerator.enumerateSplits(paths, 1) (1 is the minimum desired number of splits)
        // and hand each result to processDiscoveredSplits;
        // both the initial delay and the period are discoveryInterval
        context.callAsync(
            () -> enumerator.enumerateSplits(paths, 1),
            this::processDiscoveredSplits,
            discoveryInterval,
            discoveryInterval);
    }
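
    A rough model of what this four-argument callAsync asks for: run a callable periodically on a worker thread and hand each result (or error) to a handler on a single coordinator thread. This is a sketch built on java.util.concurrent under that assumption, not Flink's SourceCoordinatorContext implementation; PeriodicCallAsync is a hypothetical name and the delays are in milliseconds.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical model: the callable runs on a worker thread, and its outcome is passed to the
// handler on a separate single-threaded "coordinator" executor.
final class PeriodicCallAsync implements AutoCloseable {

    private final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService coordinator = Executors.newSingleThreadExecutor();

    <T> void callAsync(
            Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelayMs, long periodMs) {
        worker.scheduleAtFixedRate(() -> {
            T result = null;
            Throwable error = null;
            try {
                result = callable.call();
            } catch (Throwable t) {
                // Catching here keeps the periodic schedule alive after a failed run
                error = t;
            }
            final T finalResult = result;
            final Throwable finalError = error;
            // Handler runs on the coordinator thread, like processDiscoveredSplits(splits, error)
            coordinator.execute(() -> handler.accept(finalResult, finalError));
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        worker.shutdownNow();
        coordinator.shutdownNow();
    }
}
```

    Catching the exception inside the task matters: ScheduledExecutorService stops all subsequent runs of a periodic task once one run throws, whereas this model, like processDiscoveredSplits, simply reports the error and waits for the next round.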
    

    Each discovery result is handled by processDiscoveredSplits:

    private void processDiscoveredSplits(Collection<FileSourceSplit> splits, Throwable error) {
        // Stop here if the enumeration failed
        if (error != null) {
            LOG.error("Failed to enumerate files", error);
            return;
        }
    
        // Drop splits whose path has already been processed
        // (Set.add returns false for a path that is already in the set)
        final Collection<FileSourceSplit> newSplits =
                splits.stream()
                        .filter((split) -> pathsAlreadyProcessed.add(split.path()))
                        .collect(Collectors.toList());
        // Hand the new splits to the SplitAssigner
        splitAssigner.addSplits(newSplits);
        // Assign splits to the readers that are waiting
        assignSplits();
    }
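
    The filter relies on Set.add returning true only when the element was not already present, so one call both checks and records a path. A minimal sketch of the same idiom, with a hypothetical Split class that carries only a path:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of the path-based de-duplication used in processDiscoveredSplits.
final class DiscoveryDedup {

    // Hypothetical stand-in for FileSourceSplit; only the path matters here.
    static final class Split {
        final String path;

        Split(String path) {
            this.path = path;
        }
    }

    private final Set<String> pathsAlreadyProcessed = new HashSet<>();

    // Set.add returns true only for a path not seen before, so it acts as the filter.
    // The stateful predicate relies on the stream being sequential.
    List<Split> newSplits(Collection<Split> discovered) {
        return discovered.stream()
                .filter(split -> pathsAlreadyProcessed.add(split.path))
                .collect(Collectors.toList());
    }
}
```

    Since the key is the path, a path seen in one discovery round is filtered out of every later round, even though each round enumerates the directories from scratch.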
    

    Next is assignSplits, which hands splits to the waiting readers:

    private void assignSplits() {
        // Iterator over all readers waiting for a split (subtask ID -> hostname)
        final Iterator<Map.Entry<Integer, String>> awaitingReader =
            readersAwaitingSplit.entrySet().iterator();
    
        while (awaitingReader.hasNext()) {
            // Visit the waiting readers one by one
            final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
    
            // if the reader that requested another split has failed in the meantime, remove
            // it from the list of waiting readers
            if (!context.registeredReaders().containsKey(nextAwaiting.getKey())) {
                awaitingReader.remove();
                continue;
            }
    
            // Hostname and subtask ID of the requesting reader
            final String hostname = nextAwaiting.getValue();
            final int awaitingSubtask = nextAwaiting.getKey();
            // Get the next split for that host
            final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
            if (nextSplit.isPresent()) {
                // Assign it through SplitEnumeratorContext
                context.assignSplit(nextSplit.get(), awaitingSubtask);
                // The reader has been served; remove it from the map
                awaitingReader.remove();
            } else {
                break; // no split available right now; the remaining readers keep waiting
            }
        }
    }
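
    The loop can be exercised outside Flink with a small model. In this hypothetical sketch, readersAwaitingSplit is assumed to be a LinkedHashMap (so readers are served in request order), pendingSplits replaces the SplitAssigner and ignores hostnames, and assigned records what would have been sent via assignSplit.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of ContinuousFileSplitEnumerator#assignSplits.
final class AwaitingReadersModel {

    final Set<Integer> registeredReaders = new HashSet<>();
    // subtask ID -> hostname, kept in request order
    final Map<Integer, String> readersAwaitingSplit = new LinkedHashMap<>();
    // Stand-in for the SplitAssigner; hostnames are ignored here
    final Deque<String> pendingSplits = new ArrayDeque<>();
    // What would have been sent through assignSplit
    final Map<Integer, String> assigned = new HashMap<>();

    void assignSplits() {
        final Iterator<Map.Entry<Integer, String>> awaitingReader =
                readersAwaitingSplit.entrySet().iterator();
        while (awaitingReader.hasNext()) {
            final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
            // Drop readers that failed after asking for a split
            if (!registeredReaders.contains(nextAwaiting.getKey())) {
                awaitingReader.remove();
                continue;
            }
            final String split = pendingSplits.poll();
            if (split == null) {
                break; // nothing to hand out; remaining readers keep waiting
            }
            assigned.put(nextAwaiting.getKey(), split);
            awaitingReader.remove();
        }
    }
}
```

    With readers 0, 1 and 2 waiting, reader 1 unregistered and one pending split, reader 0 gets the split, reader 1 is dropped, and reader 2 stays in the map for the next round.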
    

    assignSplit is a default method of the SplitEnumeratorContext interface:

    default void assignSplit(SplitT split, int subtask) {
        assignSplits(new SplitsAssignment<>(split, subtask));
    }
    

    The assignSplits method it delegates to is implemented in SourceCoordinatorContext:

    @Override
    public void assignSplits(SplitsAssignment<SplitT> assignment) {
        // Ensure the split assignment is done by the coordinator executor.
        callInCoordinatorThread(
            () -> {
                // Ensure all the subtasks in the assignment have registered.
                // Readers register when the operator sends a ReaderRegistrationEvent, which is
                // handled in SourceCoordinator's handleEventFromOperator method
                for (Integer subtaskId : assignment.assignment().keySet()) {
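                    if (!registeredReaders.containsKey(subtaskId)) {
                        // Note: registeredReaders.get(subtaskId) below is always null here, so the
                        // message prints "null"; assignment.assignment().get(subtaskId) would show the splits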
                        throw new IllegalArgumentException(
                            String.format(
                                "Cannot assign splits %s to subtask %d because the subtask is not registered.",
                                registeredReaders.get(subtaskId), subtaskId));
                    }
                }
    
                // Record the assignment in the assignment tracker
                assignmentTracker.recordSplitAssignment(assignment);
                assignment
                    .assignment()
                    .forEach(
                    (id, splits) -> {
                        // For each subtask in the assignment, send an AddSplitEvent through its SubtaskGateway
                        final OperatorCoordinator.SubtaskGateway gateway =
                            getGatewayAndCheckReady(id);
    
                        final AddSplitEvent<SplitT> addSplitEvent;
                        try {
                            addSplitEvent =
                                new AddSplitEvent<>(splits, splitSerializer);
                        } catch (IOException e) {
                            throw new FlinkRuntimeException(
                                "Failed to serialize splits.", e);
                        }
    
                        gateway.sendEvent(addSplitEvent);
                    });
                return null;
            },
            String.format("Failed to assign splits %s due to ", assignment));
    }
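
    The two methods above amount to: wrap a single split into a one-entry assignment, validate every target subtask, then send one AddSplitEvent per subtask. A hypothetical single-threaded sketch (no coordinator thread, events recorded as strings, error message built from the assignment itself) shows that one unregistered subtask rejects the whole assignment before anything is sent:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of assignSplit/assignSplits: validate all targets first, then send.
final class AssignmentDispatch {

    final Set<Integer> registeredReaders = new HashSet<>();
    final List<String> sentEvents = new ArrayList<>();

    void assignSplits(Map<Integer, List<String>> assignment) {
        // Validation pass: nothing is sent if any subtask is unknown
        for (Integer subtaskId : assignment.keySet()) {
            if (!registeredReaders.contains(subtaskId)) {
                throw new IllegalArgumentException(String.format(
                        "Cannot assign splits %s to subtask %d because the subtask is not registered.",
                        assignment.get(subtaskId), subtaskId));
            }
        }
        // Send pass: one event per subtask
        assignment.forEach((id, splits) -> sentEvents.add("AddSplitEvent(" + id + ", " + splits + ")"));
    }

    // Mirrors the default SplitEnumeratorContext#assignSplit: wrap a single split.
    void assignSplit(String split, int subtask) {
        final Map<Integer, List<String>> single = new LinkedHashMap<>();
        single.put(subtask, Collections.singletonList(split));
        assignSplits(single);
    }
}
```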
    

    Back in ContinuousFileSplitEnumerator, let's look at its handleSplitRequest method. It is called when the SourceCoordinator receives a RequestSplitEvent (a split request event) from the operator.

    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        // A reader requesting a split is idle, so add it to the map of waiting readers
        readersAwaitingSplit.put(subtaskId, requesterHostname);
        // Assign splits (analyzed above)
        assignSplits();
    }
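
    Together with processDiscoveredSplits, this gives a park-and-serve pattern: a request that cannot be served immediately stays in readersAwaitingSplit until a later discovery round adds splits. The hypothetical model below strips out registration checks, hostnames and the SplitAssigner to show just that interaction.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the request/discovery interplay in ContinuousFileSplitEnumerator.
final class ContinuousEnumeratorModel {

    private final Map<Integer, String> readersAwaitingSplit = new LinkedHashMap<>();
    private final Deque<String> pendingSplits = new ArrayDeque<>();
    // Splits handed to each subtask, in assignment order
    final Map<Integer, List<String>> assigned = new HashMap<>();

    // A request parks the reader, then tries to serve it right away.
    void handleSplitRequest(int subtaskId, String requesterHostname) {
        readersAwaitingSplit.put(subtaskId, requesterHostname);
        assignSplits();
    }

    // Stand-in for processDiscoveredSplits: new splits may serve parked readers.
    void onDiscovered(List<String> newSplits) {
        pendingSplits.addAll(newSplits);
        assignSplits();
    }

    int waitingReaders() {
        return readersAwaitingSplit.size();
    }

    private void assignSplits() {
        final Iterator<Map.Entry<Integer, String>> it = readersAwaitingSplit.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<Integer, String> next = it.next();
            final String split = pendingSplits.poll();
            if (split == null) {
                break; // nothing to hand out; the reader stays parked
            }
            assigned.computeIfAbsent(next.getKey(), k -> new ArrayList<>()).add(split);
            it.remove();
        }
    }
}
```

    Unlike the static enumerator, an empty pool never triggers a no-more-splits signal here: the stream is unbounded, so a reader simply waits until the next discovery round.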
    

    FileSplitAssigner

    FileSplitAssigner decides which node processes which split, i.e. it determines the order in which splits are processed and where. The interface has three methods:

    /**
     * Gets the next split.
     *
     * <p>When this method returns an empty {@code Optional}, then the set of splits is assumed to
     * be done and the source will finish once the readers finished their current splits.
     */
    // Get the next split to assign to the node identified by hostname
    Optional<FileSourceSplit> getNext(@Nullable String hostname);
    
    /**
     * Adds a set of splits to this assigner. This happens for example when some split processing
     * failed and the splits need to be re-added, or when new splits got discovered.
     */
    // Add splits that are waiting to be assigned to the SplitAssigner.
    // Used when a reader fails and the splits assigned to it must be returned,
    // or when new splits have been discovered
    void addSplits(Collection<FileSourceSplit> splits);
    
    /** Gets the remaining splits that this assigner has pending. */
    // Get all splits that have not been assigned yet
    Collection<FileSourceSplit> remainingSplits();
    

    FileSplitAssigner has two implementations:

    • SimpleSplitAssigner: hands out splits in an arbitrary order, ignoring locality.
    • LocalityAwareSplitAssigner: prefers splits that are stored locally on the requesting host.

    Let's look at each getNext method in turn. SimpleSplitAssigner's getNext is straightforward: it removes and returns the last split in its internal splits list:

    @Override
    public Optional<FileSourceSplit> getNext(String hostname) {
        final int size = splits.size();
        return size == 0 ? Optional.empty() : Optional.of(splits.remove(size - 1));
    }
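
    A hypothetical, String-based sketch of the same strategy behind a mirror of the three FileSplitAssigner methods (StringSplitAssigner and LifoSplitAssigner are illustrative names, not Flink types):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

// Hypothetical String-based mirror of the three FileSplitAssigner methods.
interface StringSplitAssigner {
    Optional<String> getNext(String hostname);

    void addSplits(Collection<String> splits);

    Collection<String> remainingSplits();
}

// Sketch of SimpleSplitAssigner's strategy: hand out the last split in the list.
final class LifoSplitAssigner implements StringSplitAssigner {

    private final List<String> splits = new ArrayList<>();

    @Override
    public Optional<String> getNext(String hostname) {
        // hostname is ignored: this strategy has no notion of locality
        final int size = splits.size();
        return size == 0 ? Optional.empty() : Optional.of(splits.remove(size - 1));
    }

    @Override
    public void addSplits(Collection<String> newSplits) {
        splits.addAll(newSplits);
    }

    @Override
    public Collection<String> remainingSplits() {
        return new ArrayList<>(splits); // defensive copy
    }
}
```

    Taking the last element keeps getNext cheap, because removing the tail of an ArrayList shifts no elements; the side effect is that splits come out in reverse order of insertion, and re-added splits are handed out first.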
    

    LocalityAwareSplitAssigner's getNext method:

    @Override
    public Optional<FileSourceSplit> getNext(@Nullable String host) {
        // for a null (or blank) host, we always return a remote split
        if (StringUtils.isNullOrWhitespaceOnly(host)) {
            final Optional<FileSourceSplit> split = getRemoteSplit();
            if (split.isPresent()) {
                LOG.info("Assigning split to non-localized request: {}", split);
            }
            return split;
        }
        // Normalize the hostname
        host = normalizeHostName(host);
    
        // for any non-null host, we take the list of non-null splits
        // Prefer local splits: a split counts as local if one of its hosts equals this host.
        // Each host gets its own LocatableSplitChooser, created on its first request
        final LocatableSplitChooser localSplits =
                localPerHost.computeIfAbsent(
                        host, (theHost) -> buildChooserForHost(theHost, unassigned));
    
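        // localCount: how many hosts can read this split locally
        // Prefer the unassigned local split with the smallest localCount, so splits that few
        // hosts can read locally go to those hosts first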
        final SplitWithInfo localSplit =
            localSplits.getNextUnassignedMinLocalCountSplit(unassigned);
        if (localSplit != null) {
            // Remove it from the set of unassigned splits
            checkState(
                unassigned.remove(localSplit),
                "Selected split has already been assigned. This should not happen!");
            LOG.info(
                "Assigning local split to requesting host '{}': {}",
                host,
                localSplit.getSplit());
            // Increment the local-assignment counter
            localAssignments.inc();
            // Return the local split
            return Optional.of(localSplit.getSplit());
        }
    
        // we did not find a local split, return a remote split
        final Optional<FileSourceSplit> remoteSplit = getRemoteSplit();
        if (remoteSplit.isPresent()) {
            LOG.info("Assigning remote split to requesting host '{}': {}", host, remoteSplit);
        }
        return remoteSplit;
    }
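
    The preference order (a local split with the smallest localCount first, otherwise any remote split) can be sketched without Flink's LocatableSplitChooser. This is a simplified model under stated assumptions: localCount is taken to be the number of hosts listed on the split, host names are normalized by lower-casing, and remote splits are also picked by smallest localCount. LocalityAwareModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical, simplified model of LocalityAwareSplitAssigner#getNext.
final class LocalityAwareModel {

    static final class Split {
        final String id;
        final List<String> hosts;

        Split(String id, String... hosts) {
            this.id = id;
            // Assumed normalization: lower-case host names
            this.hosts = Arrays.stream(hosts)
                    .map(h -> h.toLowerCase(Locale.ROOT))
                    .collect(Collectors.toList());
        }

        // Simplification: every listed host can read the split locally.
        int localCount() {
            return hosts.size();
        }
    }

    private final List<Split> unassigned;
    int localAssignments = 0;
    int remoteAssignments = 0;

    LocalityAwareModel(List<Split> splits) {
        this.unassigned = new ArrayList<>(splits);
    }

    Optional<String> getNext(String host) {
        if (host != null && !host.trim().isEmpty()) {
            final String normalized = host.trim().toLowerCase(Locale.ROOT);
            final Optional<Split> local = pickMinLocalCount(s -> s.hosts.contains(normalized));
            if (local.isPresent()) {
                localAssignments++;
                return Optional.of(take(local.get()));
            }
        }
        // Null/blank host, or no local split left: fall back to any remaining split.
        final Optional<Split> remote = pickMinLocalCount(s -> true);
        if (remote.isPresent()) {
            remoteAssignments++;
        }
        return remote.map(this::take);
    }

    // Among eligible unassigned splits, prefer the one local to the fewest hosts.
    private Optional<Split> pickMinLocalCount(Predicate<Split> eligible) {
        return unassigned.stream()
                .filter(eligible)
                .min(Comparator.comparingInt(Split::localCount));
    }

    private String take(Split split) {
        unassigned.remove(split);
        return split.id;
    }
}
```

    With splits s1 on hosts h1 and h2, s2 on h1 and s3 on h3, a request from "H1" receives s2 (local, fewer local hosts than s1), a request from h4 receives the remote split s3, and a request without a host receives s1.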
    

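    This blog post is the author's original work. Discussion, criticism and corrections are welcome. Please credit the source when reposting.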

        Article link: https://www.haomeiwen.com/subject/imqslrtx.html