Understanding the Full Flink Checkpoint Snapshot Process in One Article


Author: shengjk1 | Published 2020-07-06 14:01
    Preface

    In the previous article we walked through the overall checkpoint flow. This article looks in detail at how the snapshot is actually taken during a checkpoint.

    Main text

    From the overall checkpoint flow, we know that executeCheckpointing triggers the snapshot of every operator in the task:

    public void executeCheckpointing() throws Exception {
        startSyncPartNano = System.nanoTime();

        try {
            // Entry point that calls snapshotState on each StreamOperator
            // First the source operator (flatMap -> source), then the sink operator (sink -> filter)
            for (StreamOperator<?> op : allOperators) {
                // Snapshot each operator and store the in-progress result in operatorSnapshotsInProgress
                // (what is stored is a reference to the asynchronous checkpoint); the local checkpoint
                // store and the JobManager ack are then carried out separately.
                // Capturing the barrier is really part of processing input data, which corresponds to
                // StreamInputProcessor.processInput()
                checkpointStreamOperator(op);
            }
            ......
    }
    

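    The loop above only triggers each operator's snapshot and keeps the returned in-progress handle. The expensive work finishes asynchronously later. Below is a minimal sketch of that pattern. SnapshotableOperator, op and the string results are hypothetical stand-ins, not Flink's API. Cancelling already-started snapshots on failure is this sketch's own choice, because the corresponding part of the original listing is elided.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class SyncPhaseSketch {

    // Hypothetical stand-in for a StreamOperator: snapshotState starts the
    // snapshot and immediately returns a handle to its (unfinished) result.
    interface SnapshotableOperator {
        String id();

        CompletableFuture<String> snapshotState(long checkpointId);
    }

    // Synchronous part: visit the operators in list order and keep each
    // in-progress handle, similar in spirit to operatorSnapshotsInProgress.
    static Map<String, CompletableFuture<String>> executeCheckpointing(
            long checkpointId, List<SnapshotableOperator> allOperators) {
        Map<String, CompletableFuture<String>> inProgress = new LinkedHashMap<>();
        try {
            for (SnapshotableOperator op : allOperators) {
                inProgress.put(op.id(), op.snapshotState(checkpointId));
            }
        } catch (RuntimeException e) {
            // Sketch's choice: cancel the snapshots already started, then rethrow.
            inProgress.values().forEach(f -> f.cancel(true));
            throw e;
        }
        return inProgress;
    }

    // Builds a toy operator whose state write runs on another thread.
    static SnapshotableOperator op(String name) {
        return new SnapshotableOperator() {
            @Override
            public String id() {
                return name;
            }

            @Override
            public CompletableFuture<String> snapshotState(long checkpointId) {
                return CompletableFuture.supplyAsync(() -> name + "@chk-" + checkpointId);
            }
        };
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> futures =
                executeCheckpointing(7L, List.of(op("flatMap"), op("source")));
        // Asynchronous part: wait for each snapshot, then acknowledge it.
        futures.forEach((id, f) -> System.out.println("ack " + f.join()));
    }
}
```

    The task thread only pays for starting each snapshot. Waiting on the futures, which here stands in for the JobManager ack, happens afterwards.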
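    The checkpointStreamOperator(op) call in executeCheckpointing is where each operator's snapshot is taken. It delegates to AbstractStreamOperator.snapshotState, shown below: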

    // Persistence is handled uniformly here
    @Override
    public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory) throws Exception {

        KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

        try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
                checkpointId,
                timestamp,
                factory,
                keyGroupRange,
                getContainingTask().getCancelables())) {

            snapshotState(snapshotContext);

            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            // Persist managed state
            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    // Trigger an asynchronous snapshot of the (internal) DefaultOperatorStateBackend
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
            // Pipeline: source -> flatMap --> rebalance --> filter --> keyBy --> sink
            // Only the sink (downstream of keyBy) has a non-null keyedStateBackend,
            // so only there is the keyed backend snapshot executed
            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    // Trigger an asynchronous snapshot of the keyed StateBackend
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        } catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }

            String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
                getOperatorName() + ".";

            if (!getContainingTask().isCanceled()) {
                LOG.info(snapshotFailMessage, snapshotException);
            }
            throw new Exception(snapshotFailMessage, snapshotException);
        }

        return snapshotInProgress;
    }
    

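    So when operatorStateBackend is non-null, operatorStateBackend.snapshot is called, and when keyedStateBackend is non-null, keyedStateBackend.snapshot is called. Both calls only trigger an asynchronous snapshot and return a future for it.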
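    The branching in snapshotState can be modelled in a few lines. Each optional backend contributes a future only when it exists. Any failure cancels whatever was already started and is rethrown with the operator name. In this sketch, Backend and SnapshotFutures are hypothetical simplifications of Flink's state backends and OperatorSnapshotFutures. The raw-state futures are left out.

```java
import java.util.concurrent.CompletableFuture;

public class SnapshotDispatchSketch {

    // Hypothetical stand-in for an operator or keyed state backend.
    interface Backend {
        CompletableFuture<String> snapshot(long checkpointId);
    }

    // Simplified OperatorSnapshotFutures: one slot per kind of managed state.
    static final class SnapshotFutures {
        CompletableFuture<String> operatorStateManaged; // stays null without an operator state backend
        CompletableFuture<String> keyedStateManaged;    // stays null without a keyed state backend

        void cancel() {
            if (operatorStateManaged != null) operatorStateManaged.cancel(true);
            if (keyedStateManaged != null) keyedStateManaged.cancel(true);
        }
    }

    static SnapshotFutures snapshotState(long checkpointId, String operatorName,
                                         Backend operatorStateBackend, Backend keyedStateBackend) throws Exception {
        SnapshotFutures inProgress = new SnapshotFutures();
        try {
            if (operatorStateBackend != null) {
                inProgress.operatorStateManaged = operatorStateBackend.snapshot(checkpointId);
            }
            // Only operators downstream of keyBy get a keyed state backend.
            if (keyedStateBackend != null) {
                inProgress.keyedStateManaged = keyedStateBackend.snapshot(checkpointId);
            }
        } catch (Exception e) {
            // Cancel what was already started, then report which operator failed.
            inProgress.cancel();
            throw new Exception("Could not complete snapshot " + checkpointId
                    + " for operator " + operatorName + ".", e);
        }
        return inProgress;
    }

    public static void main(String[] args) throws Exception {
        Backend operatorBackend = id -> CompletableFuture.completedFuture("operator-state@" + id);
        Backend keyedBackend = id -> CompletableFuture.completedFuture("keyed-state@" + id);

        SnapshotFutures filter = snapshotState(5L, "filter", operatorBackend, null);
        SnapshotFutures sink = snapshotState(5L, "sink", operatorBackend, keyedBackend);
        System.out.println(filter.keyedStateManaged);      // null: no keyed backend before keyBy
        System.out.println(sink.keyedStateManaged.join()); // keyed-state@5
    }
}
```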
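    Here we take the keyed side with RocksDB incremental checkpoints, i.e. RocksDBIncrementalSnapshotOperation, as the example. The code comments in operatorStateBackend.snapshot already explain that path clearly. The snapshot starts in doSnapshot of RocksIncrementalSnapshotStrategy: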

    @Nonnull
    @Override
    protected RunnableFuture<SnapshotResult<KeyedStateHandle>> doSnapshot(
        long checkpointId,
        long checkpointTimestamp,
        @Nonnull CheckpointStreamFactory checkpointStreamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception {

        final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
        LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);

        // kvStateInformation is populated in RocksDBIncrementalRestoreOperation:
        // kvStateInformation.put(columnFamilyName, registeredColumn(RocksDBKeyedStateBackend.RocksDbKvStateInfo));
        final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>(kvStateInformation.size());
        final Set<StateHandleID> baseSstFiles = snapshotMetaData(checkpointId, stateMetaInfoSnapshots);

        // Take a native RocksDB checkpoint into snapshotDirectory, in preparation for
        // RocksDBIncrementalSnapshotOperation.uploadSstFiles
        takeDBNativeCheckpoint(snapshotDirectory);

        // The asynchronous part of the snapshot
        final RocksDBIncrementalSnapshotOperation snapshotOperation =
            new RocksDBIncrementalSnapshotOperation(
                checkpointId,
                checkpointStreamFactory,
                snapshotDirectory,
                baseSstFiles,
                stateMetaInfoSnapshots);

        return snapshotOperation.toAsyncSnapshotFutureTask(cancelStreamRegistry);
    }
    

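    doSnapshot itself only does the comparatively cheap synchronous work: preparing the local directory, recording metadata and taking the native RocksDB checkpoint. It hands the expensive upload back as a RunnableFuture that another thread runs later. The sketch below shows that split with java.util.concurrent.FutureTask. takeLocalCheckpoint and upload are hypothetical placeholders, not Flink methods, and the file names are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class SyncAsyncSplitSketch {

    // Records which phase did what (thread-safe because two threads write to it).
    static final List<String> log = new CopyOnWriteArrayList<>();

    // Hypothetical placeholder for takeDBNativeCheckpoint; runs on the task thread.
    static List<String> takeLocalCheckpoint(long checkpointId) {
        log.add("sync: local checkpoint " + checkpointId);
        return List.of("000012.sst", "000013.sst", "MANIFEST-000005");
    }

    // Hypothetical placeholder for uploading one file to the checkpoint file system.
    static String upload(String file) {
        log.add("async: upload " + file);
        return "remote/" + file;
    }

    // Synchronous part: capture a local view of the files, then hand back the
    // upload work as a RunnableFuture that has not started yet.
    static RunnableFuture<List<String>> doSnapshot(long checkpointId) {
        List<String> localFiles = takeLocalCheckpoint(checkpointId);
        return new FutureTask<>(() -> {
            List<String> handles = new ArrayList<>();
            for (String file : localFiles) {
                handles.add(upload(file));
            }
            return handles;
        });
    }

    public static void main(String[] args) throws Exception {
        RunnableFuture<List<String>> future = doSnapshot(42L);
        System.out.println(log);          // [sync: local checkpoint 42]
        ExecutorService asyncPool = Executors.newSingleThreadExecutor();
        asyncPool.execute(future);        // the asynchronous part runs on another thread
        System.out.println(future.get()); // [remote/000012.sst, remote/000013.sst, remote/MANIFEST-000005]
        asyncPool.shutdown();
    }
}
```

    Returning an unstarted future keeps the caller in control of where and when the upload runs.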
    Now step into RocksDBIncrementalSnapshotOperation. Its callInternal method does the asynchronous work:

    @Override
    protected SnapshotResult<KeyedStateHandle> callInternal() throws Exception {

        boolean completed = false;

        // Handle to the meta data file
        SnapshotResult<StreamStateHandle> metaStateHandle = null;
        // Handles to new sst files since the last completed checkpoint will go here
        final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
        // Handles to the misc files in the current snapshot will go here
        final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

        try {

            // Write the metadata (in full) to the checkpoint file system, e.g. HDFS
            metaStateHandle = materializeMetaData();

            // Sanity checks - they should never fail
            Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
            Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
                "Metadata for job manager was not properly created.");

            // Upload the newly created sst files and the misc files to checkpointFs
            uploadSstFiles(sstFiles, miscFiles);

            synchronized (materializedSstFiles) {
                materializedSstFiles.put(checkpointId, sstFiles.keySet());
            }

            final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
                new IncrementalRemoteKeyedStateHandle(
                    backendUID,
                    keyGroupRange,
                    checkpointId,
                    sstFiles,
                    miscFiles,
                    metaStateHandle.getJobManagerOwnedSnapshot());

            // Non-null only for a PermanentSnapshotDirectory, i.e. when local recovery is enabled
            final DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
            final SnapshotResult<KeyedStateHandle> snapshotResult;
            if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {

                // Incremental local snapshot
                IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
                    new IncrementalLocalKeyedStateHandle(
                        backendUID,
                        checkpointId,
                        directoryStateHandle,
                        keyGroupRange,
                        metaStateHandle.getTaskLocalSnapshot(),
                        sstFiles.keySet());

                // The local snapshot is reported to the local state manager,
                // the JobManager state (jmIncrementalKeyedStateHandle) to the JobManager
                snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
            } else {
                // Only the JobManager state (jmIncrementalKeyedStateHandle) is reported, to the JobManager
                snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
            }

            completed = true;

            return snapshotResult;
        } finally {
            if (!completed) {
                // Something failed: discard everything written so far
                final List<StateObject> statesToDiscard =
                    new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
                statesToDiscard.add(metaStateHandle);
                statesToDiscard.addAll(miscFiles.values());
                statesToDiscard.addAll(sstFiles.values());
                cleanupIncompleteSnapshot(statesToDiscard);
            }
        }
    }
    

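    The completed flag plus the finally block in callInternal is a general pattern. Every remote object created so far is tracked. If the method exits with an exception, all of them are discarded so that a failed checkpoint leaves no orphaned files behind. Below is a minimal sketch of the pattern. uploadFile and discard are hypothetical helpers standing in for the real upload and cleanupIncompleteSnapshot, and the "bad" file name is just a way to force a failure.

```java
import java.util.ArrayList;
import java.util.List;

public class CleanupOnFailureSketch {

    static final List<String> discarded = new ArrayList<>();

    // Hypothetical upload: fails for any file name containing "bad".
    static String uploadFile(String file) {
        if (file.contains("bad")) {
            throw new IllegalStateException("upload failed: " + file);
        }
        return "remote/" + file;
    }

    // Hypothetical stand-in for cleanupIncompleteSnapshot.
    static void discard(List<String> handles) {
        discarded.addAll(handles);
    }

    static List<String> snapshot(List<String> files) {
        boolean completed = false;
        List<String> uploaded = new ArrayList<>();
        try {
            for (String file : files) {
                uploaded.add(uploadFile(file));
            }
            completed = true;
            return uploaded;
        } finally {
            if (!completed) {
                // Only reached when an exception escaped: remove what was already written.
                discard(uploaded);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(snapshot(List.of("a.sst", "MANIFEST"))); // [remote/a.sst, remote/MANIFEST]
        try {
            snapshot(List.of("b.sst", "bad.sst"));
        } catch (IllegalStateException e) {
            System.out.println("failed, discarded " + discarded);   // failed, discarded [remote/b.sst]
        }
    }
}
```

    Using a flag instead of a catch block means the original exception propagates unchanged, while cleanup still runs on every failure path.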
    In short, the metadata is always persisted in full. For the data, only the newly created sst files (together with the misc files) are uploaded to checkpointFs.
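    Deciding what to upload can be sketched as a pure function over the files of the local RocksDB checkpoint. The sketch makes a few assumptions. An .sst file already in the base set (baseSstFiles, returned by snapshotMetaData) is only referenced, not uploaded again. A new .sst file is uploaded. Every other file, such as MANIFEST or CURRENT, is a misc file that the sketch uploads on every checkpoint. UploadPlan and plan are hypothetical names, and this is a simplified model rather than Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class IncrementalUploadPlanSketch {

    // Outcome of sorting the files of one local RocksDB checkpoint directory.
    static final class UploadPlan {
        final List<String> reusedSst = new ArrayList<>(); // already in the base checkpoint: reference only
        final List<String> newSst = new ArrayList<>();    // new since the base checkpoint: upload
        final List<String> misc = new ArrayList<>();      // MANIFEST, CURRENT, ...: upload every time
    }

    static UploadPlan plan(List<String> checkpointFiles, Set<String> baseSstFiles) {
        UploadPlan result = new UploadPlan();
        for (String name : checkpointFiles) {
            if (!name.endsWith(".sst")) {
                result.misc.add(name);
            } else if (baseSstFiles.contains(name)) {
                result.reusedSst.add(name);
            } else {
                result.newSst.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> base = Set.of("000010.sst", "000011.sst");
        List<String> files = List.of("000010.sst", "000011.sst", "000014.sst", "MANIFEST-000007", "CURRENT");
        UploadPlan p = plan(files, base);
        System.out.println("reference: " + p.reusedSst); // reference: [000010.sst, 000011.sst]
        System.out.println("upload:    " + p.newSst);    // upload:    [000014.sst]
        System.out.println("misc:      " + p.misc);      // misc:      [MANIFEST-000007, CURRENT]
    }
}
```

    Because RocksDB never modifies an .sst file once written, a file name seen in the base checkpoint can safely be treated as unchanged content.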


        Article link: https://www.haomeiwen.com/subject/qovwqktx.html