State source code analysis


Author: edd72e125c98 | Published 2018-06-12 09:26

    KVState-> AbstractHeapState

    State in functions

    ListCheckpointed

    A function implements ListCheckpointed to support list-style state redistribution; the old Checkpointed interface is deprecated.
    List<T> snapshotState(long checkpointId, long timestamp)
    void restoreState(List<T> state)

    CheckpointedAsynchronously: in the current version, asynchronous snapshots are not yet implemented for ListCheckpointed.
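    Here is a minimal sketch of list-style redistribution. It assumes the semantics described above: every subtask returns a list of independent elements, and on rescaling all lists are concatenated and split evenly across the new subtasks. Several names are stand-ins, not Flink code:

    - The `ListCheckpointed` interface here is a local stand-in that only mirrors the two quoted signatures.
    - `PartitionOffsets` is a hypothetical function.
    - `redistribute()` is a hypothetical helper.

    Flink's own repartitioner may assign the elements in a different order.

```java
import java.util.ArrayList;
import java.util.List;

class ListStateRedistributionSketch {

    // Local stand-in mirroring the two ListCheckpointed methods quoted above.
    interface ListCheckpointed<T> {
        List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
        void restoreState(List<T> state) throws Exception;
    }

    // Hypothetical function that tracks one offset per partition it reads.
    static class PartitionOffsets implements ListCheckpointed<Long> {
        final List<Long> offsets = new ArrayList<>();

        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) {
            // each element is an independently redistributable unit of state
            return new ArrayList<>(offsets);
        }

        @Override
        public void restoreState(List<Long> state) {
            offsets.clear();
            offsets.addAll(state);
        }
    }

    // Hypothetical helper: concatenate all subtasks' lists, then split them evenly.
    static <T> List<List<T>> redistribute(List<List<T>> oldStates, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> s : oldStates) {
            all.addAll(s);
        }
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            // the first `remainder` subtasks receive one extra element
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    public static void main(String[] args) {
        PartitionOffsets a = new PartitionOffsets();
        a.offsets.add(10L);
        a.offsets.add(20L);
        a.offsets.add(30L);
        PartitionOffsets b = new PartitionOffsets();
        b.offsets.add(40L);

        List<List<Long>> snapshots = new ArrayList<>();
        snapshots.add(a.snapshotState(1L, 0L));
        snapshots.add(b.snapshotState(1L, 0L));

        // scale from 2 to 3 subtasks
        System.out.println(redistribute(snapshots, 3)); // [[10, 20], [30], [40]]
    }
}
```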

    CheckpointedFunction

    FlinkKafkaConsumerBase :> CheckpointedFunction, CheckpointListener
    It basically uses snapshotState(), initializeState() and notifyCheckpointComplete() to manage the pending offsets and offsetsStateForCheckpoint.

    /** Data for pending but uncommitted offsets */
        private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
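    Here is a sketch of the pending-offsets bookkeeping. It assumes the process works roughly as follows:

    - snapshotState() files a copy of the current offsets under the checkpoint ID.
    - notifyCheckpointComplete() commits that entry and drops it together with all older pending entries.

    A `LinkedHashMap` replaces commons-collections' `LinkedMap`. The `committed` list and the partition names are hypothetical stand-ins for the actual commit to Kafka.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PendingOffsetsSketch {

    // checkpointId -> (partition -> offset), kept in checkpoint order
    final LinkedHashMap<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();
    // offsets read so far, keyed by a hypothetical partition name
    final Map<String, Long> currentOffsets = new HashMap<>();
    // stand-in for the real commit: records what would have been committed
    final List<Map<String, Long>> committed = new ArrayList<>();

    void snapshotState(long checkpointId) {
        // copy, because reading continues after the snapshot
        pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
    }

    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> offsets = pendingOffsetsToCommit.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already committed checkpoint
        }
        // remove this checkpoint and every older pending one
        Iterator<Long> it = pendingOffsetsToCommit.keySet().iterator();
        while (it.hasNext()) {
            long id = it.next();
            it.remove();
            if (id == checkpointId) {
                break;
            }
        }
        committed.add(offsets);
    }

    public static void main(String[] args) {
        PendingOffsetsSketch consumer = new PendingOffsetsSketch();
        consumer.currentOffsets.put("topic-0", 5L);
        consumer.snapshotState(1L);
        consumer.currentOffsets.put("topic-0", 9L);
        consumer.snapshotState(2L);
        consumer.notifyCheckpointComplete(2L);
        System.out.println(consumer.committed);              // [{topic-0=9}]
        System.out.println(consumer.pendingOffsetsToCommit); // {}
    }
}
```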
    

    State in operators

    StreamOperator

    Operators need to implement this interface, in particular
    .snapshotState()

    CheckpointedRestoringOperator

    void restoreState(FSDataInputStream in)

    key groups

    To dynamically scale Flink operators that use partitioned (key-value) state, keys are grouped into key groups.

    KeyGroupRangeOffsets denotes the range of all key groups that are referenced by the handle, together with their respective stream offsets.
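    Two integer formulas capture how keys end up in key groups, and key groups in subtasks. The range arithmetic below follows my reading of Flink's KeyGroupRangeAssignment. The hash, however, is deliberately simplified:

    - Flink additionally applies a murmur hash to `key.hashCode()`.
    - This sketch uses `Math.floorMod` on the raw hash code, so real key group numbers will differ.

    The class and method names are hypothetical.

```java
class KeyGroupAssignmentSketch {

    // key -> key group (simplified: Flink murmur-hashes hashCode() first)
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // key group -> index of the parallel subtask that owns it
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // inclusive [start, end] range of key groups owned by one subtask
    static int[] keyGroupRangeFor(int operatorIndex, int maxParallelism, int parallelism) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (int parallelism : new int[] {2, 3}) {
            for (int op = 0; op < parallelism; op++) {
                int[] range = keyGroupRangeFor(op, maxParallelism, parallelism);
                System.out.println("parallelism " + parallelism + ", subtask " + op
                        + ": key groups " + range[0] + ".." + range[1]);
            }
        }
        // prints 0..63, 64..127 for parallelism 2 and 0..42, 43..85, 86..127 for parallelism 3
        int keyGroup = keyGroupFor("user-42", maxParallelism);
        System.out.println("user-42 -> key group " + keyGroup
                + " -> subtask " + operatorIndexFor(keyGroup, maxParallelism, 3));
    }
}
```

    The number of key groups (the max parallelism) is fixed, so a key never changes key group. Rescaling only changes which subtask owns each contiguous range of key groups, which is why state can be moved as whole key groups.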

    RetrievableStateHandle and StreamStateHandle
    Both give access to state that was written to a checkpoint output stream. RetrievableStateHandle returns a directly usable object, while StreamStateHandle returns a seekable input stream.

    CheckpointStateOutputStream is the common entry point through which all state is serialized.

    operator state and keyed-state

    State in operators is divided into operator state (= non-partitioned state) and keyed state (= partitioned state).
    Keyed state is organized as a List<KeyGroupsStateHandle>. Each KeyGroupsStateHandle consists of one StreamStateHandle and one KeyGroupRangeOffsets object. KeyGroupRangeOffsets denotes the range of all key groups that are referenced by the handle, together with their respective stream offsets. The StreamStateHandle gives access to a seekable stream that contains the actual state data for all key groups from the key group range; individual key group states are located in the stream at their previously mentioned stream offsets.

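    KeyGroupsStateHandle contains a KeyGroupRangeOffsets and a StreamStateHandle.
    KeyGroupRangeOffsets: contains the keyGroupRange (the range of key groups this handle covers) and the offset of each key group in the stream.
    StreamStateHandle: manages the state stream; it appears to be the carrier that actually holds the data.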

    Both kinds of state appear to be handled in StateAssignmentOperation. This class is responsible for reassigning state after recovery from a checkpoint, i.e. for redistributing the key groups.
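    This is a simplified, hypothetical stand-in for KeyGroupRangeOffsets, showing what the offsets are for during reassignment. After rescaling, a new subtask intersects an old handle's key-group range with its own range and keeps only the matching offsets. It can then seek straight to its key groups inside the shared stream. The class name, fields and `getIntersection()` are my own simplification, not Flink's implementation.

```java
import java.util.Arrays;

class KeyGroupRangeOffsetsSketch {

    final int startKeyGroup; // inclusive
    final int endKeyGroup;   // inclusive
    final long[] offsets;    // offsets[i] = stream position of key group startKeyGroup + i

    KeyGroupRangeOffsetsSketch(int startKeyGroup, int endKeyGroup, long[] offsets) {
        if (offsets.length != endKeyGroup - startKeyGroup + 1) {
            throw new IllegalArgumentException("one offset per key group required");
        }
        this.startKeyGroup = startKeyGroup;
        this.endKeyGroup = endKeyGroup;
        this.offsets = offsets;
    }

    long getKeyGroupOffset(int keyGroup) {
        if (keyGroup < startKeyGroup || keyGroup > endKeyGroup) {
            throw new IllegalArgumentException("key group " + keyGroup + " not in range");
        }
        return offsets[keyGroup - startKeyGroup];
    }

    // Keeps only the key groups that also fall into [start, end], e.g. the range of a
    // subtask after rescaling; returns null when the two ranges do not overlap.
    KeyGroupRangeOffsetsSketch getIntersection(int start, int end) {
        int s = Math.max(start, startKeyGroup);
        int e = Math.min(end, endKeyGroup);
        if (s > e) {
            return null;
        }
        long[] sub = Arrays.copyOfRange(offsets, s - startKeyGroup, e - startKeyGroup + 1);
        return new KeyGroupRangeOffsetsSketch(s, e, sub);
    }

    public static void main(String[] args) {
        // an old subtask owned key groups 0..3, whose data starts at these stream positions
        KeyGroupRangeOffsetsSketch old = new KeyGroupRangeOffsetsSketch(0, 3, new long[] {0, 17, 40, 58});
        // after rescaling, a new subtask owns key groups 2..5
        KeyGroupRangeOffsetsSketch mine = old.getIntersection(2, 5);
        System.out.println(mine.startKeyGroup + ".." + mine.endKeyGroup + " " + Arrays.toString(mine.offsets));
        // prints: 2..3 [40, 58]
    }
}
```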

    AbstractStreamOperator

    AbstractStreamOperator :> StreamOperator; it mainly implements two methods, snapshotState() and initializeState().

    KeyedStateCheckpointOutputStream和OperatorStateCheckpointOutputStream

    These two streams implement raw state in AbstractStreamOperator; managed state is implemented by the state backend.
    KeyedStateCheckpointOutputStream achieves redistribution through key groups.
    OperatorStateCheckpointOutputStream uses list-style redistribution: each operator returns a list of state elements (tracked as LongArrayList partitionOffsets).

    StateSnapshotContextSynchronousImpl
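    1. getRawKeyedOperatorStateOutput() and getRawOperatorStateOutput() build the corresponding CheckpointOutputStream.
    2. getKeyedStateStreamFuture() and getOperatorStateStreamFuture() both call delegate(CheckpointOutputStream).closeAndGetHandle() and wrap the resulting raw-state handle in a DoneFuture. This shows that raw state can only be snapshotted synchronously, not asynchronously.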
    OperatorSnapshotResult

    The result of snapshotState(). It holds four futures: raw and managed state, each for keyed state and operator state.

    private RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture;
    private RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture;
    private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture;
    private RunnableFuture<OperatorStateHandle> operatorStateRawFuture;
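    Raw state is wrapped in a DoneFuture and is therefore synchronous. This can be illustrated with plain `java.util.concurrent` types:

    - `DoneFutureSketch` is a hypothetical stand-in for Flink's DoneFuture: a `RunnableFuture` whose result already exists when it is created.
    - Managed state from an asynchronous backend can instead be returned as a `FutureTask`, which does the work only when it is run.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

class SnapshotFuturesSketch {

    // Stand-in for a completed future: the work already happened on the calling thread.
    static final class DoneFutureSketch<T> implements RunnableFuture<T> {
        private final T result;

        DoneFutureSketch(T result) {
            this.result = result;
        }

        @Override public void run() { /* nothing left to do */ }
        @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; }
        @Override public boolean isCancelled() { return false; }
        @Override public boolean isDone() { return true; }
        @Override public T get() { return result; }
        @Override public T get(long timeout, TimeUnit unit) { return result; }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // raw state: already written synchronously, the handle is merely wrapped
        RunnableFuture<String> raw = new DoneFutureSketch<>("raw-handle");
        // managed state: the expensive write is deferred until run() is called
        RunnableFuture<String> managed = new FutureTask<>(() -> "managed-handle");

        System.out.println(raw.isDone() + " " + managed.isDone()); // true false
        managed.run(); // e.g. on an asynchronous checkpointing thread
        System.out.println(managed.get()); // managed-handle
    }
}
```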
    

    Back to AbstractStreamOperator: this method overrides the one in StreamOperator.

    OperatorSnapshotResult snapshotState(
            long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {

        // get the key group range (empty if there is no keyed state backend)
        KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();

        try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
                checkpointId,
                timestamp,
                streamFactory,
                keyGroupRange,
                getContainingTask().getCancelables())) {

            // write the timer services into the raw keyed state checkpoint stream
            snapshotState(snapshotContext);

            // store the raw-state results in the OperatorSnapshotResult
            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            // raw state is done; now snapshot the managed state
            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                        operatorStateBackend.snapshot(checkpointId, timestamp, streamFactory));
            }

            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                        keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory));
            }
        }
        // ... exception handling omitted

        return snapshotInProgress;
    }
    

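    This method mainly snapshots the operator's HeapInternalTimerService instances. It writes them, via getRawKeyedOperatorStateOutput(), into a KeyedStateCheckpointOutputStream, which is where raw keyed state is handled. Subclasses override this method to add their own logic.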

    void snapshotState(StateSnapshotContext context) throws Exception {
        KeyedStateCheckpointOutputStream out = context.getRawKeyedOperatorStateOutput();

        try {
            KeyGroupsList allKeyGroups = out.getKeyGroupList();
            for (int keyGroupIdx : allKeyGroups) {
                // starting a key group records its offset in the stream
                out.startNewKeyGroup(keyGroupIdx);

                DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
                dov.writeInt(timerServices.size());

                for (Map.Entry<String, HeapInternalTimerService<?, ?>> entry : timerServices.entrySet()) {
                    String serviceName = entry.getKey();
                    HeapInternalTimerService<?, ?> timerService = entry.getValue();

                    dov.writeUTF(serviceName);
                    timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx);
                }
            }
        } finally {
            // ... error handling and closing `out` omitted
        }
    }
    
    initializeState()

    It mainly restores the four kinds of state (managed backend state and raw state) from the stateHandles.

    @Override
    public final void initializeState(OperatorStateHandles stateHandles) throws Exception {

        // two raw-state handle collections and one for the managed operator state backend
        Collection<KeyGroupsStateHandle> keyedStateHandlesRaw = null;
        Collection<OperatorStateHandle> operatorStateHandlesRaw = null;
        Collection<OperatorStateHandle> operatorStateHandlesBackend = null;

        // initializes keyedStateStore; the keyedStateBackend field is created via the StreamTask
        initKeyedState();
        // ...
        // the operator state backend is created via the StreamTask and assigned to the operatorStateBackend field
        initOperatorState(operatorStateHandlesBackend);

        StateInitializationContext initializationContext = new StateInitializationContextImpl(
                restoring, // information whether we restore or start for the first time
                operatorStateBackend, // access to operator state backend
                keyedStateStore, // access to keyed state backend
                keyedStateHandlesRaw, // access to keyed state stream
                operatorStateHandlesRaw, // access to operator state stream
                getContainingTask().getCancelables()); // access to register streams for canceling

        // restores the timer services from raw keyed state
        initializeState(initializationContext);
    }
    

    NOTE: AbstractStreamOperator has a processWatermark() method, which calls service.advanceWatermark(mark.getTimestamp()) on each timer service.

    getPartitionedState()

    Internally it calls keyedStateBackend.getPartitionedState(); partitioned state simply means keyed state.

        protected <S extends State, N> S getPartitionedState(
                N namespace, TypeSerializer<N> namespaceSerializer,
                StateDescriptor<S, ?> stateDescriptor) throws Exception {
    
            if (keyedStateStore != null) {
                return keyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
    .......
    

    State interface

    State

    ValueState : single-value state
    ListState : list state
    FoldingState : folding state, for FoldFunction
    ReducingState : reducing state, for ReduceFunction
    These interfaces have concrete implementations in the runtime and RocksDB packages.



    StateDescriptor
    Each descriptor provides bind(), which creates a new state on the given backend.


    ValueStateDescriptor offers a constructor that takes a TypeInformation, from which the TypeSerializer can be created lazily. TypeInformation has subclasses for the various built-in types.

    KVState

    It provides two methods:

    /**
         * Sets the current namespace, which will be used when using the state access methods.
         *
         * @param namespace The namespace.
         */
        void setCurrentNamespace(N namespace);
    
        /**
         * Returns the serialized value for the given key and namespace.
         *
         * <p>If no value is associated with key and namespace, <code>null</code>
         * is returned.
         *
         * @param serializedKeyAndNamespace Serialized key and namespace
         * @return Serialized value or <code>null</code> if no value is associated
         * with the key and namespace.
         * @throws Exception Exceptions during serialization are forwarded
         */
        byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
    

    KvState has three kinds of implementations: generic, heap and RocksDB.



    AbstractHeapState

    AbstractHeapState:> KvState
    StateTable

    // the map that actually stores the key/value pairs in the heap implementation
        /** Map containing the actual key/value pairs */
        protected final StateTable<K, N, SV> stateTable;
    
    

    Internally it stores List<Map<N, Map<K, ST>>> state;
    each list element holds the state of one key group, and the maps go namespace -> (key -> actual value).

    HeapValueState
    class HeapValueState<K, N, V>
            extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
            implements ValueState<V> {
    

    It has two supertypes:
    ValueState:> State
    AbstractHeapState :> KVState (get)

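    What is the relationship between a state's name and its namespace? The name is the state name the user specifies, while the namespace is an extra scope below the key: window operators, for example, use the window as the namespace, and plain keyed state uses VoidNamespace.
    What is the key in keyedStateBackend.getCurrentKey()? It is the key of the current record (the keyBy key), which determines the key group; it is not the state's name.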

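    When keyedStateBackend.getPartitionedState() is called, the state (a KvState) is looked up by name in keyValueStatesByName, its current namespace is set via setCurrentNamespace(), and then HeapValueState.value() is called
    (ValueState has two methods, value() and update()). At that point the current key and currentKeyGroupIndex are obtained from the KeyedStateBackend. The key group index selects the namespace map from the StateTable, and the namespace and key then yield the value. update() works the same way.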
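    Here is a stripped-down model of that lookup path, under these simplifying assumptions:

    - The state table is the `List<Map<N, Map<K, V>>>` described above, indexed by key group.
    - The "backend" only remembers the current key.
    - The key group is computed with `Math.floorMod` on `hashCode()` rather than Flink's murmur hash.

    All class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HeapStateTableSketch {

    // Minimal "backend": remembers the current key, as set per record by the operator.
    static class KeyContext<K> {
        final int maxParallelism;
        K currentKey;

        KeyContext(int maxParallelism) {
            this.maxParallelism = maxParallelism;
        }

        int currentKeyGroupIndex() {
            return Math.floorMod(currentKey.hashCode(), maxParallelism); // simplified hash
        }
    }

    // value state backed by: key group -> (namespace -> (key -> value))
    static class HeapValueStateSketch<K, N, V> {
        final List<Map<N, Map<K, V>>> state = new ArrayList<>();
        final KeyContext<K> backend;
        N currentNamespace;

        HeapValueStateSketch(KeyContext<K> backend) {
            this.backend = backend;
            for (int i = 0; i < backend.maxParallelism; i++) {
                state.add(new HashMap<>()); // one namespace map per key group
            }
        }

        void setCurrentNamespace(N namespace) {
            this.currentNamespace = namespace;
        }

        V value() {
            Map<N, Map<K, V>> namespaceMap = state.get(backend.currentKeyGroupIndex());
            Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
            return keyedMap == null ? null : keyedMap.get(backend.currentKey);
        }

        void update(V value) {
            Map<N, Map<K, V>> namespaceMap = state.get(backend.currentKeyGroupIndex());
            namespaceMap.computeIfAbsent(currentNamespace, n -> new HashMap<>())
                    .put(backend.currentKey, value);
        }
    }

    public static void main(String[] args) {
        KeyContext<String> ctx = new KeyContext<>(128);
        HeapValueStateSketch<String, String, Long> count = new HeapValueStateSketch<>(ctx);

        ctx.currentKey = "alice";
        count.setCurrentNamespace("window-1");
        count.update(3L);
        count.setCurrentNamespace("window-2");
        System.out.println(count.value()); // null: same key, different namespace
        count.setCurrentNamespace("window-1");
        System.out.println(count.value()); // 3
    }
}
```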

    AbstractStateBackend

    It provides three methods:

    /**
     * Creates a {@link CheckpointStreamFactory} that can be used to create streams
     * that should end up in a checkpoint, i.e. the streams a checkpoint is written to.
     */
    public abstract CheckpointStreamFactory createStreamFactory(/* ... */);

    public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(/* ... */);

    public OperatorStateBackend createOperatorStateBackend(
            Environment env,
            String operatorIdentifier) throws Exception {
        return new DefaultOperatorStateBackend(env.getUserClassLoader());
    }
    

    NOTE: unlike the other two, this method is not abstract; it is a concrete method with a default implementation.

    AbstractKeyedStateBackend

    FsStateBackend and MemoryStateBackend use HeapKeyedStateBackend (created in createKeyedStateBackend()).
    RocksDBStateBackend uses RocksDBKeyedStateBackend.


    It provides many key-related methods (inherited from the KeyedStateBackend interface).


    The core method of this class is getPartitionedState().

    public <N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
        // ... some checks omitted

        // keyValueStatesByName maps state name -> KvState; create the map if it does not exist yet
        if (keyValueStatesByName == null) {
            keyValueStatesByName = new HashMap<>();
        }

        // lastName and lastState cache the most recently used state; on a hit only the namespace is set
        if (lastName != null && lastName.equals(stateDescriptor.getName())) {
            lastState.setCurrentNamespace(namespace);
            return (S) lastState;
        }

        // otherwise take it from keyValueStatesByName and update the lastName/lastState cache
        KvState<?> previous = keyValueStatesByName.get(stateDescriptor.getName());
        if (previous != null) {
            lastState = previous;
            lastState.setCurrentNamespace(namespace);
            lastName = stateDescriptor.getName();
            return (S) previous;
        }

        // create a new blank key/value state
        // stateDescriptor.bind() binds the state to this backend: each descriptor subclass calls
        // the matching create method (createValueState(), createListState(), ...), much like a factory method
        S state = stateDescriptor.bind(new StateBackend() {
            @Override
            public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
            }

            @Override
            public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
            }

            @Override
            public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
            }

            @Override
            public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
            }
        });

        KvState kvState = (KvState) state;

        keyValueStatesByName.put(stateDescriptor.getName(), kvState);

        lastName = stateDescriptor.getName();
        lastState = kvState;

        kvState.setCurrentNamespace(namespace);

        // ... publish queryable state (omitted)

        return state;
    }
    

    The keys of keyValueStatesByName are the state names.
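    The cache-then-map-then-bind logic of getPartitionedState() can be modeled in a few classes. This is a hypothetical simplification: there is only a value-state descriptor, no serializers and no queryable state. It is meant to show the lastName/lastState cache and the double dispatch through bind(). The nested names imitate Flink's but are not its classes.

```java
import java.util.HashMap;
import java.util.Map;

class PartitionedStateLookupSketch {

    interface KvState {
        void setCurrentNamespace(Object namespace);
    }

    static class ValueStateImpl implements KvState {
        Object namespace;

        @Override
        public void setCurrentNamespace(Object namespace) {
            this.namespace = namespace;
        }
    }

    // the backend-side factory that descriptors call back into
    interface StateFactory {
        ValueStateImpl createValueState(ValueStateDescriptor desc);
    }

    interface StateDescriptor<S> {
        String getName();
        S bind(StateFactory factory); // each descriptor calls its matching create method
    }

    static class ValueStateDescriptor implements StateDescriptor<ValueStateImpl> {
        final String name;

        ValueStateDescriptor(String name) {
            this.name = name;
        }

        @Override public String getName() { return name; }
        @Override public ValueStateImpl bind(StateFactory factory) { return factory.createValueState(this); }
    }

    static class Backend {
        final Map<String, KvState> keyValueStatesByName = new HashMap<>();
        String lastName;
        KvState lastState;
        int created = 0; // how many state objects were actually created

        @SuppressWarnings("unchecked")
        <S> S getPartitionedState(Object namespace, StateDescriptor<S> descriptor) {
            // fast path: same state as in the previous call
            if (lastName != null && lastName.equals(descriptor.getName())) {
                lastState.setCurrentNamespace(namespace);
                return (S) lastState;
            }
            KvState state = keyValueStatesByName.get(descriptor.getName());
            if (state == null) {
                // double dispatch: the descriptor decides which create method runs
                state = (KvState) descriptor.bind(desc -> { created++; return new ValueStateImpl(); });
                keyValueStatesByName.put(descriptor.getName(), state);
            }
            lastName = descriptor.getName();
            lastState = state;
            state.setCurrentNamespace(namespace);
            return (S) state;
        }
    }

    public static void main(String[] args) {
        Backend backend = new Backend();
        ValueStateDescriptor sd = new ValueStateDescriptor("count");
        ValueStateImpl s1 = backend.getPartitionedState("ns-1", sd);
        ValueStateImpl s2 = backend.getPartitionedState("ns-2", sd);
        System.out.println((s1 == s2) + " " + s2.namespace + " " + backend.created); // true ns-2 1
    }
}
```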

    Snapshotable

    All backends implement its snapshot() and restore() methods.
    These two methods are what actually take a checkpoint and restore from one.

    snapshot
    Each backend has its own implementation; HeapKeyedStateBackend serves as the example here.

    RunnableFuture<KeyGroupsStateHandle> snapshot(
            long checkpointId,
            long timestamp,
            CheckpointStreamFactory streamFactory) throws Exception {

        // CheckpointStateOutputStream is the stream that actually writes the checkpoint to external storage
        try (CheckpointStreamFactory.CheckpointStateOutputStream stream =
                streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp)) {

            // wrapper that adds DataOutputView write methods on top of the stream
            DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
            // .....

            // in the end, all state in stateTables is written to outView
            // (Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();)

            // collect the meta info into metaInfoProxyList and assign each state an id
            for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {

                RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
                KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
                        metaInfo.getStateType(),
                        metaInfo.getName(),
                        metaInfo.getNamespaceSerializer(),
                        metaInfo.getStateSerializer());

                metaInfoProxyList.add(metaInfoProxy);
                kVStateToId.put(kvState.getKey(), kVStateToId.size());
            }

            // write the checkpoint, one key group after another
            for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) {
                // remember the offset of each key group while writing; it ends up in the returned handle
                keyGroupRangeOffsets[offsetCounter++] = stream.getPos();
                outView.writeInt(keyGroupIndex);
                for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
                    outView.writeShort(kVStateToId.get(kvState.getKey()));
                    writeStateTableForKeyGroup(outView, kvState.getValue(), keyGroupIndex);
                }
            }

            // the StreamStateHandle to return
            StreamStateHandle streamStateHandle = stream.closeAndGetHandle();

            KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);

            final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
            // return the KeyGroupsStateHandle in an already completed future
            return new DoneFuture<>(keyGroupsStateHandle);
        }
    }
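    The per-key-group loop above can be imitated with a `ByteArrayOutputStream`. The steps are:

    1. Record the stream position before each key group.
    2. Write the group.
    3. Later, seek straight to a recorded offset to restore a single key group.

    The record format (key group index, entry count, then UTF key / long value pairs) is a hypothetical simplification of writeStateTableForKeyGroup(). With only one state, the per-state id short is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class KeyGroupSnapshotSketch {

    // Writes key groups start..end one after another and returns each one's start offset.
    static long[] snapshot(Map<Integer, Map<String, Long>> stateByKeyGroup,
                           int start, int end, ByteArrayOutputStream out) throws IOException {
        DataOutputStream view = new DataOutputStream(out); // unbuffered, so out.size() is the position
        long[] offsets = new long[end - start + 1];
        for (int keyGroup = start; keyGroup <= end; keyGroup++) {
            offsets[keyGroup - start] = out.size(); // plays the role of stream.getPos()
            view.writeInt(keyGroup);
            Map<String, Long> entries = stateByKeyGroup.getOrDefault(keyGroup, Collections.emptyMap());
            view.writeInt(entries.size());
            for (Map.Entry<String, Long> e : entries.entrySet()) {
                view.writeUTF(e.getKey());
                view.writeLong(e.getValue());
            }
        }
        return offsets;
    }

    // Reads back a single key group by starting directly at its offset.
    static Map<String, Long> restoreKeyGroup(byte[] data, long offset, int expectedKeyGroup) throws IOException {
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(data, (int) offset, data.length - (int) offset));
        int keyGroup = in.readInt();
        if (keyGroup != expectedKeyGroup) {
            throw new IOException("unexpected key group " + keyGroup);
        }
        int n = in.readInt();
        Map<String, Long> result = new TreeMap<>();
        for (int i = 0; i < n; i++) {
            result.put(in.readUTF(), in.readLong());
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Map<Integer, Map<String, Long>> state = new HashMap<>();
        state.computeIfAbsent(0, k -> new HashMap<>()).put("alice", 3L);
        state.computeIfAbsent(2, k -> new HashMap<>()).put("bob", 7L);
        state.computeIfAbsent(2, k -> new HashMap<>()).put("carol", 1L);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long[] offsets = snapshot(state, 0, 2, out); // key group 1 has no entries
        System.out.println(Arrays.toString(offsets)); // [0, 23, 31]
        // jump straight to key group 2 without reading key groups 0 and 1
        System.out.println(restoreKeyGroup(out.toByteArray(), offsets[2], 2)); // {bob=7, carol=1}
    }
}
```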
    

    restore: (not analyzed yet)



    RocksDB backend

    AbstractRocksDBState

    writeKeyWithGroupAndNamespace() - writes the state-related key components (key group, key, namespace) into keySerializationStream; the resulting bytes are the key that is written to RocksDB.

    protected void writeKeyWithGroupAndNamespace(
                int keyGroup, K key, N namespace,
                ByteArrayOutputStreamWithPos keySerializationStream,
                DataOutputView keySerializationDataOutputView) throws IOException {
    
            Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
    
            keySerializationStream.reset();
            writeKeyGroup(keyGroup, keySerializationDataOutputView);
            writeKey(key, keySerializationStream, keySerializationDataOutputView);
            writeNameSpace(namespace, keySerializationStream, keySerializationDataOutputView);
        }
    

    getSerializedValue() - essentially looks up the value in RocksDB using the byte[] serializedKeyAndNamespace.
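    Here is a sketch of the composite key layout. It rests on my understanding of Flink's keyGroupPrefixBytes:

    - The key-group prefix is 1 byte when the max parallelism is at most 128, and 2 bytes otherwise.
    - The prefix is written big-endian, so all entries of a key group are adjacent in RocksDB's sorted key space.

    `writeUTF` is a hypothetical stand-in for the real key and namespace serializers.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

class RocksDbKeyLayoutSketch {

    static int keyGroupPrefixBytes(int maxParallelism) {
        return maxParallelism > (Byte.MAX_VALUE + 1) ? 2 : 1;
    }

    // key group prefix | key | namespace
    static byte[] compositeKey(int keyGroup, String key, String namespace, int prefixBytes) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        // big-endian key-group prefix; writeByte keeps the low 8 bits
        for (int i = prefixBytes - 1; i >= 0; i--) {
            out.writeByte(keyGroup >>> (i * Byte.SIZE));
        }
        out.writeUTF(key);       // stand-in for the key serializer
        out.writeUTF(namespace); // stand-in for the namespace serializer
        out.flush();
        return bytes.toByteArray();
    }

    static int readKeyGroup(byte[] compositeKey, int prefixBytes) {
        int keyGroup = 0;
        for (int i = 0; i < prefixBytes; i++) {
            keyGroup = (keyGroup << Byte.SIZE) | (compositeKey[i] & 0xFF); // Java bytes are signed
        }
        return keyGroup;
    }

    public static void main(String[] args) throws IOException {
        int prefixBytes = keyGroupPrefixBytes(1024); // 2, since 1024 > 128
        byte[] k = compositeKey(300, "alice", "window-1", prefixBytes);
        System.out.println(Arrays.toString(Arrays.copyOf(k, prefixBytes))); // [1, 44]
        System.out.println(readKeyGroup(k, prefixBytes)); // 300
    }
}
```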

    RocksDBValueState

    Implements ValueState's two methods, update() and value().
    update()

    @Override
    public void update(V value) throws IOException {
        if (value == null) {
            clear();
            return;
        }
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
        try {
            // AbstractRocksDBState helper: write the current keyBy key, key group and namespace
            // into keySerializationStream as the RocksDB key, then take the bytes
            writeCurrentKeyWithGroupAndNamespace();
            byte[] key = keySerializationStream.toByteArray();
            keySerializationStream.reset();
            // serialize the value into the same (now reset) stream
            valueSerializer.serialize(value, out);
            // write to the DB; columnFamily corresponds to this state's name, taken from the StateDescriptor
            backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
        } catch (Exception e) {
            throw new RuntimeException("Error while adding data to RocksDB", e);
        }
    }
    
    RocksDBStateBackend :> AbstractStateBackend

    It implements the methods that create the operator and keyed state backends: createKeyedStateBackend() and createOperatorStateBackend().

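    RocksDBKeyedStateBackend
    • Handles keyed state. It is the implementation that actually checkpoints to the file system, and it is invoked from AbstractStreamOperator.snapshotState().

    • Implements the two Snapshotable methods: restore() and snapshot().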

    restore()

    snapshot() - chooses between an incremental and a full snapshot, depending on whether this is a savepoint and whether enableIncrementalCheckpointing is set.

    snapshotFully() - fully asynchronous implementation

    snapshotIncrementally() - semi-asynchronous
    It has two steps: RocksDBIncrementalSnapshotOperation.takeSnapshot() and materializeSnapshot().

    private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
                final long checkpointId,
                final long checkpointTimestamp,
                final CheckpointStreamFactory checkpointStreamFactory) throws Exception {
    
            final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
                new RocksDBIncrementalSnapshotOperation<>(
                    this,
                    checkpointStreamFactory,
                    checkpointId,
                    checkpointTimestamp);
    
            synchronized (asyncSnapshotLock) {
                if (db == null) {
                    throw new IOException("RocksDB closed.");
                }
    
                if (!hasRegisteredState()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " +
                                checkpointTimestamp + " . Returning null.");
                    }
                    return DoneFuture.nullValue();
                }
            // let RocksDB take its own (local) checkpoint
                snapshotOperation.takeSnapshot();
            }
    
            return new FutureTask<KeyedStateHandle>(
                new Callable<KeyedStateHandle>() {
                    @Override
                    public KeyedStateHandle call() throws Exception { 
                        // materialize into the checkpoint directory on HDFS
                        return snapshotOperation.materializeSnapshot();
                    }
                }
            ) {
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    snapshotOperation.stop();
                    return super.cancel(mayInterruptIfRunning);
                }
    
                @Override
                protected void done() {
                    // release resources
                    snapshotOperation.releaseResources(isCancelled());
                }
            };
        }
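    The shape of snapshotIncrementally() can be reproduced with `java.util.concurrent` alone:

    - A short synchronous step runs under a lock.
    - A `FutureTask` follows, whose `call()` does the slow upload.
    - Its `done()` releases resources, even when the task is cancelled.

    The recorded step names are hypothetical placeholders for takeSnapshot(), materializeSnapshot() and releaseResources().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

class SemiAsyncSnapshotSketch {

    private final Object asyncSnapshotLock = new Object();
    // records the steps in order; guarded by itself because two threads append to it
    final List<String> log = new ArrayList<>();

    RunnableFuture<String> snapshot(long checkpointId) {
        synchronized (asyncSnapshotLock) {
            // synchronous part (cf. takeSnapshot()): must be quick, it blocks record processing
            record("take local snapshot " + checkpointId);
        }
        // asynchronous part (cf. materializeSnapshot()): runs whenever the task is run
        return new FutureTask<String>(() -> {
            record("materialize " + checkpointId);
            return "handle-" + checkpointId;
        }) {
            @Override
            protected void done() {
                // cf. releaseResources(): called on success, failure and cancellation alike
                record("release resources, cancelled=" + isCancelled());
            }
        };
    }

    void record(String step) {
        synchronized (log) {
            log.add(step);
        }
    }

    public static void main(String[] args) throws Exception {
        SemiAsyncSnapshotSketch backend = new SemiAsyncSnapshotSketch();
        RunnableFuture<String> future = backend.snapshot(7L);

        ExecutorService asyncThread = Executors.newSingleThreadExecutor();
        asyncThread.execute(future);
        System.out.println(future.get()); // handle-7
        asyncThread.shutdown();
        asyncThread.awaitTermination(10, TimeUnit.SECONDS);
        synchronized (backend.log) {
            // [take local snapshot 7, materialize 7, release resources, cancelled=false]
            System.out.println(backend.log);
        }
    }
}
```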
    

    RocksDBIncrementalSnapshotOperation

    1. RocksDBIncrementalSnapshotOperation.takeSnapshot() - synchronous; uses RocksDB's own checkpoint mechanism
        void takeSnapshot() throws Exception {
                assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
    
                // use the last completed checkpoint as the comparison base.
                synchronized (stateBackend.materializedSstFiles) {
                    baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
                }
    
                // save meta data
                for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
                        : stateBackend.kvStateInformation.entrySet()) {
                    stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
                }
    
                // save state data
                backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
                backupFileSystem = backupPath.getFileSystem();
                if (backupFileSystem.exists(backupPath)) {
                    throw new IllegalStateException("Unexpected existence of the backup directory.");
                }
    
                // create hard links of living files in the checkpoint path
                Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
            // creates the actual checkpoint on local disk, containing:
            // 1. links to the existing SST files
            // 2. copies of the manifest and other files (possibly including newly created files)
                checkpoint.createCheckpoint(backupPath.getPath());
            }
    
    2. RocksDBIncrementalSnapshotOperation.materializeSnapshot() - asynchronous

      KeyedStateHandle materializeSnapshot() throws Exception {
      
         stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry);
      
         // write meta data
         metaStateHandle = materializeMetaData();
      
         // write state data
         Preconditions.checkState(backupFileSystem.exists(backupPath));
      
         FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath);
         if (fileStatuses != null) {
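         // walk the directory created by takeSnapshot(): an SST file already saved by the previous checkpoint
         // gets a placeholder, which is later replaced via the shared state registry;
         // otherwise materializeStateData() actually writes the file to the remote file system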
            for (FileStatus fileStatus : fileStatuses) {
               final Path filePath = fileStatus.getPath();
               final String fileName = filePath.getName();
               final StateHandleID stateHandleID = new StateHandleID(fileName);
      
               if (fileName.endsWith(SST_FILE_SUFFIX)) {
                  final boolean existsAlready =
                     baseSstFiles == null ? false : baseSstFiles.contains(stateHandleID);
      
                  if (existsAlready) {
                     // we introduce a placeholder state handle, that is replaced with the
                     // original from the shared state registry (created from a previous checkpoint)
                     sstFiles.put(
                        stateHandleID,
                        new PlaceholderStreamStateHandle());
                  } else {
                     sstFiles.put(stateHandleID, materializeStateData(filePath));
                  }
               } else {
                  StreamStateHandle fileHandle = materializeStateData(filePath);
                  miscFiles.put(stateHandleID, fileHandle);
               }
            }
         }
         synchronized (stateBackend.materializedSstFiles) {
            stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
         }
       
         return new IncrementalKeyedStateHandle(
            stateBackend.operatorIdentifier,
            stateBackend.keyGroupRange,
            checkpointId,
            sstFiles,
            miscFiles,
            metaStateHandle);
      }
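    The core decision in materializeSnapshot() reduces to a set lookup. In this sketch, `upload` is a hypothetical function standing in for materializeStateData(); handles are plain strings, and the paths are made up. Reusing an uploaded SST file is safe because RocksDB never modifies an SST file once it is written, so within one RocksDB instance the same file name means the same content.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;

class IncrementalUploadSketch {

    static final String SST_FILE_SUFFIX = ".sst";
    static final String PLACEHOLDER = "<placeholder>";

    // Returns [sstFiles, miscFiles], each a map from file name to (hypothetical) handle.
    static List<Map<String, String>> materialize(List<String> localFiles,
                                                 Set<String> baseSstFiles,
                                                 Function<String, String> upload) {
        Map<String, String> sstFiles = new TreeMap<>();
        Map<String, String> miscFiles = new TreeMap<>();
        for (String fileName : localFiles) {
            if (fileName.endsWith(SST_FILE_SUFFIX)) {
                boolean existsAlready = baseSstFiles != null && baseSstFiles.contains(fileName);
                // immutable SST files already uploaded by the base checkpoint are only referenced
                sstFiles.put(fileName, existsAlready ? PLACEHOLDER : upload.apply(fileName));
            } else {
                // MANIFEST, CURRENT, OPTIONS, ... are always uploaded
                miscFiles.put(fileName, upload.apply(fileName));
            }
        }
        return Arrays.asList(sstFiles, miscFiles);
    }

    public static void main(String[] args) {
        List<String> local = Arrays.asList("000010.sst", "000012.sst", "MANIFEST-000013", "CURRENT");
        // the last completed checkpoint already uploaded 000010.sst
        Set<String> base = new HashSet<>(Collections.singletonList("000010.sst"));
        // hypothetical upload: just derives a remote path
        List<Map<String, String>> result = materialize(local, base, f -> "hdfs:///chk-2/" + f);
        System.out.println(result.get(0)); // {000010.sst=<placeholder>, 000012.sst=hdfs:///chk-2/000012.sst}
        System.out.println(result.get(1)); // {CURRENT=hdfs:///chk-2/CURRENT, MANIFEST-000013=hdfs:///chk-2/MANIFEST-000013}
    }
}
```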
      

    restore(Collection<KeyedStateHandle> restoreState)

    Correspondingly, restore() chooses the restore operation based on the concrete KeyedStateHandle type:

    try {
       if (restoreState == null || restoreState.isEmpty()) {
          createDB();
       } else if (MigrationUtil.isOldSavepointKeyedState(restoreState)) {
          LOG.info("Converting RocksDB state from old savepoint.");
          restoreOldSavepointKeyedState(restoreState);
       } else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) {
          RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
          restoreOperation.restore(restoreState);
       } else {
          RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
          restoreOperation.doRestore(restoreState);
       }
    
    RocksDBIncrementalRestoreOperation

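    restoreInstance() - first downloads the files from HDFS into the local directory restoreInstancePath. If the key-group range is unchanged, it then creates hard links from there into instanceRocksDBPath. If the range changed (hasExtraKeys), it instead opens a temporary restore DB, reads its entries and writes the relevant ones into the instance DB.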

    private void restoreInstance(
          IncrementalKeyedStateHandle restoreStateHandle,
          boolean hasExtraKeys) throws Exception {
    
       // read state data
       Path restoreInstancePath = new Path(
          stateBackend.instanceBasePath.getAbsolutePath(),
          UUID.randomUUID().toString());
    
       try {
          final Map<StateHandleID, StreamStateHandle> sstFiles =
             restoreStateHandle.getSharedState();
          final Map<StateHandleID, StreamStateHandle> miscFiles =
             restoreStateHandle.getPrivateState();
    
          // download the state data into the local restoreInstancePath
          readAllStateData(sstFiles, restoreInstancePath);
          readAllStateData(miscFiles, restoreInstancePath);
    
          // read meta data
          List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots =
             readMetaData(restoreStateHandle.getMetaStateHandle());
    
          List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
    
          for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
    
             ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
                stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
                stateBackend.columnOptions);
    
             columnFamilyDescriptors.add(columnFamilyDescriptor);
             stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot);
          }
    
          // if the key-group range changed (e.g. the parallelism changed), copy only the key groups that belong to this backend into the DB
          if (hasExtraKeys) {
    
             List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
    
             // open a temporary restoreDb with restoreInstancePath as its DB path; entries in our range are merged into the current DB
             try (RocksDB restoreDb = stateBackend.openDB(
                   restoreInstancePath.getPath(),
                   columnFamilyDescriptors,
                   columnFamilyHandles)) {
    
                for (int i = 0; i < columnFamilyHandles.size(); ++i) {
                   ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
                   ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
                   RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
    
                   Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
                      stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
    
                   if (null == registeredStateMetaInfoEntry) {
    
                      RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
                         new RegisteredKeyedBackendStateMetaInfo<>(
                            stateMetaInfoSnapshot.getStateType(),
                            stateMetaInfoSnapshot.getName(),
                            stateMetaInfoSnapshot.getNamespaceSerializer(),
                            stateMetaInfoSnapshot.getStateSerializer());
    
                      registeredStateMetaInfoEntry =
                         new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
                            stateBackend.db.createColumnFamily(columnFamilyDescriptor),
                            stateMetaInfo);
    
                      stateBackend.kvStateInformation.put(
                         stateMetaInfoSnapshot.getName(),
                         registeredStateMetaInfoEntry);
                   }
    
                   ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
    
                   try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {
    
                      int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
                      byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
                      for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
                         startKeyGroupPrefixBytes[j] = (byte)(startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
                      }
    
                      iterator.seek(startKeyGroupPrefixBytes);
    
                      while (iterator.isValid()) {
    
                         int keyGroup = 0;
                         for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
                            keyGroup = (keyGroup << Byte.SIZE) + (iterator.key()[j] & 0xFF); // mask: Java bytes are signed
                         }
                          
                         // if this key group belongs to the current backend, put the entry into the current RocksDB
                         if (stateBackend.keyGroupRange.contains(keyGroup)) {
                            stateBackend.db.put(targetColumnFamilyHandle,
                               iterator.key(), iterator.value());
                         }
    
                         iterator.next();
                      }
                   }
                }
             }
          } else {
             // no rescaling: directly hard-link the files from restoreInstancePath into instanceRocksDBPath
             // create hard links in the instance directory
             if (!stateBackend.instanceRocksDBPath.mkdirs()) {
                throw new IOException("Could not create RocksDB data directory.");
             }
    
             createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
             createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
    
             List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
             stateBackend.db = stateBackend.openDB(
                stateBackend.instanceRocksDBPath.getAbsolutePath(),
                columnFamilyDescriptors, columnFamilyHandles);
    
             for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
                RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
    
                ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
                RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
                   new RegisteredKeyedBackendStateMetaInfo<>(
                      stateMetaInfoSnapshot.getStateType(),
                      stateMetaInfoSnapshot.getName(),
                      stateMetaInfoSnapshot.getNamespaceSerializer(),
                      stateMetaInfoSnapshot.getStateSerializer());
    
                stateBackend.kvStateInformation.put(
                   stateMetaInfoSnapshot.getName(),
                   new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
                      columnFamilyHandle, stateMetaInfo));
             }
    
    
             // use the restore sst files as the base for succeeding checkpoints
             synchronized (stateBackend.materializedSstFiles) {
                stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());
             }
             // use the restored checkpoint ID as lastCompletedCheckpointId
             stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
          }
    }
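    In the hasExtraKeys branch, the code seeks to the backend's first key group and copies only entries whose key-group prefix lies in the backend's range. This sketch models that under hypothetical assumptions:

    - A `TreeMap` with an unsigned byte-wise comparator stands in for RocksDB's sorted key space.
    - `tailMap(..., true)` plays the role of `seek()`.
    - The 2-byte prefix with string suffixes is a made-up key layout.

    The decoder masks each prefix byte with `& 0xFF`. Java bytes are signed, so without the mask a byte of 128 or more becomes a negative number and the key group is decoded incorrectly.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

class RescaleRestoreSketch {

    // Unsigned lexicographic byte order, like RocksDB's default bytewise comparator.
    static final Comparator<byte[]> BYTEWISE = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    // Hypothetical key layout: 2-byte big-endian key-group prefix followed by UTF-8 bytes.
    static byte[] key(int keyGroup, String rest) {
        byte[] suffix = rest.getBytes(StandardCharsets.UTF_8);
        byte[] k = new byte[2 + suffix.length];
        k[0] = (byte) (keyGroup >>> Byte.SIZE);
        k[1] = (byte) keyGroup;
        System.arraycopy(suffix, 0, k, 2, suffix.length);
        return k;
    }

    static int keyGroupOf(byte[] key) {
        // mask before combining, because Java bytes are signed
        return ((key[0] & 0xFF) << Byte.SIZE) | (key[1] & 0xFF);
    }

    // Copies the entries of key groups [start, end] from restoreDb into targetDb.
    static void copyOwnedKeyGroups(TreeMap<byte[], String> restoreDb,
                                   TreeMap<byte[], String> targetDb, int start, int end) {
        byte[] startPrefix = {(byte) (start >>> Byte.SIZE), (byte) start};
        // tailMap(startPrefix, true) plays the role of iterator.seek(startKeyGroupPrefixBytes)
        for (Map.Entry<byte[], String> entry : restoreDb.tailMap(startPrefix, true).entrySet()) {
            int keyGroup = keyGroupOf(entry.getKey());
            if (keyGroup >= start && keyGroup <= end) {
                targetDb.put(entry.getKey(), entry.getValue());
            }
        }
    }

    public static void main(String[] args) {
        TreeMap<byte[], String> restored = new TreeMap<>(BYTEWISE);
        restored.put(key(10, "a"), "v10");
        restored.put(key(130, "b"), "v130");
        restored.put(key(200, "c"), "v200");
        restored.put(key(300, "d"), "v300");

        // after rescaling, this subtask owns key groups 128..255
        TreeMap<byte[], String> mine = new TreeMap<>(BYTEWISE);
        copyOwnedKeyGroups(restored, mine, 128, 255);
        System.out.println(new ArrayList<>(mine.values())); // [v130, v200]
    }
}
```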
    

    readAllStateData() - copies the files on HDFS, through each remoteFileHandle, into restoreInstancePath.

    private void readAllStateData(
       Map<StateHandleID, StreamStateHandle> stateHandleMap,
       Path restoreInstancePath) throws IOException {
    
       for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
          StateHandleID stateHandleID = entry.getKey();
          StreamStateHandle remoteFileHandle = entry.getValue();
      // read the file behind remoteFileHandle into the local restoreInstancePath
          readStateData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
       }
    }
    

    I have not looked into the details of the meta data.




    Article link: https://www.haomeiwen.com/subject/hznlxxtx.html