美文网首页程序员
Flink1.10-BucketingSink源码解析

Flink1.10-BucketingSink源码解析

作者: 小胡子哥灬 | 来源:发表于2020-05-16 12:16 被阅读0次

BucketingSink是Flink连接外部文件系统的Sink的实现,支持Hadoop文件系统支持的所有文件系统,提供了多种文件滚的策略,支持exactly-once语义。虽然在新版本中已经过期,但它仍然有源码阅读的价值。

前言

首先来看 BucketingSink都实现了哪些类:

public class BucketingSink<T>
        extends RichSinkFunction<T>
        implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback
  • 继承了RichSinkFunction
  • 实现了InputTypeConfigurable,CheckpointedFunction,CheckpointListener, ProcessingTimeCallback

InputTypeConfigurable就比较简单,设置输入元素的类型。其它我会对每一个类的实现做解析:

RichSinkFunction::invoke()

invokeBucketingSink的入口方法,当收到数据时调用此方法,下面是它的实现:

public void invoke(T value) throws Exception {
        // 得到数据输出的路径
        Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);
        //获取当前的ProcessTime
        long currentProcessingTime = processingTimeService.getCurrentProcessingTime();

        // 1. 从state中得到BucketState
        BucketState<T> bucketState = state.getBucketState(bucketPath);
        if (bucketState == null) {
            // 如果状态为空(也就是处理inprogress的bucket),则新建一个实例
            bucketState = new BucketState<>(currentProcessingTime);
            //加入到state中
            state.addBucketState(bucketPath, bucketState);
        }
        // 2. 判断是否可以关闭(滚动)现在的bucket。简单来说就是判断是否还有处理inprogress的bucket,并且是否可以关闭
        if (shouldRoll(bucketState, currentProcessingTime)) {
            // 3. 如果可以则新建一个inprogress的文件
            openNewPartFile(bucketPath, bucketState);
        }
        //数据写入bucket中
        bucketState.writer.write(value);
        //更新最后一次的写入时间
        bucketState.lastWrittenToTime = currentProcessingTime;
}

代码逻辑都比较简单明了,最主要的是理解里面的一些类的结构和一些方法: BucketState的结构, shoudRoll和openNewPartFile方法的实现。

  • BucketState - 用于记录当前处理inprogress的文件,以及当checkpoint完成时,由in-progress变为pendding状态的文件。下面是主要的类的属性,
     static final class BucketState<T> implements Serializable {
        private static final long serialVersionUID = 1L;

        /**
         * 当前正在处理的文件(in-progress)
         */
        String currentFile;
        /**
         * 上一个checkpoing的in-progress文件的有效长度
         */
        long currentFileValidLength = -1;
        /**
         * bucket上一次写入数据的时间
         */
        long lastWrittenToTime;
        /**
         * 文件创建的时间
         */
        long creationTime;
        /**
         * 关闭Bucket时生成的pending file
         */
        List<String> pendingFiles = new ArrayList<>();
        /**
         * snapshotState时,把pendinbgFile加入到此map,每个checkpointId对应的 pending file
         */
        final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
        ......
  • shouldRoll() - 判断是否应该关闭现在的bucket(in-progress),满足以下三个条件之一:
    1. 还没有文件被创建过,这里不能单纯的理解为第一条数据来的时候。应该理解为路径下还没有In-progress文件。
    2. 当前的文件已经达到最大大小的配置(默认384M)
    3. 达到了最大的文件滚动时间
private boolean shouldRoll(BucketState<T> bucketState, long currentProcessingTime) throws IOException {
        boolean shouldRoll = false;
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        if (!bucketState.isWriterOpen) {
            // 满足第一个条件,还没有in-progress的文件,一般是启动任务或者,上一个in-progress文件被关闭后
            shouldRoll = true;
            LOG.debug("BucketingSink {} starting new bucket.", subtaskIndex);
        } else {
            //获取当前文件的大小,如果大于则返回true
            long writePosition = bucketState.writer.getPos();
            if (writePosition > batchSize) {
                shouldRoll = true;
                LOG.debug(
                    "BucketingSink {} starting new bucket because file position {} is above batch size {}.",
                    subtaskIndex,
                    writePosition,
                    batchSize);
            } else {
                // 当前处理时间减去文件创建时间,大于设置的最大滚动时间,返回true
                if (currentProcessingTime - bucketState.creationTime > batchRolloverInterval) {
                    shouldRoll = true;
                    LOG.debug(
                        "BucketingSink {} starting new bucket because file is older than roll over interval {}.",
                        subtaskIndex,
                        batchRolloverInterval);
                }
            }
        }
        return shouldRoll;
    }
  • openNewPartFile() - 关闭当前(in-progress)文件,并创建一个新的in-progress文件
private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception {
        //关闭当前(in-progress)的文件,并且把.in-progress的文件改名成.pending后缀的文件
        closeCurrentPartFile(bucketState);
        if (!fs.exists(bucketPath)) {
            try {
                if (fs.mkdirs(bucketPath)) {
                    LOG.debug("Created new bucket directory: {}", bucketPath);
                }
            } catch (IOException e) {
                throw new RuntimeException("Could not create new bucket path.", e);
            }
        }
        /**
         * 下面代码块的逻辑是按升序获取partCounter值,直到达到尚未使用的最小值。
         * 因为不同的并行子任务是单线程的,所以这里不会有并发问题。所以我们可以看到文件名称为
         * part-1-0,part-1-1=》第一个数字是subtaskIndex,第二个数字是partCounter
         */
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); // 获取子任务的index
        //得到文件名 String.format("%s-%s-%s%s", partPrefix, subtaskIndex, partCounter, partSuffix)
        Path partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
        // 循环检查路径下是否已经存在同名文件,或者同名pending的文件前缀,同名in-progress文件前缀
        while (fs.exists(partPath) ||
                fs.exists(getPendingPathFor(partPath)) ||
                fs.exists(getInProgressPathFor(partPath))) {
            bucketState.partCounter++;
            partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
        }
        // 记录文件创建时间
        bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
        // 再次加1,因为上面的while循环如果不成立是不加1的,下一次进来的时候还得从头开始
        bucketState.partCounter++;

        LOG.debug("Next part path is {}", partPath.toString());
        bucketState.currentFile = partPath.toString();
        // 加上.in-progress的后缀
        Path inProgressPath = getInProgressPathFor(partPath);
        // 设置writer,默认用StringWriter
        if (bucketState.writer == null) {
            //每个并行子任务,使用不同的Writer实例。要重新new
            bucketState.writer = writerTemplate.duplicate();
            if (bucketState.writer == null) {
                throw new UnsupportedOperationException(
                    "Could not duplicate writer. " +
                        "Class '" + writerTemplate.getClass().getCanonicalName() + "' must implement the 'Writer.duplicate()' method."
                );
            }
        }
        // 打开文件
        bucketState.writer.open(fs, inProgressPath);
        bucketState.isWriterOpen = true;
    }

closeCurrentPartFile方法比较简单,就不用多说。openNewPartFile方法的逻辑也比较清晰,首先关闭当前处理的文件,并且重命名为pending文件。然后创建新的progress文件,打开bucket。
总结下invoke的流程:
1.获取当前子任务当前分区路径下的文件状态
2.判断当前的state是否可以关闭。
3.如果满足关闭条件,则先关闭,再新建一个bucket.
4.最后写入数据到bucket.

CheckpointedFunction

接着看BucketingSink的状态管理,BucketingSink使用的是ListState,里面主要管理的是当处理的文件的各种信息,包括当前的inprogress文件,pending文件列表,文件创建时间等信息,具体内容可以结合上面的BucketState类观看阅读。

  • initializeState() - 初始化状态,任务启动时,从checkpoint恢复状态
public void initializeState(FunctionInitializationContext context) throws Exception {
        Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
        //初始化FileSystem,简单的理解就是像连接Mysql一样,初始化连接
        try {
            initFileSystem();
        } catch (IOException e) {
            LOG.error("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
            throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
        }
        //反射获取truncate方法,如果hadoop版本没有truncate方法,从checkpoint恢复时,用最近成功一次的checkpoint创建一个.valid-length的文件
        if (this.refTruncate == null) {
            this.refTruncate = reflectTruncate(fs);
        }
        OperatorStateStore stateStore = context.getOperatorStateStore();
        restoredBucketStates = stateStore.getSerializableListState("bucket-states");
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        if (context.isRestored()) {
            LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
            for (State<T> recoveredState : restoredBucketStates.get()) {
                /*
                * 1.peding状态的文件全部恢复成part file
                * 2.in-progress的文件全部恢复成part file,如果有truncate方法,则恢复的文件则是不包含失效的部分,如果没有,则新建一个同名的.valid-length,
                * 但是,使用时,需要自己处理,只读取有效的部分。
                 */
                handleRestoredBucketState(recoveredState);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, recoveredState);
                }
            }
        } else {
            LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
        }
    }

比较重要的是handleRestoredBucketState方法,它做了这样几件事情
1. 清除掉pendingFiles的list,因为pendingFiles的list只是关闭文件时由progress重命名而来,它会在执行checkpoint时,放到pendingFile对应的map中,所以只需要恢复map中的pendingFile.
2. 处理in-progress文件:首先判断是否有同名的pending和in-progress文件,如果存在, 则rename成partfile文件,接着是恢复上一次完成的checkpoint,如果支持truncate方法,则直接把多余的截取掉,但是如果不支持truncate方法,则新建一个同名的.valid-length文件,里面记录了有效的长度,后续读取这个文件时需要自己处理只读取有效的部分。
我还是举个例子来说明这里吧:

  1. 假如part-1.0.in-progress文件里已经写入了以下发条数据,此时做了checkpoint,那么成功后state里的有效长度是: 25
flink is nice
java is nice
  1. chekpoint后,又来了一条数据,此时的有效长度是39,但是还没有做checkpoint,系统就挂了此时part-1-0.in-progress文件里有三条数据。
scala is nice
  1. 从checkpoint恢复时,会先把part-1-0.in-progress文件变成part-1-0,里面依然有三条数据,但是最近成功的checkpoint里只承认两条是有效的,也就是valid-length是25。如果hadoop支持truncate方法,则直接会把part-1-0里的第三条数据截取的,只保留有效的数据
flink is nice
java is nice

反之,如果 hadoop 不支持truncate方法,则新建一个part-1-0.valid-length文件,里面存入有效长度值25,此时part-1-0文件还是三条数据

flink is nice
java is nice
scala is nice

所以这需要后续读取这个文件时,先读取part-1-0.valid-length文件,拿到有效长度,再从part-1-0文件里拿到有效数据。

  • snapshotState() - 保存当前子任务的快照,执行checkpoint,注意这里执行后变不是立马成功,要notifyCheckpointComplete之后才算成功。先上代码:
public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
        //先先掉上一次的state
        restoredBucketStates.clear();
        synchronized (state.bucketStates) {
            int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
            //遍历bucketStates
            for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
                BucketState<T> bucketState = bucketStateEntry.getValue();
                // 如果文件还没有关闭,要再取一次有效长度,确保是最新的有效长度
                if (bucketState.isWriterOpen) {
                    bucketState.currentFileValidLength = bucketState.writer.flush();
                }
                synchronized (bucketState.pendingFilesPerCheckpoint) {
                    // 把pending文件放入pendingFilesPerCheckpoint的map中,
                    // key是checkpointId,value是pending文件列表(因为如果checkpoint时间设置过长,有可能有多个pending文件)
                    // 等待notifyCheckpointComplete之后,才把里面的文件变成partfile
                    bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
                }
                bucketState.pendingFiles = new ArrayList<>();
            }
            restoredBucketStates.add(state);
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
            }
        }
    }

snapshotState的逻辑比较简单,简单来说就是保存状态,然后把pending文件按checkpointId存入,等待notifyCheckpointComplete后,再把pending文件变成partfile文件。

CheckpointListener

上面提到的snapshotState后,等待notifyCheckpointComplete通知完成,把pending文件变成partfile。

  • notifyCheckpointComplete() - 通知checkpoint执行完成,把pending文件变成partfile文件,整个流程结束
public void notifyCheckpointComplete(long checkpointId) throws Exception {
        synchronized (state.bucketStates) {
            //遍历bucketStates
            Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt = state.bucketStates.entrySet().iterator();
            while (bucketStatesIt.hasNext()) {
                BucketState<T> bucketState = bucketStatesIt.next().getValue();
                synchronized (bucketState.pendingFilesPerCheckpoint) {
                    Iterator<Map.Entry<Long, List<String>>> pendingCheckpointsIt =
                        bucketState.pendingFilesPerCheckpoint.entrySet().iterator();
                    // 再遍历pendingFilesPerCheckpoint,
                    while (pendingCheckpointsIt.hasNext()) {
                        Map.Entry<Long, List<String>> entry = pendingCheckpointsIt.next();
                        //拿到checkpointId
                        Long pastCheckpointId = entry.getKey();
                        List<String> pendingPaths = entry.getValue();
                        // 和当前完成的checkpointId比较,小于等于则可以处理
                        if (pastCheckpointId <= checkpointId) {
                            LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
                            for (String filename : pendingPaths) {
                                Path finalPath = new Path(filename);
                                Path pendingPath = getPendingPathFor(finalPath);
                                fs.rename(pendingPath, finalPath);
                                LOG.debug(
                                    "Moving pending file {} to final location having completed checkpoint {}.",
                                    pendingPath,
                                    pastCheckpointId);
                            }
                            //用迭代器,确保删除不报错
                            pendingCheckpointsIt.remove();
                        }
                    }
                    // 这里的场景暂时没想到,它这里说是Writer被关闭,pendingFiles却是空的。但是pendingFiles为空
                    //就说明in-progress的文件没有被关闭,就说是Writer就不会被关闭,所以这里没弄明白。
                    if (!bucketState.isWriterOpen &&
                        bucketState.pendingFiles.isEmpty() &&
                        bucketState.pendingFilesPerCheckpoint.isEmpty()) {

                        // We've dealt with all the pending files and the writer for this bucket is not currently open.
                        // Therefore this bucket is currently inactive and we can remove it from our state.
                        bucketStatesIt.remove();
                    }
                }
            }
        }
    }

逻辑都写在代码里了,这里就不再多说。

ProcessingTimeCallback

最后再来说 ProcessingTimeCallback的onProcessingTime方法,前面我们提到关闭文件要满足三个条件之一:

  1. 还没有文件被创建过,这里不能单纯的理解为第一条数据来的时候。应该理解为路径下还没有In-progress文件。
  2. 当前的文件已经达到最大大小的配置(默认384M)
  3. 达到了最大的文件滚动时间

那么onProcessingTime就是第4条件,它会定期检查in-progress的Bucket,当发现文件处理非活跃状态的时间达到配置的时间后,就会关闭这个文件。

public void onProcessingTime(long timestamp) throws Exception {
        long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
              // 定时关闭文件
        closePartFilesByTime(currentProcessingTime);
          //注册下一次的Timer
        processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
    }

private void closePartFilesByTime(long currentProcessingTime) throws Exception {
        synchronized (state.bucketStates) {
            for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
                // 两个条件:
                // 1. 当前时间减去配置的非活跃阈值大于 最后一次数据写入时间
                // 2. 当前时间减去最大滚动阈值大于文件创建时间(这个和前面的条件一样,放在这里是因为这里不需要数据来驱动。)
                if ((entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold)
                        || (entry.getValue().creationTime < currentProcessingTime - batchRolloverInterval)) {
                    LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.",
                        getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold);
                    closeCurrentPartFile(entry.getValue());
                }
            }
        }
    }

总结

以上就是BucketingSink的源码解析,总的来说,阅读BucketingSink的源码还是能学到很多东西的,比如Flink的state用法以及于如何运用Flink的Checkpoint机制来保证exactly-once,但是它还是有个弊端就是,它的状态管理用的是ListState, 也必须是ListState(因为你无法保证用户在使用BucketingSink是key by的,要通用必须用ListState),而文件的命名方式借助于subTaskIndex,所以当更改并行度时,数据不能正常恢复。
后续我本来打算再写一个StreamingFileSink的源码解析的,但是我发现StreamingFileSink的总体思想和BucketingSink的思想是差不多的,只是StreamingFileSink的实现比较灵活,对各个模块组件化了(代码很绕),代码也很容易看懂,但是很费时间,也比较容易忘记,个人觉得StreamingFileSink的代码阅读价值并不高于BucketingSink,所以我就没有写StreamingFileSink的源码解析。在这里就简单比较下两者的差别吧。

  1. BucketingSink和StreamingFileSink的state管理都使用了ListState,不同在于StreamingFileSink存的是序列化后的对象(byte),所以StreamingFileSink在checkpoint对网络宽带占用较少,但checkpoint时,耗时可能会比BucketingSink多。还有StreamingFileSink的state还多了partCounter的计数,这个很重要,这样就不用每次遍历目录下的文件去拿partCounter了,直接从state里取。
  2. 从Checkpoint恢复时,两者对in-progress文件的rollback也有所不同,BucketingSink是兼容了hadoop版本对truncate方法的支持,而StreamingFileSink只适用于hadoop2.7或者以上的版本,也就是说,StreamingFileSink不会有.valid-length的文件了。
  3. StreamingFileSink的滚动策略更丰富,用户也可以自己实现自己的滚动策略, 所以StreamingFileSink更灵活。

以上3点是BucketingSink和StreamingFileSink比较明显的区别。我个人觉得,在使用上,如果你的hadoop是2.7或者以上的版本,我推荐用StreamingFileSink,反之使用BucketingSink; 在代码阅读学习上,比较推荐阅读BucketingSink,因为思想是差不多,而且阅读起来比StreamingFileSink轻松,代码逻辑更加清晰。

相关文章

网友评论

    本文标题:Flink1.10-BucketingSink源码解析

    本文链接:https://www.haomeiwen.com/subject/onshohtx.html