Flink State Management Source Code Analysis (3): Snapshot Strategies

Author: 零度沸腾_yjz | Published: 2021-08-24 19:28

    Snapshot Strategy (SnapshotStrategy)

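    Flink's checkpoint mechanism is built on distributed consistent snapshots, which is what gives data processing its exactly-once semantics. Both the keyed state backends (HeapKeyedStateBackend, RocksDBKeyedStateBackend) and the operator state backend (DefaultOperatorStateBackend) receive snapshot requests through their snapshot method, and each one delegates the actual work to a concrete snapshot strategy.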

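    The UML diagram of Flink's snapshot strategies shows that, for keyed state, HeapSnapshotStrategy and RocksDBSnapshotStrategyBase are the strategies of the heap and RocksDB state backends respectively (RocksDB is further split into full and incremental snapshots), while DefaultOperatorStateBackendSnapshotStrategy is the strategy of the operator state backend.
    Besides keyed state and operator state, a savepoint is essentially a special kind of snapshot, so its strategy, SavepointSnapshotStrategy, also implements the SnapshotStrategy interface.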

    [Figure: UML class diagram of SnapshotStrategy and its implementations]

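    The SnapshotStrategy interface is defined below. It splits a snapshot into two steps:

    1. A synchronous part that prepares the resources the snapshot needs, so that the next step can write the snapshot data.
    2. An asynchronous part that writes the snapshot data to the provided CheckpointStreamFactory.
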
    public interface SnapshotStrategy<S extends StateObject, SR extends SnapshotResources> {
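        //Synchronous part of the snapshot: prepares the resources the snapshot needs.
        SR syncPrepareResources(long checkpointId) throws Exception;
        //Asynchronous part of the snapshot: writes the snapshot data to the CheckpointStreamFactory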
        SnapshotResultSupplier<S> asyncSnapshot(
                SR syncPartResource,
                long checkpointId,
                long timestamp,
                @Nonnull CheckpointStreamFactory streamFactory,
                @Nonnull CheckpointOptions checkpointOptions);
    
        //Supplier that runs the asynchronous part of the snapshot
        @FunctionalInterface
        interface SnapshotResultSupplier<S extends StateObject> {
            //Performs the asynchronous part of a checkpoint and returns the snapshot result.
            SnapshotResult<S> get(CloseableRegistry snapshotCloseableRegistry) throws Exception;
        }
    }
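
    The two-phase contract can be illustrated with a toy example. This is a minimal sketch, not Flink code: ToyResources, prepare, asyncPart and snapshot are hypothetical names, and a CompletableFuture on a plain executor stands in for whatever mechanism actually runs the asynchronous part.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Toy illustration of the sync-prepare / async-write split (not Flink code).
final class TwoPhaseSnapshotSketch {

    // Hypothetical stand-in for SnapshotResources: a frozen copy of the state plus release().
    static final class ToyResources {
        final List<String> frozen;
        volatile boolean released;

        ToyResources(List<String> liveState) {
            this.frozen = new ArrayList<>(liveState);
        }

        void release() {
            released = true;
        }
    }

    // Synchronous part: runs on the processing thread and only captures what the write needs.
    static ToyResources prepare(List<String> liveState) {
        return new ToyResources(liveState);
    }

    // Asynchronous part: the potentially slow write, reduced here to building a String.
    static Supplier<String> asyncPart(ToyResources resources) {
        return () -> {
            try {
                return String.join(",", resources.frozen);
            } finally {
                resources.release(); // free the resources once the asynchronous part is done
            }
        };
    }

    static String snapshot(List<String> liveState, ExecutorService executor) {
        ToyResources resources = prepare(liveState);
        CompletableFuture<String> pending =
                CompletableFuture.supplyAsync(asyncPart(resources), executor);
        liveState.add("added-after-prepare"); // processing continues while the write runs
        return pending.join();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            List<String> state = new ArrayList<>(List.of("a", "b"));
            System.out.println(snapshot(state, executor)); // prints a,b
        } finally {
            executor.shutdown();
        }
    }
}
```

    The element added after prepare never shows up in the written result, which is the point of doing the capture synchronously and the write asynchronously.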
    

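    The UML diagram of SnapshotResources contains:

    • FullSnapshotResources for full snapshots, implemented by HeapSnapshotResources (heap) and RocksDBFullSnapshotResources (RocksDB full snapshots)
    • IncrementalRocksDBSnapshotResources, the resources of RocksDB incremental snapshots
    • DefaultOperatorStateBackendSnapshotResources, the resources of operator state snapshots

    [Figure: UML class diagram of SnapshotResources]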

    The SnapshotResources interface declares a single method, release, which frees the resources once the asynchronous part of the snapshot has finished.

    @Internal
    public interface SnapshotResources {
        /** Cleans up the resources after the asynchronous part is done. */
        void release();
    }
    

    The concrete resource classes are covered together with their snapshot strategies.

    Heap Snapshot Strategy (HeapSnapshotStrategy)

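    Before looking at the heap snapshot strategy, consider its resource class, HeapSnapshotResources. The UML above shows that the heap snapshot resources and the RocksDB full snapshot resources both implement FullSnapshotResources, which also tells us that the heap state backend has no incremental snapshot implementation.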

    FullSnapshotResources describes the resources of a full snapshot independently of the concrete state backend; these resources are written out through FullSnapshotAsyncWriter.

    FullSnapshotResources is defined as follows; the type parameter K is the data type of the state key.

    public interface FullSnapshotResources<K> extends SnapshotResources {
    
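        //Returns the list of meta info snapshots for this state snapshot. A StateMetaInfoSnapshot records the snapshot metadata of one state, such as the state name, backend type and serializers.
        List<StateMetaInfoSnapshot> getMetaInfoSnapshots();

        //Creates an iterator over the entries of this snapshot
        KeyValueStateIterator createKVStateIterator() throws IOException;

        //The KeyGroupRange covered by this snapshot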
        KeyGroupRange getKeyGroupRange();
    
        /** Returns key {@link TypeSerializer}. */
        TypeSerializer<K> getKeySerializer();
    
        /** Returns the {@link StreamCompressionDecorator} that should be used for writing. */
        StreamCompressionDecorator getStreamCompressionDecorator();
    }
    
    

    Now for the two core methods of HeapSnapshotStrategy, syncPrepareResources and asyncSnapshot:

    class HeapSnapshotStrategy<K>
            implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotResources<K>> {
        ...
        //Prepares the snapshot resources (HeapSnapshotResources)
        @Override
        public HeapSnapshotResources<K> syncPrepareResources(long checkpointId) {
            return HeapSnapshotResources.create(
                    registeredKVStates,
                    registeredPQStates,
                    keyGroupCompressionDecorator,
                    keyGroupRange,
                    getKeySerializer(),
                    totalKeyGroups);
        }
    
        @Override
        public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
                HeapSnapshotResources<K> syncPartResource,
                long checkpointId,
                long timestamp,
                @Nonnull CheckpointStreamFactory streamFactory,
                @Nonnull CheckpointOptions checkpointOptions) {
                ......
            //SupplierWithException is a functional interface like java.util.function.Supplier that may throw:
            //the first type parameter is the result type, the second is the type of the exception thrown
            final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
                    checkpointStreamSupplier =
                            localRecoveryConfig.isLocalRecoveryEnabled() //is local recovery enabled?
                                            && !checkpointOptions.getCheckpointType().isSavepoint()
                                    ? () ->
                                            createDuplicatingStream( //local recovery and not a savepoint: duplicating stream
                                                    checkpointId,
                                                    CheckpointedStateScope.EXCLUSIVE,
                                                    streamFactory,
                                                    localRecoveryConfig
                                                            .getLocalStateDirectoryProvider())
                                    : () ->
                                            createSimpleStream( //no local recovery, or a savepoint: simple stream
                                                    CheckpointedStateScope.EXCLUSIVE, streamFactory);
    
            return (snapshotCloseableRegistry) -> {
                ......
                //Output stream
                final CheckpointStreamFactory.CheckpointStateOutputStream localStream =
                        streamWithResultProvider.getCheckpointOutputStream();
                //Write the checkpoint data through KeyedBackendSerializationProxy
                final DataOutputViewStreamWrapper outView =
                        new DataOutputViewStreamWrapper(localStream);
                serializationProxy.write(outView);
               ......
            };
        }
    }
    

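    The asyncSnapshot method above creates the snapshot output stream through CheckpointStreamWithResultProvider, whose core job is to encapsulate how the output stream is obtained. Without local recovery, it creates a single output stream that writes the snapshot data to the checkpoint storage configured for the job. With local recovery enabled, the state data must also be written locally so that it can later be restored from the local copy, so two output streams are obtained: one for the configured checkpoint storage and one for the local copy.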

    public interface CheckpointStreamWithResultProvider extends Closeable {
        //Closes the output stream and returns a snapshot result that carries the stream handle
        @Nonnull
        SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException;
    
        //Returns the snapshot output stream
        @Nonnull
        CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream();
    
        @Override
        default void close() throws IOException {
            getCheckpointOutputStream().close();
        }
        ...
    }
    

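    The two inner implementations of CheckpointStreamWithResultProvider correspond to these two cases. PrimaryStreamOnly backs the simple stream: it has a single output stream that writes to the configured checkpoint storage (for example remote HDFS or the JobManager). PrimaryAndSecondaryStream backs the duplicating stream: it has two output streams, the first identical to that of PrimaryStreamOnly and the second writing to local storage on the TaskManager for local recovery.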

    [Figure: class diagram of CheckpointStreamWithResultProvider and its two implementations]

    Creating a simple stream: as shown below, only a primary stream is created.

    static CheckpointStreamWithResultProvider createSimpleStream(
                @Nonnull CheckpointedStateScope checkpointedStateScope,
                @Nonnull CheckpointStreamFactory primaryStreamFactory)
                throws IOException {
            //Create the primary output stream
            CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
                    primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
            return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
        }
    

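    Creating a duplicating stream: besides the primary stream, a secondary stream that writes to a local file is also created. If the secondary stream cannot be opened, the method logs a warning and falls back to the primary stream only.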

    @Nonnull
        static CheckpointStreamWithResultProvider createDuplicatingStream(
                @Nonnegative long checkpointId,
                @Nonnull CheckpointedStateScope checkpointedStateScope,
                @Nonnull CheckpointStreamFactory primaryStreamFactory,
                @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider)
                throws IOException {
    
            CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
                    primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
    
            try {
                //Path of the local file the checkpoint data is written to
                File outFile =
                        new File(
                                secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(
                                        checkpointId),
                                String.valueOf(UUID.randomUUID()));
                Path outPath = new Path(outFile.toURI());
    
                //Build the output stream that writes to the file
                CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
                        new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);
    
                return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(
                        primaryOut, secondaryOut);
            } catch (IOException secondaryEx) {
                LOG.warn(
                        "Exception when opening secondary/local checkpoint output stream. "
                                + "Continue only with the primary stream.",
                        secondaryEx);
            }
    
            return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
        }
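
    The idea behind a duplicating stream can be sketched as follows. This is a toy illustration under stated assumptions, not Flink's implementation: DuplicatingStreamSketch and writeAll are hypothetical names, and the policy of remembering a secondary failure instead of throwing it is an assumption modelled on the fallback in createDuplicatingStream above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Toy duplicating stream (not Flink code): the primary copy is mandatory, the secondary is best effort.
final class DuplicatingStreamSketch extends OutputStream {

    private final OutputStream primary;
    private final OutputStream secondary;
    private IOException secondaryFailure; // remembered instead of thrown

    DuplicatingStreamSketch(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b); // a failure here propagates and fails the snapshot
        if (secondaryFailure == null) {
            try {
                secondary.write(b);
            } catch (IOException e) {
                secondaryFailure = e; // give up on the local copy, keep writing the primary one
            }
        }
    }

    @Override
    public void close() throws IOException {
        try {
            secondary.close();
        } catch (IOException e) {
            if (secondaryFailure == null) {
                secondaryFailure = e;
            }
        }
        primary.close();
    }

    boolean secondaryIsUsable() {
        return secondaryFailure == null;
    }

    // Writes the text to both streams and reports whether every write reached the secondary one.
    static boolean writeAll(OutputStream primary, OutputStream secondary, String text) {
        try (DuplicatingStreamSketch out = new DuplicatingStreamSketch(primary, secondary)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
            return out.secondaryIsUsable();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream remote = new ByteArrayOutputStream();
        ByteArrayOutputStream local = new ByteArrayOutputStream();
        System.out.println(writeAll(remote, local, "state")); // prints true
    }
}
```

    Treating the local copy as optional matches its role: it only speeds up recovery, while the primary copy is the one the checkpoint depends on.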
    

    The CheckpointStreamFactory above creates the output stream that writes checkpoint data to external storage; for example, FsCheckpointStreamFactory writes checkpoint data to an external file system.

    [Figure: class diagram of CheckpointStreamFactory]

    public interface CheckpointStreamFactory {
    
        //Creates a new state output stream. CheckpointStateOutputStream is a static abstract class nested in CheckpointStreamFactory
        CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope)
                throws IOException;

        //Base class of the output streams; the implementations live in the CheckpointStreamFactory implementations
        abstract class CheckpointStateOutputStream extends FSDataOutputStream {

            //Closes the stream and returns a handle to the written data
            @Nullable
            public abstract StreamStateHandle closeAndGetHandle() throws IOException;

            //Closes the stream
            @Override
            public abstract void close() throws IOException;
        }
    }
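
    The difference between closeAndGetHandle and a plain close can be shown with a toy in-memory stream. This is a sketch under stated assumptions, not one of Flink's stream classes: HandleStreamSketch and ToyHandle are hypothetical, and returning null after an earlier close is a choice made for this example only.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Toy in-memory stream (not Flink code) that separates "close and keep" from "close and discard".
final class HandleStreamSketch extends OutputStream {

    // Hypothetical handle: simply the bytes that were written.
    static final class ToyHandle {
        final byte[] data;

        ToyHandle(byte[] data) {
            this.data = data;
        }
    }

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private boolean closed;

    @Override
    public void write(int b) {
        if (closed) {
            throw new IllegalStateException("stream already closed");
        }
        buffer.write(b);
    }

    void writeText(String text) {
        for (byte b : text.getBytes(StandardCharsets.UTF_8)) {
            write(b);
        }
    }

    // Successful end of a snapshot: close the stream and return a handle to the data.
    // Returns null if the stream was closed before (an assumption of this sketch).
    ToyHandle closeAndGetHandle() {
        if (closed) {
            return null;
        }
        closed = true;
        return new ToyHandle(buffer.toByteArray());
    }

    // Plain close(): the snapshot was abandoned, so the data is dropped and no handle exists.
    @Override
    public void close() {
        closed = true;
        buffer.reset();
    }

    public static void main(String[] args) {
        HandleStreamSketch out = new HandleStreamSketch();
        out.writeText("abc");
        System.out.println(out.closeAndGetHandle().data.length); // prints 3
    }
}
```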
    

    RocksDB Snapshot Strategies

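    From the UML above, the RocksDB snapshot strategies come down to three core classes: the abstract class RocksDBSnapshotStrategyBase, the full snapshot strategy RocksFullSnapshotStrategy, and the incremental snapshot strategy RocksIncrementalSnapshotStrategy.
    RocksDBSnapshotStrategyBase defines member variables related to RocksDB and state; the actual logic lives in its subclasses.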

    Full Snapshots

    The full snapshot strategy RocksFullSnapshotStrategy creates full snapshots of a RocksDBKeyedStateBackend: every checkpoint writes the complete state data to remote storage (the JobManager or HDFS).

    Again we look at the two core methods, syncPrepareResources and asyncSnapshot.

    public class RocksFullSnapshotStrategy<K>
            extends RocksDBSnapshotStrategyBase<K, FullSnapshotResources<K>> {
        ......
        
        @Override
        public FullSnapshotResources<K> syncPrepareResources(long checkpointId) throws Exception {
            //Builds the RocksDB full snapshot resources. Compared with HeapSnapshotResources,
            //RocksDBFullSnapshotResources also holds the RocksDB instance and a RocksDB Snapshot
            return RocksDBFullSnapshotResources.create(
                    kvStateInformation,
                    registeredPQStates,
                    db,
                    rocksDBResourceGuard,
                    keyGroupRange,
                    keySerializer,
                    keyGroupPrefixBytes,
                    keyGroupCompressionDecorator);
        }
    
        @Override
        public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
                FullSnapshotResources<K> fullRocksDBSnapshotResources,
                long checkpointId,
                long timestamp,
                @Nonnull CheckpointStreamFactory checkpointStreamFactory,
                @Nonnull CheckpointOptions checkpointOptions) {
    
            if (fullRocksDBSnapshotResources.getMetaInfoSnapshots().isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
                            timestamp);
                }
                return registry -> SnapshotResult.empty();
            }
    
            //createCheckpointStreamSupplier works as in the heap strategy: depending on whether local recovery is enabled, it creates a duplicating or a simple stream
            final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
                    checkpointStreamSupplier =
                            createCheckpointStreamSupplier(
                                    checkpointId, checkpointStreamFactory, checkpointOptions);
    
            //Create the asynchronous full snapshot writer
            return new FullSnapshotAsyncWriter<>(
                    checkpointOptions.getCheckpointType(),
                    checkpointStreamSupplier,
                    fullRocksDBSnapshotResources);
        }
    ......
    }
    

    FullSnapshotAsyncWriter is itself a SnapshotResultSupplier; it asynchronously writes the full snapshot data to the given output stream.

    public class FullSnapshotAsyncWriter<K>
            implements SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> {
        @Override
        public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
                throws Exception {
            ......
            //Obtain the output stream
            final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider =
                    checkpointStreamSupplier.get();

            snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
            //Write the snapshot data to the output stream
            writeSnapshotToOutputStream(checkpointStreamWithResultProvider, keyGroupRangeOffsets);
            ......
        }
        
        private void writeSnapshotToOutputStream(
                @Nonnull CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
                @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets)
                throws IOException, InterruptedException {
            //Write the snapshot data to the output stream through an output view; note that checkpointStreamWithResultProvider may write two copies
            final DataOutputView outputView =
                    new DataOutputViewStreamWrapper(
                            checkpointStreamWithResultProvider.getCheckpointOutputStream());
            //Write the metadata
            writeKVStateMetaData(outputView);
            //Write the state data of every state instance
            try (KeyValueStateIterator kvStateIterator = snapshotResources.createKVStateIterator()) {
                writeKVStateData(
                        kvStateIterator, checkpointStreamWithResultProvider, keyGroupRangeOffsets);
            }
        }
    }
    

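    Next comes the most important piece, writeKVStateData, which actually writes the full data out. Leaving the details aside and looking only at how it writes, it simply iterates over a KeyValueStateIterator: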

    private void writeKVStateData(
                final KeyValueStateIterator mergeIterator,
                final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
                final KeyGroupRangeOffsets keyGroupRangeOffsets)
                throws IOException, InterruptedException {
            ......
            try {
               ......
                //Simply iterate over the KeyValueStateIterator
                // main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking
                // key-group offsets.
                while (mergeIterator.isValid()) {
                    ......
                    writeKeyValuePair(previousKey, previousValue, kgOutView);
                    ......
                    // request next k/v pair
                    previousKey = mergeIterator.key();
                    previousValue = mergeIterator.value();
                    mergeIterator.next();
                }
                ......
            } finally {
                // this will just close the outer stream
                IOUtils.closeQuietly(kgOutStream);
            }
        }
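
    The comment in the loop above mentions tracking key-group offsets while the pairs are written. A minimal sketch of that idea follows. It is not Flink's on-disk format, which is more involved: KeyGroupOffsetsSketch and Entry are hypothetical, and the length-prefixed layout is chosen only to keep the example small.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy writer (not Flink's format): writes k/v pairs and remembers where each key group starts.
final class KeyGroupOffsetsSketch {

    // Hypothetical entry: a key group id plus an already serialized key and value.
    static final class Entry {
        final int keyGroup;
        final byte[] key;
        final byte[] value;

        Entry(int keyGroup, String key, String value) {
            this.keyGroup = keyGroup;
            this.key = key.getBytes(StandardCharsets.UTF_8);
            this.value = value.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Expects the entries sorted by key group; returns key group -> start offset in the stream.
    static Map<Integer, Long> write(List<Entry> sortedEntries, ByteArrayOutputStream target) {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        DataOutputStream out = new DataOutputStream(target);
        try {
            for (Entry entry : sortedEntries) {
                // first entry of a key group: remember the current stream position
                offsets.putIfAbsent(entry.keyGroup, (long) out.size());
                out.writeInt(entry.key.length);
                out.write(entry.key);
                out.writeInt(entry.value.length);
                out.write(entry.value);
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<Entry> entries =
                List.of(new Entry(0, "a", "1"), new Entry(0, "b", "2"), new Entry(2, "c", "3"));
        // each entry takes 4 + 1 + 4 + 1 = 10 bytes
        System.out.println(write(entries, new ByteArrayOutputStream())); // prints {0=0, 2=20}
    }
}
```

    With such offsets a reader can seek directly to the key groups it is responsible for instead of scanning the whole file.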
    

    A KeyValueStateIterator exposes all key-value entries of the current snapshot; RocksDB and heap each have their own iterator implementation.

    [Figure: class diagram of KeyValueStateIterator and its implementations]

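    Now let's see how RocksStatesPerKeyGroupMergeIterator is created. The FullSnapshotResources interface shown earlier declares the abstract method createKVStateIterator, which exists precisely to create this iterator. HeapSnapshotResources and RocksDBFullSnapshotResources each implement it to create the heap and RocksDB iterators. Below is the implementation of RocksDBFullSnapshotResources.createKVStateIterator.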

    @Override
        public KeyValueStateIterator createKVStateIterator() throws IOException {
            ......
            try {
                //Create RocksDB ReadOptions that read from the RocksDB snapshot taken in the synchronous phase of the checkpoint
                ReadOptions readOptions = new ReadOptions();
                closeableRegistry.registerCloseable(readOptions::close);
                readOptions.setSnapshot(snapshot);
    
                //RocksIteratorWrapper is a thin wrapper around RocksIterator
                List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
                        createKVStateIterators(closeableRegistry, readOptions);
                ......
                //RocksStatesPerKeyGroupMergeIterator wraps the iterators of several state instances (column families) into a single iterator
                return new RocksStatesPerKeyGroupMergeIterator(
                        closeableRegistry,
                        kvStateIterators,
                        heapPriorityQueueIterators,
                        keyGroupPrefixBytes);
            } catch (Throwable t) {
                IOUtils.closeQuietly(closeableRegistry);
                throw new IOException("Error creating merge iterator", t);
            }
        }

        private List<Tuple2<RocksIteratorWrapper, Integer>> createKVStateIterators(
                CloseableRegistry closeableRegistry, ReadOptions readOptions) throws IOException {
            final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
                    new ArrayList<>(metaData.size());
            int kvStateId = 0;
            //One iterator is created per state, i.e. per RocksDB column family
            for (MetaData metaDataEntry : metaData) {
                RocksIteratorWrapper rocksIteratorWrapper =
                        createRocksIteratorWrapper(
                                db,
                                metaDataEntry.rocksDbKvStateInfo.columnFamilyHandle,
                                metaDataEntry.stateSnapshotTransformer,
                                readOptions);
                kvStateIterators.add(Tuple2.of(rocksIteratorWrapper, kvStateId));
                closeableRegistry.registerCloseable(rocksIteratorWrapper);
                ++kvStateId;
            }
            return kvStateIterators;
        }
    
        private static RocksIteratorWrapper createRocksIteratorWrapper(
                RocksDB db,
                ColumnFamilyHandle columnFamilyHandle,
                StateSnapshotTransformer<byte[]> stateSnapshotTransformer,
                ReadOptions readOptions) {
            //Create the RocksDB iterator, wrapped in Flink's RocksIteratorWrapper
            RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
            return stateSnapshotTransformer == null
                    ? new RocksIteratorWrapper(rocksIterator)
                    : new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer);
        }
    

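    The code above shows that the iterator is ultimately RocksDB's own iterator (reading from the given snapshot), which Flink wraps in RocksIteratorWrapper (for why the wrapper is needed, see the notes on iterator error handling in RocksDB's own documentation). Because there may be several state instances, each with its own iterator, Flink finally combines them into a single iterator, RocksStatesPerKeyGroupMergeIterator.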
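
    How several per-state iterators can be folded into one can be sketched with a priority queue. This is a toy model, not the actual RocksStatesPerKeyGroupMergeIterator: MergeIteratorSketch, Entry and Cursor are hypothetical, key groups are plain ints rather than byte prefixes, and the (key group, state id) ordering is taken from the "ordered by (key-group, kv-state)" comment in writeKVStateData.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Toy merge iterator (not Flink code): emits entries ordered by (key group, state id).
final class MergeIteratorSketch implements Iterator<String> {

    // Hypothetical entry: key group plus key; each source is assumed sorted by key group.
    static final class Entry {
        final int keyGroup;
        final String key;

        Entry(int keyGroup, String key) {
            this.keyGroup = keyGroup;
            this.key = key;
        }
    }

    // One cursor per state: the underlying iterator plus its current head element.
    private static final class Cursor {
        final Iterator<Entry> source;
        final int stateId;
        Entry head;

        Cursor(Iterator<Entry> source, int stateId) {
            this.source = source;
            this.stateId = stateId;
            this.head = source.next();
        }
    }

    private final PriorityQueue<Cursor> heap =
            new PriorityQueue<>(
                    Comparator.comparingInt((Cursor c) -> c.head.keyGroup)
                            .thenComparingInt(c -> c.stateId));

    MergeIteratorSketch(List<Iterator<Entry>> perStateSources) {
        for (int stateId = 0; stateId < perStateSources.size(); stateId++) {
            Iterator<Entry> source = perStateSources.get(stateId);
            if (source.hasNext()) {
                heap.add(new Cursor(source, stateId));
            }
        }
    }

    @Override
    public boolean hasNext() {
        return !heap.isEmpty();
    }

    @Override
    public String next() {
        Cursor smallest = heap.remove(); // throws NoSuchElementException once everything is consumed
        Entry entry = smallest.head;
        if (smallest.source.hasNext()) {
            smallest.head = smallest.source.next(); // advance while the cursor is outside the heap
            heap.add(smallest);
        }
        return entry.keyGroup + "/" + smallest.stateId + "/" + entry.key;
    }

    // Convenience for the demo: merges per-state lists into "keyGroup/stateId/key" strings.
    static List<String> mergeAll(List<List<Entry>> perStateEntries) {
        List<Iterator<Entry>> sources = new ArrayList<>();
        for (List<Entry> entries : perStateEntries) {
            sources.add(entries.iterator());
        }
        List<String> merged = new ArrayList<>();
        new MergeIteratorSketch(sources).forEachRemaining(merged::add);
        return merged;
    }

    public static void main(String[] args) {
        List<List<Entry>> states =
                List.of(
                        List.of(new Entry(0, "a"), new Entry(1, "b")),
                        List.of(new Entry(0, "x"), new Entry(1, "y")));
        System.out.println(mergeAll(states)); // prints [0/0/a, 0/1/x, 1/0/b, 1/1/y]
    }
}
```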

    Incremental Snapshots

    RocksIncrementalSnapshotStrategy is the incremental snapshot strategy of RocksDBKeyedStateBackend; it builds incremental snapshots on top of RocksDB's native checkpoints.

    Before looking at syncPrepareResources and asyncSnapshot of RocksIncrementalSnapshotStrategy, here are the key member variables that incremental snapshots rely on.

    //The incremental snapshot resources are the inner class IncrementalRocksDBSnapshotResources
    public class RocksIncrementalSnapshotStrategy<K>
            extends RocksDBSnapshotStrategyBase<
                    K, RocksIncrementalSnapshotStrategy.IncrementalRocksDBSnapshotResources> {
    
        //Base directory of the RocksDB instance
        @Nonnull private final File instanceBasePath;

        //Unique ID of this backend
        @Nonnull private final UUID backendUID;

        //Maps each checkpoint id to the sst files of that checkpoint
        @Nonnull private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;

        //ID of the last completed checkpoint
        private long lastCompletedCheckpointId;

        //Uploads the snapshot files (the sst files produced by the RocksDB checkpoint, etc.)
        private final RocksDBStateUploader stateUploader;
        ...
    }
    

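    The synchronous resource preparation phase does two main things:

    1. It fetches the sst files of the most recent completed checkpoint from materializedSstFiles; they are the baseline for the incremental comparison.
    2. It creates a RocksDB checkpoint.
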
    @Override
        public IncrementalRocksDBSnapshotResources syncPrepareResources(long checkpointId)
                throws Exception {
    
            //Prepare the directory: a permanent one if local recovery is enabled, otherwise a temporary one
            final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
            LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);
            
            final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
                    new ArrayList<>(kvStateInformation.size());
            //sst files of the last completed checkpoint, the baseline for the incremental comparison
            final Set<StateHandleID> baseSstFiles =
                    snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
            //Create the RocksDB checkpoint
            takeDBNativeCheckpoint(snapshotDirectory);
    
            return new IncrementalRocksDBSnapshotResources(
                    snapshotDirectory, baseSstFiles, stateMetaInfoSnapshots);
        }
    

    takeDBNativeCheckpoint synchronously creates the RocksDB checkpoint; the checkpoint data (sst files and misc files) is generated in the given directory.

    private void takeDBNativeCheckpoint(@Nonnull SnapshotDirectory outputDirectory)
                throws Exception {
            try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
                    Checkpoint checkpoint = Checkpoint.create(db)) {
                checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
            } catch (Exception ex) {
                ......
            }
        }
    
    

    asyncSnapshot itself is simple: it mainly creates a RocksDBIncrementalSnapshotOperation, the supplier that produces the incremental snapshot.

    @Override
        public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
                IncrementalRocksDBSnapshotResources snapshotResources,
                long checkpointId,
                long timestamp,
                @Nonnull CheckpointStreamFactory checkpointStreamFactory,
                @Nonnull CheckpointOptions checkpointOptions) {
            ...
            return new RocksDBIncrementalSnapshotOperation(
                    checkpointId,
                    checkpointStreamFactory,
                    snapshotResources.snapshotDirectory, //directory of the RocksDB checkpoint
                    snapshotResources.baseSstFiles, //sst files of the last completed checkpoint
                    snapshotResources.stateMetaInfoSnapshots);
        }
    
    

    Now for the core of the incremental snapshot, RocksDBIncrementalSnapshotOperation:

    private final class RocksDBIncrementalSnapshotOperation
                implements SnapshotResultSupplier<KeyedStateHandle> {
        ...
        
        @Override
         public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
                 throws Exception {
                ...
                // sst files produced by the current RocksDB checkpoint
                final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
                // misc files (metadata files) of the current RocksDB checkpoint
                final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
                ......
                //Upload the new sst files and the misc files; uploadSstFiles lists the RocksDB checkpoint directory and picks out the new sst files
                uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);
                //Record the sst files of the current checkpoint
                synchronized (materializedSstFiles) {
                    materializedSstFiles.put(checkpointId, sstFiles.keySet());
                }
                ......
        }   
    }
    

    Here is the implementation of the uploadSstFiles method used above:

     private void uploadSstFiles(
                    @Nonnull Map<StateHandleID, StreamStateHandle> sstFiles,
                    @Nonnull Map<StateHandleID, StreamStateHandle> miscFiles,
                    @Nonnull CloseableRegistry snapshotCloseableRegistry)
                    throws Exception {
                //Local paths of the new sst files
                Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
                //Paths of the misc files
                Map<StateHandleID, Path> miscFilePaths = new HashMap<>();
                //Directory of the current RocksDB checkpoint
                Path[] files = localBackupDirectory.listDirectory();
                if (files != null) {
                    //Find the new files
                    createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
                    //Upload the new sst files with stateUploader
                    sstFiles.putAll(
                            stateUploader.uploadFilesToCheckpointFs(
                                    sstFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
                    //Upload the misc files
                    miscFiles.putAll(
                            stateUploader.uploadFilesToCheckpointFs(
                                    miscFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
                }
            }
    

    The createUploadFilePaths method above finds the new sst files by comparison and collects the sst and misc files that have to be uploaded.

    private void createUploadFilePaths(
                    Path[] files,
                    Map<StateHandleID, StreamStateHandle> sstFiles,
                    Map<StateHandleID, Path> sstFilePaths,
                    Map<StateHandleID, Path> miscFilePaths) {
                for (Path filePath : files) {
                    final String fileName = filePath.getFileName().toString();
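                    //State handle ID derived from the file name
                    final StateHandleID stateHandleID = new StateHandleID(fileName);
                    //Compare the sst file against those of the last completed checkpoint to find what is new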
                    if (fileName.endsWith(SST_FILE_SUFFIX)) {
                        final boolean existsAlready =
                                baseSstFiles != null && baseSstFiles.contains(stateHandleID);
    
                        if (existsAlready) {
                            //An sst file that existed before is represented by a placeholder only: it was already uploaded and lives in the shared directory
                            sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
                        } else {
                            //New file that has to be uploaded
                            sstFilePaths.put(stateHandleID, filePath);
                        }
                    } else {
                        //All misc files are uploaded
                        miscFilePaths.put(stateHandleID, filePath);
                    }
                }
            }
    

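    The incremental snapshot logic therefore boils down to:

    1. RocksDB's checkpoint produces the sst files of the current snapshot (because of the LSM design, sst files are immutable).
    2. For every checkpoint, Flink records the mapping from the checkpoint id to the sst files of its snapshot.
    3. The sst files and misc files of the current checkpoint are uploaded.
    4. If a later checkpoint still contains sst files from before, those files do not need to be uploaded to HDFS again.

    Flink's incremental checkpoints thus cleverly exploit the fact that sst files in an LSM tree are immutable once written.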
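
    The comparison step can be replayed on plain file names. This is a minimal sketch that mirrors the branching of createUploadFilePaths without any Flink types: IncrementalDiffSketch and UploadPlan are hypothetical, the ".sst" suffix is an assumption about the value of SST_FILE_SUFFIX, and the file names are made up.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy version of the incremental comparison (not Flink code), working on file names only.
final class IncrementalDiffSketch {

    static final class UploadPlan {
        final Set<String> reused = new TreeSet<>(); // uploaded by an earlier checkpoint, referenced by placeholder
        final Set<String> newSst = new TreeSet<>(); // sst files that must be uploaded now
        final Set<String> misc = new TreeSet<>();   // non-sst files, always uploaded
    }

    // baseSstFiles may be null when there is no previous checkpoint to compare against.
    static UploadPlan plan(Collection<String> checkpointDirFiles, Set<String> baseSstFiles) {
        UploadPlan plan = new UploadPlan();
        for (String name : checkpointDirFiles) {
            if (!name.endsWith(".sst")) { // assumed suffix
                plan.misc.add(name);
            } else if (baseSstFiles != null && baseSstFiles.contains(name)) {
                plan.reused.add(name);
            } else {
                plan.newSst.add(name);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        // First checkpoint: nothing to compare against, every sst file is new.
        UploadPlan first = plan(List.of("000001.sst", "000002.sst", "MANIFEST", "CURRENT"), null);
        System.out.println(first.newSst); // prints [000001.sst, 000002.sst]

        // Second checkpoint: 000002.sst survived and 000003.sst was created in between.
        UploadPlan second =
                plan(List.of("000002.sst", "000003.sst", "MANIFEST", "CURRENT"), first.newSst);
        System.out.println(second.reused); // prints [000002.sst]
        System.out.println(second.newSst); // prints [000003.sst]
    }
}
```

    Comparing by file name alone is only safe because an sst file never changes after it is written; a mutable file would need a content check.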

    Operator State Snapshot Strategy

    Operator state has a single snapshot strategy, DefaultOperatorStateBackendSnapshotStrategy, which writes the snapshot data of the operator's ListState and BroadcastState to the snapshot storage.

    class DefaultOperatorStateBackendSnapshotStrategy
            implements SnapshotStrategy<
                    OperatorStateHandle,
                    DefaultOperatorStateBackendSnapshotStrategy
                            .DefaultOperatorStateBackendSnapshotResources> {
        private final ClassLoader userClassLoader;
        //Operator state has only two kinds of state: ListState and BroadcastState
        private final Map<String, PartitionableListState<?>> registeredOperatorStates;
        private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
    
        protected DefaultOperatorStateBackendSnapshotStrategy(
                ClassLoader userClassLoader,
                Map<String, PartitionableListState<?>> registeredOperatorStates,
                Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates) {
            this.userClassLoader = userClassLoader;
            this.registeredOperatorStates = registeredOperatorStates;
            this.registeredBroadcastStates = registeredBroadcastStates;
        }
        ......
    }
    

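    In the synchronous preparation phase, DefaultOperatorStateBackendSnapshotStrategy does one thing only: it deep-copies the ListState and BroadcastState. The deep copy captures the state as of this moment, so that the asynchronous write sees a consistent snapshot even while processing continues, which is what exactly-once relies on.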

    @Override
        public DefaultOperatorStateBackendSnapshotResources syncPrepareResources(long checkpointId) {
            
            //Holds the copied operator state
            final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
                    new HashMap<>(registeredOperatorStates.size());
            final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
                    new HashMap<>(registeredBroadcastStates.size());
    
            ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(userClassLoader);
            try {
                //Deep-copy the ListState and BroadcastState for later use
                if (!registeredOperatorStates.isEmpty()) {
                    for (Map.Entry<String, PartitionableListState<?>> entry :
                            registeredOperatorStates.entrySet()) {
                        PartitionableListState<?> listState = entry.getValue();
                        if (null != listState) {
                            listState = listState.deepCopy();
                        }
                        registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
                    }
                }
                //Copy the broadcast state
                if (!registeredBroadcastStates.isEmpty()) {
                    for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                            registeredBroadcastStates.entrySet()) {
                        BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                        if (null != broadcastState) {
                            broadcastState = broadcastState.deepCopy();
                        }
                        registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                    }
                }
            } finally {
                Thread.currentThread().setContextClassLoader(snapshotClassLoader);
            }
    
            return new DefaultOperatorStateBackendSnapshotResources(
                    registeredOperatorStatesDeepCopies, registeredBroadcastStatesDeepCopies);
        }
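
    Why the copy has to be deep rather than shallow can be seen with mutable elements. This is a toy sketch: DeepCopySketch is hypothetical, StringBuilder stands in for a mutable state element, and how deepCopy is implemented inside Flink's state classes is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Toy comparison of shallow and deep copies (not Flink code).
final class DeepCopySketch {

    // Copies only the list; the elements are still shared with the live state.
    static List<StringBuilder> shallowCopy(List<StringBuilder> live) {
        return new ArrayList<>(live);
    }

    // Copies the list and every element, so later updates cannot leak into the snapshot.
    static List<StringBuilder> deepCopy(List<StringBuilder> live) {
        List<StringBuilder> copy = new ArrayList<>(live.size());
        for (StringBuilder element : live) {
            copy.add(new StringBuilder(element));
        }
        return copy;
    }

    static String render(List<StringBuilder> elements) {
        return String.join(",", elements);
    }

    public static void main(String[] args) {
        List<StringBuilder> live = new ArrayList<>();
        live.add(new StringBuilder("a"));

        List<StringBuilder> shallow = shallowCopy(live);
        List<StringBuilder> deep = deepCopy(live);

        // Processing continues after the synchronous part has returned.
        live.get(0).append("!");
        live.add(new StringBuilder("b"));

        System.out.println(render(shallow)); // prints a!
        System.out.println(render(deep));    // prints a
    }
}
```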
    

    Once the operator state has been deep-copied, asyncSnapshot asynchronously writes the snapshot data to a stream created by the CheckpointStreamFactory.

    @Override
        public SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(
                DefaultOperatorStateBackendSnapshotResources syncPartResource,
                long checkpointId,
                long timestamp,
                @Nonnull CheckpointStreamFactory streamFactory,
                @Nonnull CheckpointOptions checkpointOptions) {
            ......
            return (snapshotCloseableRegistry) -> {
                //Create the output stream
                CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                        streamFactory.createCheckpointStateOutputStream(
                                CheckpointedStateScope.EXCLUSIVE);
                snapshotCloseableRegistry.registerCloseable(localOut);
                ......
    
                //Write the snapshot data to the output stream through OperatorBackendSerializationProxy
                DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
                OperatorBackendSerializationProxy backendSerializationProxy =
                        new OperatorBackendSerializationProxy(
                                operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
                backendSerializationProxy.write(dov);
    
                ......
                    return SnapshotResult.of(retValue);
                } else {
                    throw new IOException("Stream was already unregistered.");
                }
            };
        }
    


    Article link: https://www.haomeiwen.com/subject/mifjiltx.html