Flink Streaming File Sink

作者: todd5167 | 来源:发表于2021-08-13 16:33 被阅读0次

    背景

    Flink 支持将流数据以文件形式存储到外部系统,典型使用场景是将数据写入Hive表所在 HDFS存储路径,通过Hive 做查询分析。随着Flink文件写入被业务广泛使用,暴露出很多问题,因此需要了解 Flink Streaming File sink 的实现逻辑。

    案例

    从Kafka消费JSON数据,转换为 UserInfo 实体类数据流,最终以Parquet 格式写入Hive表对应的HDFS路径。使用 Flink 1.12.1,Hadoop 2.8.0, hive 2.3.8。

    -----------------------------
    - hive 建表语句
    -----------------------------
    
    create table userinfo(
        userid int,
        username string
    ) stored as parquet;
    
    -----------------------------
    - java 实体类
    -----------------------------
    public class UserInfo {
        private int userId;
        private String userName;
    
        public int getUserId() {
            return userId;
        }
    
        public void setUserId(int userId) {
            this.userId = userId;
        }
    
        public String getUserName() {
            return userName;
        }
    
        public void setUserName(String userName) {
            this.userName = userName;
        }
    
    
        @Override
        public String toString() {
            return "UserInfo{" +
                    "userId=" + userId +
                    ", userName='" + userName + '\'' +
                    '}';
        }
    }
    
    -----------------------------
    - Flink 文件写入程序
    -----------------------------
    public class Kafka2Parquet {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "test_001");
    
            env.setParallelism(1);
            env.enableCheckpointing(30000);
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));
            env.setStateBackend(new FsStateBackend("hdfs://localhost:9000/user/todd/checkpoint"));
    
            FlinkKafkaConsumer<String> dataStream = new FlinkKafkaConsumer("mqTest02", new SimpleStringSchema(), props);
            dataStream.setStartFromLatest();
    
            DataStream<UserInfo> userInfoDataStream = env.addSource(dataStream)
                    .map(value -> JsonUtils.parseJson(value, UserInfo.class));
            
            // 1. 设置BulkFormat Builder  2.使用 CheckpointRollingPolicy
            StreamingFileSink<UserInfo> parquetSink = StreamingFileSink
                    .forBulkFormat(new Path("hdfs://localhost:9000/user/hive/warehouse/userinfo"),  ParquetAvroWriters.forReflectRecord(UserInfo.class))
                    .withRollingPolicy(OnCheckpointRollingPolicy.build())
                    .build();
    
            userInfoDataStream.addSink(parquetSink);
    
            env.execute();
        }
    }
    
    -----------------------------
    - 生成的parquet 文件名称及路径
    -----------------------------
    inprogress 临时文件
    hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19/.part-0-0.inprogress.18296793-9fde-4376-b6fc-7c47512bd108
    
    part 最终文件
    hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19/part-0-0
    
    
    

    核心类

    1. BulkWriter:用于不同格式的数据文件批量写入,主要实现类ParquetBulkWriter、AvroBulkWriter、OrcBulkWriter、SequenceFileWriter代表了数据写入的压缩格式。
    2. RecoverableWriter: 具有失败恢复能力的外部文件系统写入器,主要实现类HadoopRecoverableWriter、S3RecoverableWriter、LocalRecoverableWriter,代表对不同类型文件系统的操作。
    3. BulkPartWriter:InProgressFileWriter 实现类用来向inprogress文件写数据,持有BulkWriter。
    4. OutputStreamBasedPartFileWriter:Part Writer 基类,使用 RecoverableFsDataOutputStream 写出数据。
    5. RecoverableFsDataOutputStream:文件系统输出流,能够从文件系统的指定偏移量进行数据写入。
    6. BucketAssigner:负责将数据划分到不同的Bucket,可以根据数据格式自定义Assigner。内部集成 SimpleVersionedSerializer,用来对BucketID 做序列化/反序列化操作。
      a. BucketAssigner 子类 DateTimeBucketAssigner 根据数据的ProcessTime 生成 yyyy-MM-dd--HH 格式的 Bucket 名称。同时使用 SimpleVersionedStringSerializer 对Bucket 名称序列化。
    7. Bucket:StreamingFileSink数据输出的目录,每一条处理的数据根据BucketAssigner被分配到某个Bucket。主要功能:
      1. 维护一份 InProgressFile 文件,负责该文件的创建、数据写入、提交写入。
      2. 在 StreamingFileSink 执行Checkpoint时,负责构建 BucketState 进而进行该状态序列化 。
      3. 在 StreamingFileSink Checkpoint 完成后,重命名 InProgressFile 文件,
      4. 在 StreamingFileSink 从savepoint 启动时,从 BucketState 恢复 InProgressFile 相关信息。
    8. BucketState: Bucket的状态信息。通过BucketState能够恢复Bucket inprogress 文件及当前写入偏移量,从而继续向该 inprogress文件中追加内容,同时能够恢复 Pending状态文件信息,从而继续执行后续重名逻辑。
    9. Buckets:负责管理 StreamingFileSink 中所有活跃状态的 Bucket。包括数据所在Bucket 分配,Active Bucket 快照状态存储。
    10. RollingPolicy:定义了buckt 生成新的in-progress文件、及将in-progress 文件变更为最终part文件的策略。 最常用的策略是CheckpointRollingPolicy,在每次Checkpoint完成时,根据in-progress文件生成part文件。
    11. StreamingFileSinkHelper:StreamingFileSink 调用 StreamingFileSinkHelper 方法完成对Buckets数据的写入及状态存储。
    12. StreamingFileSink:根据BucketsBuilder构造器创建Buckets,初始化时创建StreamingFileSinkHelper,在Sink、checkpoint 方法中调用 StreamingFileSinkHelper 接口。

    数据写入

    Flink写文件流程为,先将数据写入inprogress临时文件,在满足RollingPolicy时,将inprogress临时文件重命名为最终的part文件。
    参考Flink1.12.1版本的代码,学习下 Flink 将数据写入文件的具体流程。


    Flink 文件写入.png
    1. StreamingFileSink 执行 invoke() 方法处理数据,是通过调用 StreamingFileSinkHelper onElement()方法对 Buckets 进行操作。
    functions.sink.filesystem.Buckets#onElement
    
    public Bucket<IN, BucketID> onElement(
            final IN value,
            final long currentProcessingTime,
            @Nullable final Long elementTimestamp,
            final long currentWatermark)
            throws Exception {
        // note: 获取当前数据所在的 BucketID, 即被被分桶后的子文件夹名称
        final BucketID bucketId = bucketAssigner.getBucketId(value, bucketerContext);
        // note: 从已缓存的集合中获取Bucket 或者 新建Bucket并缓存
        final Bucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
        // note: 将数据写入具体 bucket
        bucket.write(value, currentProcessingTime);
        return bucket;
    }
    
    1. Buckets 处理数据时,需要根据定义的 BucketAssigner 获取数据所在的 Bucket 标识。上述案例使用了 DateTimeBucketAssigner 了解下它如何根据ProcessTime 获取 BucketID。
    bucketassigners.DateTimeBucketAssigner#getBucketId
    
    public String getBucketId(IN element, BucketAssigner.Context context) {
        if (dateTimeFormatter == null) {
            // note: 将Processing Time 转换为 yyyy-MM-dd--HH 格式
            dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        }
        return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
    }
    
    1. 根据 BucketID 从Buckets 中拿到有效的Bucket。
    filesystem.Buckets#getOrCreateBucketForBucketId  
    
    private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketId)
            throws IOException {
        Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
        if (bucket == null) {
            // note: 构建buckt所在完整路径,例如 hdfs://localhost:9000/user/hive/warehouse/userinfo/2020-08-08--14
            final Path bucketPath = assembleBucketPath(bucketId);
            // note: 创建 Bucket 并由activeBuckets缓存
            bucket =
                    bucketFactory.getNewBucket(
                            subtaskIndex,
                            bucketId,
                            bucketPath,
                            maxPartCounter,
                            bucketWriter,
                            rollingPolicy,
                            fileLifeCycleListener,
                            outputFileConfig);
            activeBuckets.put(bucketId, bucket);
    
            notifyBucketCreate(bucket);
        }
        return bucket;
    }
    
    1. 初次向Bucket写入数据,需要创建part的临时文件及用来向文件写数据的InProgressFileWriter对象,同时创建BulkWriter,用来进行数据写入。
      1. 创建临时文件:根据inprocess文件生成规则,在HadoopRecoverableFsDataOutputStream初始化时创建并返回针对该文件的DataOutputStream。
      2. 创建BulkWriter:当前案例中由ParquetWriterFactory工厂类创建ParquetBulkWriter,并传递临时文件对应的DataOutputStream。
    sinkfilesystem.Bucket#write 
    void write(IN element, long currentTime) throws IOException {
        if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
            inProgressPart = rollPartFile(currentTime);
        }
        //note: BulkPartWriter使用 BulkWriter 写入数据。
        inProgressPart.write(element, currentTime);
    }
    
    private InProgressFileWriter<IN, BucketID> rollPartFile(final long currentTime)
            throws IOException {
        // note: 关闭part文件。
        closePartFile();
        final Path partFilePath = assembleNewPartPath();
        // note: 创建InProgressFileWriter
        return bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
    }
    
    // 创建BulkPartWriter 使用BulkWriter进行数据写入,
    OutputStreamBasedBucketWriter#openNewInProgressFile
    public InProgressFileWriter<IN, BucketID> openNewInProgressFile(
            final BucketID bucketID, final Path path, final long creationTime)
            throws IOException {
        // note: 根据的inprocess文件路径,由recoverableWriter创建。
        return openNew(bucketID, recoverableWriter.open(path), path, creationTime);
    }
    public InProgressFileWriter<IN, BucketID> openNew(
             final BucketID bucketId,
             final RecoverableFsDataOutputStream stream,
             final Path path,
             final long creationTime)
             throws IOException {
    
         final BulkWriter<IN> writer = writerFactory.create(stream);
         return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
     }
    }
    
    // 创建hdfs文件系统临时文件,针对该文件创建 RecoverableFsDataOutputStream
    HadoopRecoverableWriter#open
    public RecoverableFsDataOutputStream open(Path filePath) throws IOException {
       final org.apache.hadoop.fs.Path targetFile = HadoopFileSystem.toHadoopPath(filePath);
       final org.apache.hadoop.fs.Path tempFile = generateStagingTempFilePath(fs, targetFile);
       return new HadoopRecoverableFsDataOutputStream(fs, targetFile, tempFile);
    }
    
    HadoopRecoverableFsDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
            throws IOException {
        // note: 确保hadoop 支持 truncate方法
        ensureTruncateInitialized();
    
        this.fs = checkNotNull(fs);
        this.targetFile = checkNotNull(targetFile);
        this.tempFile = checkNotNull(tempFile);
        // 创建临时文件
        this.out = fs.create(tempFile);
    }
    
    1. 将数据以parquet格式写入临时文件,调用链路比较长中间涉及不少工厂类及代理类,最终调用的还是parquet框架本身的API。
    filesystem.BulkPartWriter#write
    public void write(IN element, long currentTime) throws IOException {
       writer.addElement(element);
       markWrite(currentTime);
    }
    
    parquet.ParquetBulkWriter#addElement
    public void addElement(T datum) throws IOException {
       // note: org.apache.parquet.hadoop.ParquetWriter
       parquetWriter.write(datum); 
    }
    

    checkpoint 过程

    在生产环境中大多使用 OnCheckpointRollingPolicy 策略,即在执行Checkpoint时存储BucketState,提交已写入的数据记录已写入数据的偏移量,在CK完成后将 inprogress 文件重命名为最终 part 文件。
    根据Checkpoint生命周期方法,了解执行过程。


    StreamingFileSink 处理流程
    1. initializeState 创建StreamingFileSinkHelper,做一些初始化工作。如果从已有的状态快照启动,会对BucketStates进行恢复,稍后详细介绍快照恢复的逻辑,先看状态快照中存储了什么信息,及后续逻辑。
    2. snapshotState 状态快照存储。Buckets 的 snapshotState() 会保存序列化后的 BucketState 及当前子任务处理的最大part文件个数。
    public void snapshotState(
        final long checkpointId,
        final ListState<byte[]> bucketStatesContainer,
        final ListState<Long> partCounterStateContainer)
        throws Exception {
    // note: 清理历史状态信息
    bucketStatesContainer.clear();
    partCounterStateContainer.clear();
        
    // note: 将 BucketState 以二进制格式存储到 bucketStatesContainer
    snapshotActiveBuckets(checkpointId, bucketStatesContainer);
    // note: 存储当前任务处理的最大文件数    
    partCounterStateContainer.add(maxPartCounter);
    }
    
    private void snapshotActiveBuckets(
            final long checkpointId, final ListState<byte[]> bucketStatesContainer)
            throws Exception {
        for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
            //note: 每个正在使用的Bucket会生成BucketState
            final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId);
            // note: 将BucketState 序列化后存储到 ListState
            final byte[] serializedBucketState =
                    SimpleVersionedSerialization.writeVersionAndSerialize(
                            bucketStateSerializer, bucketState);
    
            bucketStatesContainer.add(serializedBucketState);
        }
    }
    
    BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
        //note: 关闭 inProgressPart, 填充BucketState 使用的属性信息
        prepareBucketForCheckpointing(checkpointId);
        
        //note: ck 期间有数据写入。
        if (inProgressPart != null) {
            inProgressFileRecoverable = inProgressPart.persist();
            inProgressFileCreationTime = inProgressPart.getCreationTime();
            this.inProgressFileRecoverablesPerCheckpoint.put(
                    checkpointId, inProgressFileRecoverable);
        }
        // note: 构建出BucketState
        return new BucketState<>(
                bucketId,
                bucketPath,
                inProgressFileCreationTime,
                inProgressFileRecoverable,
                pendingFileRecoverablesPerCheckpoint);
    }
    
    private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
        if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
            closePartFile();
        }
        //  note: closePartFile()会将生成的 pendingFileRecoverable 写入pendingFileRecoverablesForCurrentCheckpoint
        if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
            pendingFileRecoverablesPerCheckpoint.put(
                    checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
            pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
        }
    }
    

    closePartFile() 主要处理工作包含:

    1. 使用 BulkWriter 将数据 flush 到外部文件。
    2. 基于当前 inprogressPart 创建出PendingFileRecoverable对象,其中封装了 HadoopFsRecoverable 对包含了targetFile(part文件)、tempFile(inprogress文件)、offset(数据当前写入的偏移量)属性,是最重要的状态信息。在checkpoint完成后,会将tempFile命名为targetFile。
    3. 关闭inprogressPart,填充 pendingFileRecoverablesForCurrentCheckpoint信息,代表当前CK正在处理的 inprogressPart文件。
    private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
        InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
        if (inProgressPart != null) {
            //note: 和inProgressPart文件一一对应
            pendingFileRecoverable = inProgressPart.closeForCommit();
            //note: 存储到LIST
            pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
            inProgressPart = null;
        }
        return pendingFileRecoverable;
    }
    
    filesystem.BulkPartWriter#closeForCommit
    public PendingFileRecoverable closeForCommit() throws IOException {
          // note: BulkWriter   
          writer.flush();
          writer.finish();
          // note: OutputStreamBasedPartFileWriter#closeForCommit
          return super.closeForCommit();
      }
      
    OutputStreamBasedPartFileWriter#closeForCommit  
    public PendingFileRecoverable closeForCommit() throws IOException {
        // note: 创建OutputStreamBasedPendingFileRecoverable封装HadoopRecoverableFsDataOutputStream
        return new OutputStreamBasedPendingFileRecoverable(
                currentPartStream.closeForCommit().getRecoverable());
    }
    
    HadoopRecoverableFsDataOutputStream#closeForCommit  
    public Committer closeForCommit() throws IOException {
          final long pos = getPos();
          close();
          // note: 构建HadoopFsRecoverable,最终会调用commit方法完成文件rename
          return new HadoopFsCommitter(fs, new HadoopFsRecoverable(targetFile, tempFile, pos));
    }
    
    1. notifyCheckpointComplete checkpoint完成后回调该方法完成对inprogress文件的rename,如果该方法执行失败不会撤销已生成的checkpoint。

    注意:HadoopFsCommitter执行commit对文件进行重命名时,并不会覆盖已有的part文件,此时数据准确性没办法保障。

    void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
        Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
                pendingFileRecoverablesPerCheckpoint
                        .headMap(checkpointId, true)
                        .entrySet()
                        .iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();
            for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable :
                    entry.getValue()) {
                // note: 从pendingFileRecoverable 生成PendingFile 执行commit(), 对progress文件重命名。
                bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
            }
            it.remove();
        }
    }
    
    OutputStreamBasedBucketWriter#recoverPendingFile
    public PendingFile recoverPendingFile(final PendingFileRecoverable pendingFileRecoverable)
            throws IOException {
        final RecoverableWriter.CommitRecoverable commitRecoverable;
    
        if (pendingFileRecoverable instanceof OutputStreamBasedPendingFileRecoverable) {
            commitRecoverable =
                    ((OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable)
                            .getCommitRecoverable();
        } else if (pendingFileRecoverable
                instanceof OutputStreamBasedInProgressFileRecoverable) {
            commitRecoverable =
                    ((OutputStreamBasedInProgressFileRecoverable) pendingFileRecoverable)
                            .getResumeRecoverable();
        } else {
            throw new IllegalArgumentException(
                    "can not recover from the pendingFileRecoverable");
        }
        return new OutputStreamBasedPendingFile(
            // note: 最终调用HadoopFsCommitter的commit方法。
            recoverableWriter.recoverForCommit(commitRecoverable));
    }
    
    HadoopFsCommitter#commit
    public void commit() throws IOException {
        final Path src = recoverable.tempFile();
        final Path dest = recoverable.targetFile();
        final long expectedLength = recoverable.offset();
    
        final FileStatus srcStatus;
        try {
            srcStatus = fs.getFileStatus(src);
        } catch (IOException e) {
            throw new IOException("Cannot clean commit: Staging file does not exist.");
        }
    
        if (srcStatus.getLen() != expectedLength) {
            // something was done to this file since the committer was created.
            // this is not the "clean" case
            throw new IOException("Cannot clean commit: File has trailing junk data.");
        }
    
        try {
            // note: 文件被重命名为最终的part文件。
            fs.rename(src, dest);
        } catch (IOException e) {
            throw new IOException(
                    "Committing file by rename failed: " + src + " to " + dest, e);
        }
    }
    
    
    1. initializeState 时状态恢复主要包含以下几个主要流程:
      1. restoreInProgressFile 恢复正在处理的inprogress文件。从 inProgressFileRecoverable 获取inprogress文件名称及已写入数据的偏移量,重新构建BulkWriter。
      2. commitRecoveredPendingFiles 提交pending状态文件。pending状态文件数据已经写入文件系统,只是还未执行最终的commit操作对文件执行重命名,则继续执行后续重命名操作。
      3. updateActiveBucketId 如果activeBuckets 包含 restoredBucket 提交该restoredBucket,否则存储到activeBuckets。
    /** Constructor to restore a bucket from checkpointed state. */
    private Bucket(
            final int subtaskIndex,
            final long initialPartCounter,
            final BucketWriter<IN, BucketID> partFileFactory,
            final RollingPolicy<IN, BucketID> rollingPolicy,
            final BucketState<BucketID> bucketState,
            @Nullable final FileLifeCycleListener<BucketID> fileListener,
            final OutputFileConfig outputFileConfig)
            throws IOException {
    
        this(
                subtaskIndex,
                bucketState.getBucketId(),
                bucketState.getBucketPath(),
                initialPartCounter,
                partFileFactory,
                rollingPolicy,
                fileListener,
                outputFileConfig);
        restoreInProgressFile(bucketState);
        commitRecoveredPendingFiles(bucketState);
    }
    
    
    private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
        if (!state.hasInProgressFileRecoverable()) {
            return;
        }
        // we try to resume the previous in-progress file
        final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable =
                state.getInProgressFileRecoverable();
       
        //  note: bucketWriter 是否具有恢复能力
        if (bucketWriter.getProperties().supportsResume()) {
            // note: 恢复inProgressPart
            inProgressPart =
                    bucketWriter.resumeInProgressFileFrom(
                            bucketId,
                            inProgressFileRecoverable,
                            state.getInProgressFileCreationTime());
        } else {
            // if the writer does not support resume, then we close the
            // in-progress part and commit it, as done in the case of pending files.
            bucketWriter.recoverPendingFile(inProgressFileRecoverable).commitAfterRecovery();
        }
    }
    
    private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
    
        // we commit pending files for checkpoints that precess the last successful one, from which
        // we are recovering
        for (List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables :
                state.getPendingFileRecoverablesPerCheckpoint().values()) {
            for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable :
                    pendingFileRecoverables) {
                bucketWriter.recoverPendingFile(pendingFileRecoverable).commitAfterRecovery();
            }
        }
    }
    
    Buckets#updateActiveBucketId
    private void updateActiveBucketId(
            final BucketID bucketId, final Bucket<IN, BucketID> restoredBucket) throws IOException {
        // note: 当前流程没有 bucketLifeCycleListener,没有要处理的状态信息,直接返回。
        if (!restoredBucket.isActive()) {
            notifyBucketInactive(restoredBucket);
            return;
        }
        // note: 当前 activeBuckets 已经包含restoredBucket所属的Bucket,则将restoredBucket 进行提交。否则存储到activeBuckets 
        final Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
        if (bucket != null) {
            bucket.merge(restoredBucket);
        } else {
            activeBuckets.put(bucketId, restoredBucket);
        }
    }
    
    void merge(final Bucket<IN, BucketID> bucket) throws IOException {
        InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = bucket.closePartFile();
        if (pendingFileRecoverable != null) {
            pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
        }
    }
    

    常见问题

    part文件格式

    part 文件名称生成规则: bucketPath_partPrefix_subtaskIndex_currentPartCounter_partSuffix。

    • bucketPath:bucket路径,例如:hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19
    • partPrefix:part文件前缀,通过OutputFileConfig设置,默认为part。
    • subtaskIndex: Sink Operator并行写入时,某一子任务当前索引,从0开始。
    • currentPartCounter:子任务生成的part文件数量,从0开始。

    例如:part-0-0 代表第一个task生成的第一个完整的part文件。

    inprogress 临时文件名称规则:.part 文件名称.inprogress.UUID

    • inprogress: 正在写入的临时文件的标识。
    • UUID: 随机生成的UUID。

    例如: hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19/.part-0-0.inprogress.18296793-9fde-4376-b6fc-7c47512bd108 代表 形成part-0-0产生的临时文件。

    数据准确性保障

    1. 非checkpoint/savepoint启动,当文件系统已经存在部分part文件,从kafka起始位置重新消费数据可能会导致数据缺失或者增多,因为rename操作并不会覆盖已有的part文件(应该是个BUG,提了个jira还没回复HadoopFsCommitter, file rename failure)。

      假设存在part-0-0历史文件,从起始位置消费数据会生成新的part-0-0文件,新文件存储的数据条数无法保证和历史文件一致,可能多也可能少。

    2. notifyCheckpointComplete执行时,Flink程序被Kill,从最新状态快照启动,不会丢失数据。

      假设.part-0-0.inprogress.xxxx 对应的bucket已经执行完snapshotState方法,则数据已经被flush到文件系统,在notifyCheckpointComplete阶段将完成对该inprogress文件的重命名,如果此时程序突然被kill,该inprocess文件是有数据的,只是文件没有被最终重命名。此时,从CK启动,则会先完成会 inprocess 文件的重命名。

    相关文章

      网友评论

        本文标题:Flink Streaming File Sink

        本文链接:https://www.haomeiwen.com/subject/ilfjbltx.html