KVState-> AbstractHeapState
function的state
ListCheckpointed
function要继承 ListCheckpointed来支持list-style state redistribution, 而弃用checkpointed
List<T> snapshotState(long checkpointId, long timestamp)
void restoreState(List<T> state)
CheckpointedAsynchronously当前版本在使用ListCheckpointed还没有实现异步
CheckpointedFunction
FlinkKafkaConsumerBase :> CheckpointedFunction, CheckpointListener
基本上也是snapshotState(), initializeState(), notifyCheckpointCompleted() 来操作这个pending的offsets和offsetsStateForCheckpoint
/** Data for pending but uncommitted offsets */
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
operator的state
StreamOperator
operator需要实现这个interface
.snapshotState()
CheckpointedRestoringOperator
void restoreState(FSDataInputStream in)
key groups
为了 dynamically scale Flink operators that use partitoned(key-value) state, 使用key group概念把多个key进行分组
KeyGroupRangeOffsets
denotes the range of all key groups that can are referenced by the handle, together with their respective stream offsets
RetrievableStateHandle和SteamStateHandle
两个都是能返回一个可以拿出被写入Checkpoint outputStream流里的state的实体。 RetrievableStateHandle返回的是一个直接可用的object,SteamStateHandle 返回一个seekable的inputStream
CheckpointStateOutputStream是所有state序列化的统一入口
operator state and keyed-state
operator state被分为了operator state (= non-partitioned state)和 keyed-state (= partitioned state)
Keyed state is organized as a List<KeyGroupsStateHandle>
. Each KeyGroupsStateHandle
consists of one StreamStateHandle
and one KeyGroupRangeOffsets
object. KeyGroupRangeOffsets
denotes the range of all key groups that can are referenced by the handle, together with their respective stream offsets. The StreamStateHandle
gives access to a seekable stream that contains the actual state data for all key groups from the key group range; individual key group states are located in the stream at their previously mentioned stream offsets.
KeyGroupsStateHandle包含一个KeyGroupRangeOffsets和StreamStateHandle
KeyGroupRangeOffsets: 包含keyGroupRange,里面就是这个组的keys, 以及在stream里的每个keyGroup的offest
StreamStateHandle: 管理这个state stream, 应该是真正持有数据的载体
这两种state好像是在StateAssignmentOperation中调用的, 这个class负责recovery from checkpoint后reassign state, 重新分key group
AbstractStreamOperator
AbstractStreamOperator:>StreamOperator, 主要实现两个方法snapshotState ()
和initializeState()
KeyedStateCheckpointOutputStream和OperatorStateCheckpointOutputStream
这两个stream就是abstractStreamOperator中raw state的实现, managed state是通过stateBackend实现。
KeyedStateCheckpointOutputStream 通过keyGroup实现的redistribution
OperatorStateCheckpointOutputStream通过List-style-redistribution, Each operator returns a List of state elements(LongArrayList partitionOffsets)
StateSnapshotContextSynchronousImpl
-
getRawKeyedOperatorStateOutput(),getRawOperatorStateOutput()
两个方法build 相应的CheckpointOutputStream
-
getKeyedStateStreamFuture()
和getOperatorStateStreamFuture()
都是把delegate(CheckpointOutputStream).closeAndGetHandle()
两个raw state放入DoneFuture, 说明只能同步的,不能异步
OperatorSnapshotResult
result of snapshotState(), 维护4个future, 分别是raw和managed, 和key和operator的
private RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture;
private RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture;
private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture;
private RunnableFuture<OperatorStateHandle> operatorStateRawFuture;
回过来说AbstractStreamOperator,这个方法是override StreamOperator的
OperatorSnapshotResult snapshotState(
long checkpointId, long timestamp, CheckpointStreamFactory streamFactory)
{
//得到keyGroupRange
KeyGroupRange keyGroupRange = null != keyedStateBackend ?
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
checkpointId,
timestamp,
streamFactory,
keyGroupRange,
getContainingTask().getCancelables())) {
//把timeservice写入rawKeyStateCheckpointStream
snapshotState(snapshotContext);
//把结果存入到OperatorSnapshotResult
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
//raw的state写完了, 下面写managed state
if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(checkpointId, timestamp, streamFactory));
}
if (null != keyedStateBackend) {
snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory));
}
}
这个方法主要是snapshot Operator中的HeapInternalTimerService,通过getRawKeyedOperatorState()写入KeyedStateCheckpointOutputStream 里,这里处理了 raw的 keyed state。子类会重写这个方法来加逻辑。
void snapshotState(StateSnapshotContext context) {
KeyedStateCheckpointOutputStream out;
out = context.getRawKeyedOperatorStateOutput();
try {
KeyGroupsList allKeyGroups = out.getKeyGroupList();
for (int keyGroupIdx : allKeyGroups) {
out.startNewKeyGroup(keyGroupIdx);
DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
dov.writeInt(timerServices.size());
for (Map.Entry<String, HeapInternalTimerService<?, ?>> entry : timerServices.entrySet()) {
String serviceName = entry.getKey();
HeapInternalTimerService<?, ?> timerService = entry.getValue();
dov.writeUTF(serviceName);
timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx);
}
}
}
initializeState()
主要就是把四个state(backend 和 raw), 通过stateHandles
@Override
public final void initializeState(OperatorStateHandles stateHandles)
{
// 两个raw和一个 managed backend
Collection<KeyGroupsStateHandle> keyedStateHandlesRaw = null;
Collection<OperatorStateHandle> operatorStateHandlesRaw = null;
Collection<OperatorStateHandle> operatorStateHandlesBackend = null;
// 这里就是初始化了 keyStateStore,通过StreamTask初始化keyStateBackend的类field
initKeyedState();
。。。。。。。。
// 通过SteamTask进行初始化, 然后赋值给operatorStateBackend的类field
initOperatorState(operatorStateHandlesBackend);
StateInitializationContext initializationContext = new StateInitializationContextImpl(
restoring, // information whether we restore or start for the first time
operatorStateBackend, // access to operator state backend
keyedStateStore, // access to keyed state backend
keyedStateHandlesRaw, // access to keyed state stream
operatorStateHandlesRaw, // access to operator state stream
getContainingTask().getCancelables()); // access to register streams for canceling
// 从rawKeyedState里恢复TimerService
initializeState(initializationContext);
}
NOTE: 在AbstractStreamOperator有processWatermark方法,内部service.advanceWatermark(mark.getTimestamp());
getPartitionedState()
其内部调用的是 keyedStateBackend.getPartitionedState, partitioned state就是keyedState
protected <S extends State, N> S getPartitionedState(
N namespace, TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, ?> stateDescriptor) throws Exception {
if (keyedStateStore != null) {
return keyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
.......
State interface
State
ValueState : 单值状态
ListState : 集合状态
FoldingState : folding状态,for FoldFunction
ReducingState : reducing状态,for ReduceFunction
这些interface在runtime和rocksDb pack里有具体实现
Paste_Image.png
#######StateDescriptor
每个descriptor都提供了bind(),在指定的backend上创建一个新的state。
ValueStateDescriptor提供了带TypeInfo的constructor, TypeInfo可以lazily构造TypeSerilizer。 TypeInfo有着提供各种默认的class的构造子类
KVState
提供两接口
/**
* Sets the current namespace, which will be used when using the state access methods.
*
* @param namespace The namespace.
*/
void setCurrentNamespace(N namespace);
/**
* Returns the serialized value for the given key and namespace.
*
* <p>If no value is associated with key and namespace, <code>null</code>
* is returned.
*
* @param serializedKeyAndNamespace Serialized key and namespace
* @return Serialized value or <code>null</code> if no value is associated
* with the key and namespace.
* @throws Exception Exceptions during serialization are forwarded
*/
byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
kvstate有3个generic 和 heap 和 rocksdb的实现
Paste_Image.png
AbstractHeapState
AbstractHeapState:> KvState
StateTable
//heap实现中真正存kv的map, k就是
/** Map containing the actual key/value pairs */
protected final StateTable<K, N, SV> stateTable;
其内部存储了 List<Map<N, Map<K, ST>>> state;
list的每个元素代表一个keyGroup对应的state, map是namespace ->(key-> real value)
HeapValueState
class HeapValueState<K, N, V>
extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
implements ValueState<V> {
继承两个接口
ValueState:> State
AbstractHeapState :> KVState (get)
其中state的name和namespace的什么关系? name就是人为指定的state名字, 而namespace则是
keyed state.getCurrentKey() 这里的key是什么? 这个就是keyGroup的key,不是state的name.
在使用keyedStateBackend.getParitionedState()时, 会根据name从keyValueStatesByName中拿到state(kvState), 再会设置state.currentNamespace(),然后call HeapValueState.value()
(ValueState有两个接口 value()和update())。, 这个时候就会通过KeyedStateBackend找到currentKeyGroupIndex和key,从StateTable中用keyGrouopIndex得到namespaceMap,再用namespace和key得到value。update的逻辑也是类似的。
AbstractBackend
提供三个方法
/**
* Creates a {@link CheckpointStreamFactory} that can be used to create streams
* that should end up in a checkpoint. 一个用于Checkpoint的stream
*/
public abstract CheckpointStreamFactory createStreamFactory
public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend
public OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier) throws Exception {
return new DefaultOperatorStateBackend(env.getUserClassLoader());
}
NOTE:这个方法没被重载 就是concrete方法
Paste_Image.pngAbstractKeyedStateBackend
FsKeyedStateBackend和MemoryStateBackend使用 HeapKeyedStateBackend(AbstractBackend.createKeyedStateBackend()
)
RockDBStateBackend使用RockDBKeyedStateBackend
提供了很多关于key操作的接口(继承自keyedStateBackend)
Paste_Image.png这个class的核心method为getPartitionedState()
public <N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
// 省略一些检查。。。。
// keyValueSateByName 就是state name -> KVSate 的mapping,如果没有新建一个
if (keyValueStatesByName == null) {
keyValueStatesByName = new HashMap<>();
}
// lastName和lastState是caching,并设置**namespace**
if (lastName != null && lastName.equals(stateDescriptor.getName())) {
lastState.setCurrentNamespace(namespace);
return (S) lastState;
}
// 从keyValueStatesByName里拿, 并赋值给caching lastXXX
KvState<?> previous = keyValueStatesByName.get(stateDescriptor.getName());
if (previous != null) {
lastState = previous;
lastState.setCurrentNamespace(namespace);
lastName = stateDescriptor.getName();
return (S) previous;
}
// create a new blank key/value state
//用sd.bind进行backend的绑定, bind方法也是根据实现的子类不同call相应类型state的create方法, 类似factory method
S state = stateDescriptor.bind(new StateBackend() {
@Override
public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
}
@Override
public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
}
@Override
public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
return AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
}
@Override
public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
}
});
KvState kvState = (KvState) state;
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
lastName = stateDescriptor.getName();
lastState = kvState;
kvState.setCurrentNamespace(namespace);
// Publish queryable state。。。。。
keyValueStatesByName的key就是state的name
Snapshotable
所以的backend都实现了其snapshot和restore方法
这两个方法才是真正做Checkpoint和读取Checkpoint恢复的方法。
snapshot
各个backend有各自的实现, 这里以heapKeyedSatatBackend为例
RunnableFuture<KeyGroupsStateHandle> snapshot(long checkpointId,
long timestamp,
CheckpointStreamFactory streamFactory) {
//CheckpointStateOutputStream 才是真正的往外部写Checkpoint的stream
try (CheckpointStreamFactory.CheckpointStateOutputStream stream = streamFactory.
createCheckpointStateOutputStream(checkpointId, timestamp)) {
//包装类, 提供一个write() method
DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
.....
//最后把stateTables里的所以state写入outView ( Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();)
// 统计meta到metaInfoProxyList
for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
metaInfo.getStateType(),
metaInfo.getName(),
metaInfo.getNamespaceSerializer(),
metaInfo.getStateSerializer());
metaInfoProxyList.add(metaInfoProxy);
kVStateToId.put(kvState.getKey(), kVStateToId.size());
}
// 写checkpoint
for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) {
//写的同时记住offest, 然后放在return的
keyGroupRangeOffsets[offsetCounter++] = stream.getPos();
outView.writeInt(keyGroupIndex);
for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
outView.writeShort(kVStateToId.get(kvState.getKey()));
writeStateTableForKeyGroup(outView, kvState.getValue(), keyGroupIndex);
}
}
//返回用的streamStateHandle
StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
// 返回一个keyGroupsStateHandle
return new DoneFuture<>(keyGroupsStateHandle);
}
restore ???。。。。
Paste_Image.png
RockDb backend
AbstractRocksDBState
writeKeyWithGroupAndNamespace() - 把一系列的state相关key(keyGroup, key, namespace)写入keySerializationStream, 然后得到的是写入rockdb的key。
protected void writeKeyWithGroupAndNamespace(
int keyGroup, K key, N namespace,
ByteArrayOutputStreamWithPos keySerializationStream,
DataOutputView keySerializationDataOutputView) throws IOException {
Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
keySerializationStream.reset();
writeKeyGroup(keyGroup, keySerializationDataOutputView);
writeKey(key, keySerializationStream, keySerializationDataOutputView);
writeNameSpace(namespace, keySerializationStream, keySerializationDataOutputView);
}
getSerializedValue() - 本质还是通过byte[] serializedKeyAndNamespace从rockdb中得到value
RocksDBValueState
继承自ValueState的两个接口 update和value
update()
@Override
public void update(V value) throws IOException {
if (value == null) {
clear();
return;
}
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
try {
// 调用AbstractRockDBState的方法, 把keyByStream的key, keyGroup, namespace当做rockdb的key写入keySerializationStream, 并toByteArray
writeCurrentKeyWithGroupAndNamespace();
byte[] key = keySerializationStream.toByteArray();
keySerializationStream.reset();
// 序列化value
valueSerializer.serialize(value, out);
// 写入db, columnFamily就是这个state的name, 来自stateDescriptor
backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
} catch (Exception e) {
throw new RuntimeException("Error while adding data to RocksDB", e);
}
}
RocksDBStateBackend :> AbstractStateBackend
实现了其 创建operator和keyed state backend的方法 - createKeyedStateBackend()
和createOperatorStateBackend()
RcoksDBKeyedStateBackend
-
关于keyed state; 真正进行checkpoint到fs上的实现, 在AbstractStreamOperator.snapshotState()中调用
-
实现的shapshotable的两个接口方法 -
restore
和snapshot
restore()
snapshot() - 根据是否是savepoint和enableIncrementalCheckpointing来选择Incrementally 还是fully
snapshotFully() - 完全异步实现
snapshotIncrementally() - semi-async
分为两步 - RocksDBIncrementalSnapshotOperation.takeSnapshot()
和 materializeSnapshot()
private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
final long checkpointId,
final long checkpointTimestamp,
final CheckpointStreamFactory checkpointStreamFactory) throws Exception {
final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
new RocksDBIncrementalSnapshotOperation<>(
this,
checkpointStreamFactory,
checkpointId,
checkpointTimestamp);
synchronized (asyncSnapshotLock) {
if (db == null) {
throw new IOException("RocksDB closed.");
}
if (!hasRegisteredState()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " +
checkpointTimestamp + " . Returning null.");
}
return DoneFuture.nullValue();
}
// rocksdb 进行自身的checkpoint
snapshotOperation.takeSnapshot();
}
return new FutureTask<KeyedStateHandle>(
new Callable<KeyedStateHandle>() {
@Override
public KeyedStateHandle call() throws Exception {
//物化到hdfs上的checkpoint目录里
return snapshotOperation.materializeSnapshot();
}
}
) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
snapshotOperation.stop();
return super.cancel(mayInterruptIfRunning);
}
@Override
protected void done() {
// 释放resource
snapshotOperation.releaseResources(isCancelled());
}
};
}
RocksDBIncrementalSnapshotOperation
- RocksDBIncrementalSnapshotOperation.takeSnapshot()` - 同步, 用rocksdb自身进行checkpoint
void takeSnapshot() throws Exception {
assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
// use the last completed checkpoint as the comparison base.
synchronized (stateBackend.materializedSstFiles) {
baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
}
// save meta data
for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
: stateBackend.kvStateInformation.entrySet()) {
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
}
// save state data
backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
backupFileSystem = backupPath.getFileSystem();
if (backupFileSystem.exists(backupPath)) {
throw new IllegalStateException("Unexpected existence of the backup directory.");
}
// create hard links of living files in the checkpoint path
Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
// 创建真正的checkpoint在本地disk上, 存储:
// 1. 已经存在的sst文件的link
// 2. a copied manifest files and other files(可能包括新建文件吧)
checkpoint.createCheckpoint(backupPath.getPath());
}
-
RocksDBIncrementalSnapshotOperation.materializeSnapshot()
- 异步KeyedStateHandle materializeSnapshot() throws Exception { stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry); // write meta data metaStateHandle = materializeMetaData(); // write state data Preconditions.checkState(backupFileSystem.exists(backupPath)); FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath); if (fileStatuses != null) { // 打开 takeSnapsot的directory, 遍历, 如果是上个cp已经保存过的, 就用placeholder, 用share state后期来替换 // 如果不存在, 就materializeStateData, 真正写ufs for (FileStatus fileStatus : fileStatuses) { final Path filePath = fileStatus.getPath(); final String fileName = filePath.getName(); final StateHandleID stateHandleID = new StateHandleID(fileName); if (fileName.endsWith(SST_FILE_SUFFIX)) { final boolean existsAlready = baseSstFiles == null ? false : baseSstFiles.contains(stateHandleID); if (existsAlready) { // we introduce a placeholder state handle, that is replaced with the // original from the shared state registry (created from a previous checkpoint) sstFiles.put( stateHandleID, new PlaceholderStreamStateHandle()); } else { sstFiles.put(stateHandleID, materializeStateData(filePath)); } } else { StreamStateHandle fileHandle = materializeStateData(filePath); miscFiles.put(stateHandleID, fileHandle); } } } synchronized (stateBackend.materializedSstFiles) { stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet()); } return new IncrementalKeyedStateHandle( stateBackend.operatorIdentifier, stateBackend.keyGroupRange, checkpointId, sstFiles, miscFiles, metaStateHandle); }
restore(Collection<KeyedStateHandle> restoreStateHandles) -
相应的, restore根据keyedStateHandle的实现来确定使用哪种operation里进行restore
try {
if (restoreState == null || restoreState.isEmpty()) {
createDB();
} else if (MigrationUtil.isOldSavepointKeyedState(restoreState)) {
LOG.info("Converting RocksDB state from old savepoint.");
restoreOldSavepointKeyedState(restoreState);
} else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) {
RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
restoreOperation.restore(restoreState);
} else {
RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
restoreOperation.doRestore(restoreState);
}
RocksDBIncrementalRestoreOperation
restoreInstance() - restore的逻辑是从hdfs读上来到local目录restoreInstancePath下面, 然后在通过创建软连接 或者创建restoredb读出并写入instanceRocksDBPath
private void restoreInstance(
IncrementalKeyedStateHandle restoreStateHandle,
boolean hasExtraKeys) throws Exception {
// read state data
Path restoreInstancePath = new Path(
stateBackend.instanceBasePath.getAbsolutePath(),
UUID.randomUUID().toString());
try {
final Map<StateHandleID, StreamStateHandle> sstFiles =
restoreStateHandle.getSharedState();
final Map<StateHandleID, StreamStateHandle> miscFiles =
restoreStateHandle.getPrivateState();
// 把数据读出来 写入本地restoreInstancePath
readAllStateData(sstFiles, restoreInstancePath);
readAllStateData(miscFiles, restoreInstancePath);
// read meta data
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots =
readMetaData(restoreStateHandle.getMetaStateHandle());
List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
stateBackend.columnOptions);
columnFamilyDescriptors.add(columnFamilyDescriptor);
stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot);
}
// 如果keyRange 有变化, 比如改变parallelism, 那么就把属于自己的keyRange 写入到db里面
if (hasExtraKeys) {
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
// 将restoreInstancePath以db path, 初始化个临时restoreDb, 用来合并, 如果有就加入当前db
try (RocksDB restoreDb = stateBackend.openDB(
restoreInstancePath.getPath(),
columnFamilyDescriptors,
columnFamilyHandles)) {
for (int i = 0; i < columnFamilyHandles.size(); ++i) {
ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
if (null == registeredStateMetaInfoEntry) {
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
new RegisteredKeyedBackendStateMetaInfo<>(
stateMetaInfoSnapshot.getStateType(),
stateMetaInfoSnapshot.getName(),
stateMetaInfoSnapshot.getNamespaceSerializer(),
stateMetaInfoSnapshot.getStateSerializer());
registeredStateMetaInfoEntry =
new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
stateBackend.db.createColumnFamily(columnFamilyDescriptor),
stateMetaInfo);
stateBackend.kvStateInformation.put(
stateMetaInfoSnapshot.getName(),
registeredStateMetaInfoEntry);
}
ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {
int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
startKeyGroupPrefixBytes[j] = (byte)(startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
}
iterator.seek(startKeyGroupPrefixBytes);
while (iterator.isValid()) {
int keyGroup = 0;
for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
}
// 如果当前db有这个keGroup, 将kv放入当前rocksdb
if (stateBackend.keyGroupRange.contains(keyGroup)) {
stateBackend.db.put(targetColumnFamilyHandle,
iterator.key(), iterator.value());
}
iterator.next();
}
}
}
}
} else {
// 直接建立hard link from restoreInputPath 到 instanceRocksDBPath
// create hard links in the instance directory
if (!stateBackend.instanceRocksDBPath.mkdirs()) {
throw new IOException("Could not create RocksDB data directory.");
}
createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
stateBackend.db = stateBackend.openDB(
stateBackend.instanceRocksDBPath.getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles);
for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
new RegisteredKeyedBackendStateMetaInfo<>(
stateMetaInfoSnapshot.getStateType(),
stateMetaInfoSnapshot.getName(),
stateMetaInfoSnapshot.getNamespaceSerializer(),
stateMetaInfoSnapshot.getStateSerializer());
stateBackend.kvStateInformation.put(
stateMetaInfoSnapshot.getName(),
new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
columnFamilyHandle, stateMetaInfo));
}
// use the restore sst files as the base for succeeding checkpoints
synchronized (stateBackend.materializedSstFiles) {
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());
}
// 把restored的CheckpointId 设为lastCompleteCheckpointId
stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
}
}
readAllStateData() - 把hdfs中文件 通过remoteFileHandle 写入restoreInstancePath
private void readAllStateData(
Map<StateHandleID, StreamStateHandle> stateHandleMap,
Path restoreInstancePath) throws IOException {
for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
StateHandleID stateHandleID = entry.getKey();
StreamStateHandle remoteFileHandle = entry.getValue();
// 从remoteFileHandle读入本地disk restoreInstancePath
readStateData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
}
}
具体meta细节没研究
网友评论