Understanding State Restore in Flink's RocksDB Backend

Author: shengjk1 | Published: 2020-08-12 20:51

When a job is configured with the RocksDB state backend and restarts from a checkpoint, the first stop is the getRocksDBRestoreOperation method of RocksDBKeyedStateBackendBuilder:

// Entry point for the RocksDB restore
    private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
        int keyGroupPrefixBytes,
        CloseableRegistry cancelStreamRegistry,
        LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
        RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
        if (restoreStateHandles.isEmpty()) {
            return new RocksDBNoneRestoreOperation<>(
                keyGroupRange,
                keyGroupPrefixBytes,
                numberOfTransferingThreads,
                cancelStreamRegistry,
                userCodeClassLoader,
                kvStateInformation,
                keySerializerProvider,
                instanceBasePath,
                instanceRocksDBPath,
                dbOptions,
                columnFamilyOptionsFactory,
                nativeMetricOptions,
                metricGroup,
                restoreStateHandles,
                ttlCompactFiltersManager);
        }
        KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
        if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
            return new RocksDBIncrementalRestoreOperation<>(
                operatorIdentifier,
                keyGroupRange,
                keyGroupPrefixBytes,
                numberOfTransferingThreads,
                cancelStreamRegistry,
                userCodeClassLoader,
                kvStateInformation,
                keySerializerProvider,
                instanceBasePath,
                instanceRocksDBPath,
                dbOptions,
                columnFamilyOptionsFactory,
                nativeMetricOptions,
                metricGroup,
                restoreStateHandles,
                ttlCompactFiltersManager);
        } else {
            return new RocksDBFullRestoreOperation<>(
                keyGroupRange,
                keyGroupPrefixBytes,
                numberOfTransferingThreads,
                cancelStreamRegistry,
                userCodeClassLoader,
                kvStateInformation,
                keySerializerProvider,
                instanceBasePath,
                instanceRocksDBPath,
                dbOptions,
                columnFamilyOptionsFactory,
                nativeMetricOptions,
                metricGroup,
                restoreStateHandles,
                ttlCompactFiltersManager);
        }
    }

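When there is no state to restore, a RocksDBNoneRestoreOperation is created. When the first state handle comes from an incremental checkpoint (it is an IncrementalKeyedStateHandle), a RocksDBIncrementalRestoreOperation is created; for a full snapshot, a RocksDBFullRestoreOperation is created.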
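
The three-way choice can be modelled in a few lines of plain Java. This is only a sketch: StateHandle, IncrementalHandle, FullHandle and RestoreKind are hypothetical stand-ins invented for illustration, not Flink classes. It is meant to highlight that only the first handle in the collection is inspected.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

public class RestoreDispatchSketch {

    /** Hypothetical stand-ins for Flink's state handle types (not the real classes). */
    interface StateHandle {}
    static final class IncrementalHandle implements StateHandle {}
    static final class FullHandle implements StateHandle {}

    /** Hypothetical label for the restore operation that would be created. */
    enum RestoreKind { NONE, INCREMENTAL, FULL }

    /** Mirrors the branch structure of getRocksDBRestoreOperation. */
    static RestoreKind choose(Collection<? extends StateHandle> handles) {
        if (handles.isEmpty()) {
            return RestoreKind.NONE;
        }
        // Only the first handle decides between incremental and full restore.
        StateHandle first = handles.iterator().next();
        return (first instanceof IncrementalHandle) ? RestoreKind.INCREMENTAL : RestoreKind.FULL;
    }

    public static void main(String[] args) {
        System.out.println(choose(Collections.<StateHandle>emptyList()));
        System.out.println(choose(Arrays.<StateHandle>asList(new IncrementalHandle(), new IncrementalHandle())));
        System.out.println(choose(Collections.<StateHandle>singletonList(new FullHandle())));
    }
}
```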
Here we take RocksDBIncrementalRestoreOperation as the example:

@Override
    public RocksDBRestoreResult restore() throws Exception {

        if (restoreStateHandles == null || restoreStateHandles.isEmpty()) {
            return null;
        }

        final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next();

        boolean isRescaling = (restoreStateHandles.size() > 1 ||
            !Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange));

        if (isRescaling) {
            restoreWithRescaling(restoreStateHandles);
        } else {
            restoreWithoutRescaling(theFirstStateHandle);
        }
        return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle,
            nativeMetricMonitor, lastCompletedCheckpointId, backendUID, restoredSstFiles);
    }

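The key method is restore. It treats the restore as a rescaling case when there is more than one state handle, or when the key-group range of the first handle differs from this backend's keyGroupRange, and then calls restoreWithRescaling; otherwise it calls restoreWithoutRescaling. restoreStateHandles can be thought of simply as references to the state that needs to be restored.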
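
To make the isRescaling condition concrete, here is a small self-contained sketch. Range is a hypothetical simplification of a key-group range (inclusive start and end), and the numbers assume a maximum parallelism of 128, which the article does not state; with that assumption, parallelism 2 gives the ranges [0, 63] and [64, 127], and parallelism 4 gives [0, 31] for the first subtask.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class RescalingCheckSketch {

    /** Hypothetical, simplified key-group range [start, end], both inclusive. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Range)) {
                return false;
            }
            Range other = (Range) o;
            return start == other.start && end == other.end;
        }

        @Override
        public int hashCode() {
            return Objects.hash(start, end);
        }
    }

    /** Same condition as in restore(): several handles, or a range mismatch on the first one. */
    static boolean isRescaling(List<Range> handleRanges, Range backendRange) {
        return handleRanges.size() > 1
            || !Objects.equals(handleRanges.get(0), backendRange);
    }

    public static void main(String[] args) {
        // Parallelism unchanged: one handle covering exactly the backend's range.
        System.out.println(isRescaling(Arrays.asList(new Range(0, 63)), new Range(0, 63)));
        // Scale up from 2 to 4: the handle covers more than this subtask now owns.
        System.out.println(isRescaling(Arrays.asList(new Range(0, 63)), new Range(0, 31)));
        // Scale down from 2 to 1: two handles must be merged into one backend.
        System.out.println(isRescaling(
            Arrays.asList(new Range(0, 63), new Range(64, 127)), new Range(0, 127)));
    }
}
```

Under these assumptions only the first case takes the restoreWithoutRescaling path; both the scale-up and the scale-down case go through restoreWithRescaling.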

private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {

        // Prepare for restore with rescaling
        KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
            restoreStateHandles, keyGroupRange);

        // Init base DB instance
        if (initialHandle != null) {
            restoreStateHandles.remove(initialHandle);
            initDBWithRescaling(initialHandle);
        } else {
            openDB();
        }

        // Transfer remaining key-groups from temporary instance into base DB
        byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
        RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);

        byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
        RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);

        for (KeyedStateHandle rawStateHandle : restoreStateHandles) {

            if (!(rawStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
                throw new IllegalStateException("Unexpected state handle type, " +
                    "expected " + IncrementalRemoteKeyedStateHandle.class +
                    ", but found " + rawStateHandle.getClass());
            }

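            // Local path for the temporary instance
            Path temporaryRestoreInstancePath = new Path(instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString());
            // First, download the state data referenced by rawStateHandle into temporaryRestoreInstancePath
            // and use that directory as the data directory of a temporary RocksDB instance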
            try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
                (IncrementalRemoteKeyedStateHandle) rawStateHandle,
                temporaryRestoreInstancePath);
                RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db)) {

                List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
                List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;

                // iterating only the requested descriptors automatically skips the default column family handle
                for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
                    ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);

                    ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterStateColumnFamilyHandle(
                        null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
                        .columnFamilyHandle;

                    // Write the data of the temporary RocksDB instance into the target RocksDB instance
                    try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {

                        iterator.seek(startKeyGroupPrefixBytes);

                        while (iterator.isValid()) {

                            if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
                                // insert data to rocksdb
                                writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
                            } else {
                                // Since the iterator will visit the record according to the sorted order,
                                // we can just break here.
                                break;
                            }

                            iterator.next();
                        }
                    } // releases native iterator resources
                }
            } finally {
                cleanUpPathQuietly(temporaryRestoreInstancePath);
            }
        }
    }

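In short, for each state handle the method downloads the corresponding sstFiles and miscFiles into a temporary local path, opens a temporary RocksDB instance on that directory, copies the data from the temporary instance into the RocksDB instance that will actually be used, and finally destroys the temporary instance. As for why two RocksDB instances are needed, my guess is that it is for data consistency, for example in case a download fails halfway; I am not sure, and it still feels a little odd to me. One thing the loop does show is that only keys between startKeyGroupPrefixBytes and stopKeyGroupPrefixBytes are copied, so the temporary instance also lets the backend keep just the key groups that fall inside its own keyGroupRange.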
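
The seek-and-copy loop can be imitated without RocksDB to see which keys end up in the target instance. The sketch below is a simplified model, not Flink code: a TreeMap ordered by unsigned byte comparison stands in for each RocksDB instance, prefix and beforePrefix are hypothetical re-implementations of what serializeKeyGroup and beforeThePrefixBytes appear to do (a big-endian key-group prefix and a prefix comparison), and a one-byte prefix is assumed.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

public class KeyGroupCopySketch {

    /** Unsigned lexicographic byte order, used here to model RocksDB's sorted keys. */
    static final Comparator<byte[]> BYTEWISE = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    /** Big-endian key-group prefix of the given width (assumed encoding). */
    static byte[] prefix(int keyGroup, int width) {
        byte[] out = new byte[width];
        for (int i = 0; i < width; i++) {
            out[i] = (byte) (keyGroup >>> (8 * (width - 1 - i)));
        }
        return out;
    }

    /** A key made of the key-group prefix followed by one payload byte. */
    static byte[] key(int keyGroup, int width, int payload) {
        byte[] out = Arrays.copyOf(prefix(keyGroup, width), width + 1);
        out[width] = (byte) payload;
        return out;
    }

    /** True if the leading bytes of key sort strictly before the given prefix. */
    static boolean beforePrefix(byte[] key, byte[] prefix) {
        for (int i = 0; i < prefix.length; i++) {
            int cmp = Integer.compare(key[i] & 0xff, prefix[i] & 0xff);
            if (cmp != 0) {
                return cmp < 0;
            }
        }
        return false;
    }

    /** Copies only the entries whose key group lies in [startKg, endKg]; returns the count. */
    static int copyRange(TreeMap<byte[], String> tmpDb, TreeMap<byte[], String> targetDb,
                         int startKg, int endKg, int width) {
        byte[] start = prefix(startKg, width);
        byte[] stop = prefix(endKg + 1, width);
        int copied = 0;
        // tailMap(start, true) plays the role of iterator.seek(start).
        for (Map.Entry<byte[], String> e : tmpDb.tailMap(start, true).entrySet()) {
            if (!beforePrefix(e.getKey(), stop)) {
                break; // keys are sorted, so nothing later can be in range
            }
            targetDb.put(e.getKey(), e.getValue());
            copied++;
        }
        return copied;
    }

    public static void main(String[] args) {
        TreeMap<byte[], String> tmpDb = new TreeMap<>(BYTEWISE);
        for (int kg = 0; kg < 8; kg++) {
            tmpDb.put(key(kg, 1, 0), "value-of-kg-" + kg);
        }
        TreeMap<byte[], String> targetDb = new TreeMap<>(BYTEWISE);
        int copied = copyRange(tmpDb, targetDb, 2, 4, 1);
        System.out.println("copied=" + copied + ", target size=" + targetDb.size());
    }
}
```

In this model the temporary map holds key groups 0 to 7 while the backend owns 2 to 4, so only three entries reach the target. That filtering step is one plausible reason the restored files are not simply opened as the final instance when rescaling.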
