How Flink's RocksDB state backend works with checkpoints

Author: shengjk1 | Published 2019-12-24 19:46

    I recently ran into some problems while building a real-time data warehouse, which led to this post.

    When the state backend is set to RocksDBStateBackend, a mapState.put call ultimately becomes a RocksDB put, for example:

    public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
        // Serialize key group + current key + namespace + user key into a single RocksDB key.
        byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
        byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);

        // Write the pair into this state's column family.
        backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
    }
    

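    To make the key layout concrete, here is a minimal sketch of how a composite key could be assembled. It is not Flink's code: the class name, the use of strings, and the one-byte length prefixes are assumptions for illustration, whereas Flink uses its own type serializers. Only the ordering (key group, current key, namespace, user key) follows the method name above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; Flink serializes keys with its TypeSerializers.
final class CompositeKeySketch {

    /** Concatenates key group, current key, namespace and user key into one byte array. */
    static byte[] compositeKey(int keyGroup, String currentKey, String namespace, String userKey) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Two-byte key-group prefix (an assumption of this sketch).
        out.write(keyGroup >>> 8);
        out.write(keyGroup);
        writeLengthPrefixed(out, currentKey);
        writeLengthPrefixed(out, namespace);
        writeLengthPrefixed(out, userKey);
        return out.toByteArray();
    }

    /** Writes a one-byte length followed by the UTF-8 bytes; enough for short demo strings. */
    private static void writeLengthPrefixed(ByteArrayOutputStream out, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        out.write(utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    public static void main(String[] args) {
        byte[] clicks = compositeKey(3, "user-42", "window-1", "clicks");
        byte[] views = compositeKey(3, "user-42", "window-1", "views");
        System.out.println(clicks.length + " bytes and " + views.length + " bytes");
    }
}
```

    Because every entry of one MapState shares the same key group, key and namespace, its entries share a common byte prefix and differ only in the trailing user key.
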
    So the data is stored. But what does RocksDB do when a checkpoint is taken?

    To answer that, look at the RocksDBKeyedStateBackend class, which shows clearly how checkpoints, RocksDB, and HDFS relate to each other.

        // Called when a checkpoint is taken and the keyed state has to be snapshotted.
        public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
            final long checkpointId,
            final long timestamp,
            @Nonnull final CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions) throws Exception {
    
            long startTime = System.currentTimeMillis();
    
            // flush everything into db before taking a snapshot
            writeBatchWrapper.flush();
            
            // Choose between savepointSnapshotStrategy and checkpointSnapshotStrategy.
            // Both are fields of RocksDBKeyedStateBackend, typed as RocksDBSnapshotStrategyBase.
            // RocksDBStateBackend has two snapshot strategies, full and incremental, implemented by
            // RocksFullSnapshotStrategy and RocksIncrementalSnapshotStrategy respectively.
            // RocksDBSnapshotStrategyBase is the parent class of both.
            RocksDBSnapshotStrategyBase<K> chosenSnapshotStrategy =
                CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType() ?
                    savepointSnapshotStrategy : checkpointSnapshotStrategy;
    
            // Start the snapshot. The call is polymorphic; we mainly follow RocksIncrementalSnapshotStrategy here.
            RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunner =
                chosenSnapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    
            chosenSnapshotStrategy.logSyncCompleted(streamFactory, startTime);
    
            return snapshotRunner;
        }
    

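    The selection logic above can be sketched in isolation. The types below are hypothetical stand-ins, and the constructor encodes an assumption about how the two fields are wired: savepoints always use the full strategy, while checkpoints use the incremental strategy only when incremental checkpointing is enabled.

```java
// Hypothetical stand-ins; the real classes live in Flink's RocksDB state backend module.
final class StrategySelectionSketch {

    enum CheckpointType { CHECKPOINT, SAVEPOINT }

    /** Stand-in for RocksDBSnapshotStrategyBase. */
    interface SnapshotStrategy {
        String label();
    }

    static final SnapshotStrategy FULL = () -> "full";
    static final SnapshotStrategy INCREMENTAL = () -> "incremental";

    private final SnapshotStrategy savepointStrategy;
    private final SnapshotStrategy checkpointStrategy;

    StrategySelectionSketch(boolean incrementalCheckpointsEnabled) {
        // Assumption: savepoints are always full snapshots.
        this.savepointStrategy = FULL;
        // Assumption: checkpoints fall back to the full strategy when incremental mode is off.
        this.checkpointStrategy = incrementalCheckpointsEnabled ? INCREMENTAL : FULL;
    }

    /** Mirrors the ternary in snapshot(): the checkpoint type decides which field is used. */
    SnapshotStrategy choose(CheckpointType type) {
        return type == CheckpointType.SAVEPOINT ? savepointStrategy : checkpointStrategy;
    }

    public static void main(String[] args) {
        StrategySelectionSketch backend = new StrategySelectionSketch(true);
        System.out.println("checkpoint -> " + backend.choose(CheckpointType.CHECKPOINT).label());
        System.out.println("savepoint  -> " + backend.choose(CheckpointType.SAVEPOINT).label());
    }
}
```
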
    Now step into RocksIncrementalSnapshotStrategy:

        @Nonnull
        @Override
        protected RunnableFuture<SnapshotResult<KeyedStateHandle>> doSnapshot(
            long checkpointId,
            long checkpointTimestamp,
            @Nonnull CheckpointStreamFactory checkpointStreamFactory,
            @Nonnull CheckpointOptions checkpointOptions) throws Exception {
    
            final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
            LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);
    
            // kvStateInformation is populated in RocksDBIncrementalRestoreOperation (RocksFullSnapshotStrategy and RocksIncrementalSnapshotStrategy see the same kvStateInformation):
            //      kvStateInformation.put(columnFamilyName, registeredColumn(RocksDBKeyedStateBackend.RocksDbKvStateInfo));
            final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>(kvStateInformation.size());
            // Take a full snapshot of the meta data and store the result in stateMetaInfoSnapshots.
            final Set<StateHandleID> baseSstFiles = snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
    
            // Take a native RocksDB checkpoint, in preparation for RocksDBIncrementalSnapshotOperation.uploadSstFiles.
            takeDBNativeCheckpoint(snapshotDirectory);
    
            // snapshot
            final RocksDBIncrementalSnapshotOperation snapshotOperation =
                new RocksDBIncrementalSnapshotOperation(
                    checkpointId,
                    checkpointStreamFactory,
                    snapshotDirectory,
                    baseSstFiles,
                    stateMetaInfoSnapshots);
            
            // Run the incremental snapshot.
            return snapshotOperation.toAsyncSnapshotFutureTask(cancelStreamRegistry);
        }
    

    Let's look in detail at how RocksDBIncrementalSnapshotOperation executes:

            @Override
            protected SnapshotResult<KeyedStateHandle> callInternal() throws Exception {
    
                boolean completed = false;
    
                // Handle to the meta data file
                SnapshotResult<StreamStateHandle> metaStateHandle = null;
                // Handles to new sst files since the last completed checkpoint will go here
                final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
                // Handles to the misc files in the current snapshot will go here
                final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
    
                try {
    
                    // Write the meta data to HDFS.
                    metaStateHandle = materializeMetaData();
    
                    // Sanity checks - they should never fail
                    Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
                    Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
                        "Metadata for job manager was not properly created.");
    
                    // Upload the newly created sst files and the misc files to the checkpoint file system.
                    uploadSstFiles(sstFiles, miscFiles);
    
                    synchronized (materializedSstFiles) {
                        materializedSstFiles.put(checkpointId, sstFiles.keySet());
                    }
    
                    final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
                        new IncrementalRemoteKeyedStateHandle(
                            backendUID,
                            keyGroupRange,
                            checkpointId,
                            sstFiles,
                            miscFiles,
                            metaStateHandle.getJobManagerOwnedSnapshot());
    
                    //PermanentSnapshotDirectory
                    final DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
                    final SnapshotResult<KeyedStateHandle> snapshotResult;
                    if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {
    
                        // Incremental local snapshot.
                        IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
                            new IncrementalLocalKeyedStateHandle(
                                backendUID,
                                checkpointId,
                                directoryStateHandle,
                                keyGroupRange,
                                metaStateHandle.getTaskLocalSnapshot(),
                                sstFiles.keySet());
    
                        // localSnapshot report to local state manager, jobManagerState(jmIncrementalKeyedStateHandle) report to job manager
                        snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
                    } else {
                        //jobManagerState(jmIncrementalKeyedStateHandle) report to job manager
                        snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
                    }
    
                    completed = true;
    
                    return snapshotResult;
                } finally {
                    if (!completed) {
                        final List<StateObject> statesToDiscard =
                            new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
                        statesToDiscard.add(metaStateHandle);
                        statesToDiscard.addAll(miscFiles.values());
                        statesToDiscard.addAll(sstFiles.values());
                        cleanupIncompleteSnapshot(statesToDiscard);
                    }
                }
            }
    

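    That is the whole process of a RocksDB checkpoint. It shows that the meta data is written in full on every checkpoint, while the actual state data is uploaded incrementally.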
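
    The incremental part can be sketched as a simple set comparison. This is a simplified model, not Flink's uploadSstFiles: the class and field names are hypothetical, files are represented by name only, and the rule assumed here is that an sst file already present in the base checkpoint is referenced again instead of being re-uploaded, while non-sst (misc) files are uploaded every time.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the incremental upload decision.
final class IncrementalUploadSketch {

    /** Result of classifying the files found in the local snapshot directory. */
    static final class UploadPlan {
        final Set<String> reusedSst = new TreeSet<>(); // already in the base checkpoint
        final Set<String> newSst = new TreeSet<>();    // must be uploaded now
        final Set<String> misc = new TreeSet<>();      // non-sst files, uploaded every time
    }

    static UploadPlan plan(Set<String> baseSstFiles, Collection<String> localFiles) {
        UploadPlan plan = new UploadPlan();
        for (String file : localFiles) {
            if (!file.endsWith(".sst")) {
                plan.misc.add(file);
            } else if (baseSstFiles.contains(file)) {
                plan.reusedSst.add(file);
            } else {
                plan.newSst.add(file);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Set<String> base = new TreeSet<>(Arrays.asList("000001.sst", "000002.sst"));
        UploadPlan plan = plan(base, Arrays.asList(
            "000002.sst", "000003.sst", "CURRENT", "MANIFEST-000004", "OPTIONS-000005"));
        System.out.println("reuse:  " + plan.reusedSst);
        System.out.println("upload: " + plan.newSst);
        System.out.println("misc:   " + plan.misc);
    }
}
```

    Since sst files are immutable once written, matching by file name is enough in this model to decide whether a file was already uploaded.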

    So what exactly are the meta data and the state data? Let's get a feel for them.
    First, the meta data:


    [image: meta data]

    The meta data mainly contains metadata such as rocksdb.column.family.name and the full directory of the state data.

    Next, the state data:


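    [image: state data]

    This part is mainly RocksDB configuration information and the current db directory.

    [image]

    This part is the actual content stored in the state (for example, a MapState).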

    We know that checkpoints are asynchronous, so how does an operator with keyed state handle notifyCheckpointComplete?

    The call chain is:
    AbstractUdfStreamOperator.notifyCheckpointComplete ->
    AbstractStreamOperator.notifyCheckpointComplete ->
    RocksDBKeyedStateBackend.notifyCheckpointComplete ->
    RocksIncrementalSnapshotStrategy.notifyCheckpointComplete
    

    At this point, the incremental checkpoint of an operator with keyed state is complete.
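
    As a rough sketch of what the last step in that chain has to do, the class below tracks which sst files each checkpoint materialized and, on completion, drops entries older than the confirmed checkpoint. The class and method names are hypothetical, and the pruning rule is my reading of the strategy's bookkeeping around materializedSstFiles, not a copy of Flink's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical bookkeeping model for confirmed incremental checkpoints.
final class CompletedCheckpointTracker {

    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    private long lastCompletedCheckpointId = -1L;

    /** Called by the async snapshot once its sst files have been uploaded. */
    synchronized void recordSnapshot(long checkpointId, Set<String> sstFiles) {
        materializedSstFiles.put(checkpointId, sstFiles);
    }

    /** Called when the job manager confirms a checkpoint; stale notifications are ignored. */
    synchronized void notifyCheckpointComplete(long completedCheckpointId) {
        if (completedCheckpointId > lastCompletedCheckpointId) {
            // Older checkpoints can no longer serve as the base of a new incremental snapshot.
            materializedSstFiles.keySet().removeIf(id -> id < completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }

    /** The sst files the next incremental snapshot may reference instead of uploading. */
    synchronized Set<String> baseSstFiles() {
        Set<String> base = materializedSstFiles.get(lastCompletedCheckpointId);
        return base == null ? Collections.<String>emptySet() : base;
    }

    synchronized List<Long> trackedCheckpointIds() {
        return new ArrayList<>(materializedSstFiles.keySet());
    }

    public static void main(String[] args) {
        CompletedCheckpointTracker tracker = new CompletedCheckpointTracker();
        tracker.recordSnapshot(1L, Collections.singleton("000001.sst"));
        tracker.recordSnapshot(2L, Collections.singleton("000002.sst"));
        tracker.notifyCheckpointComplete(2L);
        System.out.println("tracked: " + tracker.trackedCheckpointIds());
        System.out.println("base:    " + tracker.baseSstFiles());
    }
}
```

    Only a confirmed checkpoint becomes the base for the next one, which is why the snapshot itself can stay asynchronous.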
