美文网首页
读Flink源码谈设计:FileSystemConnector中

读Flink源码谈设计:FileSystemConnector中

作者: 泊浮目 | 来源:发表于2022-03-08 18:01 被阅读0次

    本文首发于泊浮目的语雀:https://www.yuque.com/17sing

    版本 日期 备注
    1.0 2022.3.8 文章首发

    本文基于Flink 1.14代码进行分析。

    0.前言

    前阵子在生产上碰到了一个诡异现象:全量作业无法正常进行,日志中充斥着java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container xxxx(HOSTNAME:PORT) timed out的报错。

    场景为Oracle全量抽取至Hive,数据会流过Kafka,数据量为T级别,根据时间字段每天做一个分区。报错的Job负责抽取Kafka的数据并写至Hive,使用的是TableAPI。

    1.排查思路

    这个问题报到我这边的时候,有同学已经排查过一轮了。根据网上搜索,会告知你可能是yarn的压力过大、网络短暂不稳定等,可以调大heartbeat.timeout来缓解这个问题,经调整改问题并未解决。

    另外一个说法会告知你是GC频繁的原因。建议调整内存,调整后,的确有一定的效果(使出问题的时间变慢)。那很显然和代码有关系了。

    因为之前一个版本同步数据都没有出问题,因此开始寻找最近代码的改动,找了几圈下来并没有找到可疑的代码。顿时觉得有点头皮发麻。于是让现场的同学切换到上个版本继续做全量,现象依旧会发生。

    这时我就有点怀疑生产环境的特性了——比如数据特性,但现场的同学告知我数据并没有什么特殊之处。于是我要了一份现场的HeapDump,丢到了分析软件上进行查看,发现org.apache.flink.streaming.api.functions.sink.filesystem.Bucket的对象特别多。

    于是看了一下Bucket对象的定义:

    
    /**
     * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
     *
     * <p>For each incoming element in the {@code StreamingFileSink}, the user-specified {@link
     * BucketAssigner} is queried to see in which bucket this element should be written to.
     */
    @Internal
    public class Bucket<IN, BucketID> {
    

    好家伙。一个目录一个对象,此时此刻我已经对现场的同学告知我的“数据没有什么特殊之处”产生了怀疑,不过为了实锤,我还是跟了一遍代码:

    |-- HiveTableSink
       \-- createStreamSink
    |-- StreamingFileSink
      \-- initializeState
    |-- StreamingFileSinkHelper
      \-- constructor
    |-- HadoopPathBasedBulkFormatBuilder
      \-- createBuckets
    |-- Buckets
      \-- onElement
      \-- getOrCreateBucketForBucketId
    

    过了一遍代码以后,心里便有了数。问了下现场,同步的数据时间跨度是不是特别大,现场同学确认后,时间跨度为3年多。于是建议降低时间跨度,或者降低分区时间。最终将全量批次进行切分后解决了这个问题。

    2. 解决问题后的好奇

    如果每个目录都会产生一个Bucket,那如果运行一个流作业,岂不是迟早碰到相同的问题。这么显而易见的问题,社区的大神们肯定早就想到了,好奇心驱使着我寻找答案——直到看到了这段代码:

        public void commitUpToCheckpoint(final long checkpointId) throws IOException {
            final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
                    activeBuckets.entrySet().iterator();
    
            LOG.info(
                    "Subtask {} received completion notification for checkpoint with id={}.",
                    subtaskIndex,
                    checkpointId);
    
            while (activeBucketIt.hasNext()) {
                final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
                bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
    
                if (!bucket.isActive()) {
                    // We've dealt with all the pending files and the writer for this bucket is not
                    // currently open.
                    // Therefore this bucket is currently inactive and we can remove it from our state.
                    activeBucketIt.remove();
                    notifyBucketInactive(bucket);
                }
            }
        }
    

    做Checkpoint后的提交时,这里会根据Bucket是否处于活跃状态来决定是否移除在内存中维护的数据结构。

    那么怎样才算活跃呢?代码很简短:

        boolean isActive() {
            return inProgressPart != null
                    || !pendingFileRecoverablesForCurrentCheckpoint.isEmpty()
                    || !pendingFileRecoverablesPerCheckpoint.isEmpty();
        }
    

    接下来就是讲清楚这三个的触发条件了。

    2.1 inProgressPart == null

    该对象的类型为InProgressFileWriter,触发条件和FileSystem的滚动策略息息相关。

    
    /**
     * The policy based on which a {@code Bucket} in the {@code Filesystem Sink} rolls its currently
     * open part file and opens a new one.
     */
    @PublicEvolving
    public interface RollingPolicy<IN, BucketID> extends Serializable {
    
        /**
         * Determines if the in-progress part file for a bucket should roll on every checkpoint.
         *
         * @param partFileState the state of the currently open part file of the bucket.
         * @return {@code True} if the part file should roll, {@link false} otherwise.
         */
        boolean shouldRollOnCheckpoint(final PartFileInfo<BucketID> partFileState) throws IOException;
    
        /**
         * Determines if the in-progress part file for a bucket should roll based on its current state,
         * e.g. its size.
         *
         * @param element the element being processed.
         * @param partFileState the state of the currently open part file of the bucket.
         * @return {@code True} if the part file should roll, {@link false} otherwise.
         */
        boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState, IN element)
                throws IOException;
    
        /**
         * Determines if the in-progress part file for a bucket should roll based on a time condition.
         *
         * @param partFileState the state of the currently open part file of the bucket.
         * @param currentTime the current processing time.
         * @return {@code True} if the part file should roll, {@link false} otherwise.
         */
        boolean shouldRollOnProcessingTime(
                final PartFileInfo<BucketID> partFileState, final long currentTime) throws IOException;
    }
    
    

    这三个接口分别对应在某些情况下,是否应该关闭当前打开的文件:

    • shouldRollOnCheckpoint:做Checkpoint之前检查。
    • shouldRollOnEvent:根据当前的状态检查是否应该关闭。比如当前的buffer大小是否超过了限制。
    • shouldRollOnProcessingTime:检查当前打开时间是否太长来盘判断符合关闭的条件。

    2.2 pendingFileRecoverablesForCurrentCheckpoint isNotEmpty

    其中的元素也是根据RollingPolicy来触发的,不做过多的解释。

    2.3 pendingFileRecoverablesPerCheckpoint isNotEmpty

    基于pendingFileRecoverablesForCurrentCheckpoint isNotEmpty。用字典来保存一个CheckpointId与List<InProgressFileWriter.PendingFileRecoverable>的关系。

    2.4 非活跃Bucket

    结合前面的条件来说,其实就是已经关闭并做完所有Checkpoint的目录,则为非活跃Bucket。检查的时机一般是:

    1. Task重新恢复时,从StateBackend中读取之前的状态,并做检查
    2. 做完Checkpoint后,会进行一次检查

    当Bucket变成非活跃状态时,会做一次通知Inactive的通知。告知下游该分区的数据已提交,变成可读状态。见issue:artition commit is delayed when records keep coming

    3. FileSystemConnector中的整洁架构

    在了解完上文的知识点后,我关注到了有这么一个Proposal:FLIP-115: Filesystem connector in Table。根据这个Proposal,我简单的翻阅了一下相关的源码,发现其实现也是一种整洁架构的体现。

    在上面我们已经进行过源码分析了,接下来我们就里面的抽象设计以及职责、分层进行分析:

    |-- HiveTableSink  #Table级API,负责对外,用户可以直接调用
    |-- StreamingFileSink  #Streaming 级API,也可以对外,位于TableAPI下方
    |-- StreamingFileSinkHelper #集成了对于TimeService的逻辑,便于定期关闭Bucket;以及对于数据到Bucket的分发。这个类也被AbstractStreamingWriter使用,注释上也建议复用于 RichSinkFunction or StreamOperator
    |-- BucketsBuilder #场景中调到的具体类是HadoopPathBasedBulkFormatBuilder,这个类会关注Buckets的具体实现以BucketWriter的具体实现
    |-- Buckets #这是一个管理Bucket生命周期的类。其中有几个关键成员对象
      |-- BucketWriter  #会对应具体的FileSystem实现与写入的Format
      |-- RolingPolicy  #滚动策略,前面提到过,不再深入讨论
      |-- BucketAssigner #决定每个元素输出到哪个Bucket中。比如是key还是date等等
      |-- BucketFactory #负责每个Bucket的创建
    

    由于职责切分粒度较细,数据的流转逻辑与外部具体实现是解耦的,我们举几个例子:

    1. 如果我们要基于自己的DSL来调用Hive的写入,那么只需要写个和HiveTableSink类似的HiveDSLSink。
    2. 如果一个数仓(数据湖)一直在增加自己底层的文件系统的支持,那么当第一套代码构筑完毕时,后续只需要实现相应的BucketWriterFileSystem即可。
    3. 如果一个数仓(数据湖)一直在增加自己支持的Format,那么当第一套代码构筑完毕时,后续只需要实现相应的BucketWriter即可。

    基于这种设计,核心逻辑往往不会产生变化,并将容易变化的部分隔离开来,整个模块的质量将更容易得到保障。

    相关文章

      网友评论

          本文标题:读Flink源码谈设计:FileSystemConnector中

          本文链接:https://www.haomeiwen.com/subject/imoxkrtx.html