美文网首页Flink
深入理解Flink中的状态实现

深入理解Flink中的状态实现

作者: 一护_______ | 来源:发表于2019-01-10 22:50 被阅读0次

    本文是整理自几个月前的内部flink state分享,flink状态所包含的东西很多,在下面列举了一些,还有一些在本文没有体现,后续会单独的挑出来再进行讲解

    • state的层次结构
    • keyedState => windowState
    • OperatorState => kafkaOffset
    • stateBackend
    • snapshot/restore
    • internalTimerService
    • RocksDB操作的初探
    • state ttL
    • state local recovery
    • QueryableState
    • increamental checkpoint
    • state redistribution
    • broadcasting state
    • CheckpointStreamFactory

    内部和外部状态

    flink状态分为了内部和外部使用接口,但是两个层级都是一一对应,内部接口都实现了外部接口,主要是有两个目的

    • 内部接口提供了更多的方法,包括获取state中的serialize之后的byte,以及Namespace的操作方法。内部状态主要用于内部runtime实现时所需要用到的一些状态比如window中的windowState,CEP中的sharedBuffer,kafkaConsumer中offset管理的ListState,而外部State接口主要是用户自定义使用的一些状态
    • 考虑到各个版本的兼容性,外部接口要保障跨版本之间的兼容问题,而内部接口就很少受到这个限制,因此也就比较灵活

    状态的使用

    了解了flink 状态的层次结构,那么编程中和flink内部是如何使用这些状态呢?

    flink中使用状态主要是两部分,一部分是函数中使用状态,另一部分是在operator中使用状态

    方式:

    • CheckpointedFunction
    • ListCheckpointed
    • RuntimeContext (DefaultKeyedStateStore)
    • StateContext

    StateContext

    StateInitializationContext

    Iterable<StatePartitionStreamProvider> getRawOperatorStateInputs();
    
    Iterable<KeyGroupStatePartitionStreamProvider> getRawKeyedStateInputs();
    

    ManagedInitializationContext

    OperatorStateStore getOperatorStateStore();
    KeyedStateStore getKeyedStateStore();
    
    

    举例:

    1. AbstractStreamOperator封装了这个方法initializeState(StateInitializationContext context)用以在operator中进行raw和managed的状态管理

    2. CheckpointedFunction的用法其实也是借助于StateContext进行相关实现

    CheckpointedFunction#initializeState方法在transformation function的各个并发实例初始化的时候被调用这个方法提供了FunctionInitializationContext的对象,可以通过这个context来获取OperatorStateStore或者KeyedStateStore,也就是说通过这个接口可以注册这两种类型的State,这也是和ListCheckpointed接口不一样的地方,只是说KeyedStateStore只能在keyedstream上才能注册,否则就会报错而已,以下是一个使用这两种类型状态的样例。 可以参见FlinkKafkaConsumerBase通过这个接口来实现offset的管理。

    public class MyFunction<T> implements MapFunction<T, T>, CheckpointedFunction {
    
         private ReducingState<Long> countPerKey;
         private ListState<Long> countPerPartition;
    
         private long localCount;
    
         public void initializeState(FunctionInitializationContext context) throws Exception {
             // get the state data structure for the per-key state
             countPerKey = context.getKeyedStateStore().getReducingState(
                     new ReducingStateDescriptor<>("perKeyCount", new AddFunction<>(), Long.class));
    
             // get the state data structure for the per-partition state
             countPerPartition = context.getOperatorStateStore().getOperatorState(
                     new ListStateDescriptor<>("perPartitionCount", Long.class));
    
             // initialize the "local count variable" based on the operator state
             for (Long l : countPerPartition.get()) {
                 localCount += l;
             }
         }
    
         public void snapshotState(FunctionSnapshotContext context) throws Exception {
             // the keyed state is always up to date anyways
             // just bring the per-partition state in shape
             countPerPartition.clear();
             countPerPartition.add(localCount);
         }
    
         public T map(T value) throws Exception {
             // update the states
             countPerKey.add(1L);
             localCount++;
    
             return value;
         }
     }
     }
    

    这个Context的继承接口StateSnapshotContext的方法则提供了raw state的存储方法,但是其实没有对用户函数提供相应的接口,只是在引擎中有相关的使用,相比较而言这个接口提供的方法,context比较多,也有一些简单的方法去注册使用operatorstate 和 keyedState。如通过RuntimeContext注册keyedState:

    因此使用简易化程度为:

    RuntimeContext > FunctionInitializationContext > StateSnapshotContext

    keyedStream.map(new RichFlatMapFunction<MyType, List<MyType>>() {
    
         private ListState<MyType> state;
    
         public void open(Configuration cfg) {
             state = getRuntimeContext().getListState(
                     new ListStateDescriptor<>("myState", MyType.class));
         }
    
         public void flatMap(MyType value, Collector<MyType> out) {
             if (value.isDivider()) {
                 for (MyType t : state.get()) {
                     out.collect(t);
                 }
             } else {
                 state.add(value);
             }
         }
     });
    

    通过实现ListCheckpointed来注册OperatorState,但是这个有限制:
    一个function只能注册一个state,因为并不能像其他接口一样指定state的名字.

    example:

    public class CountingFunction<T> implements MapFunction<T, Tuple2<T, Long>>, ListCheckpointed<Long> {
    
         // this count is the number of elements in the parallel subtask
         private long count;
    
         {@literal @}Override
         public List<Long> snapshotState(long checkpointId, long timestamp) {
             // return a single element - our count
             return Collections.singletonList(count);
         }
    
         {@literal @}Override
         public void restoreState(List<Long> state) throws Exception {
             // in case of scale in, this adds up counters from different original subtasks
             // in case of scale out, list this may be empty
             for (Long l : state) {
                 count += l;
             }
         }
    
         {@literal @}Override
         public Tuple2<T, Long> map(T value) {
             count++;
             return new Tuple2<>(value, count);
         }
     }
     }
    

    下面比较一下里面的两种stateStore

    • KeyedStateStore
    • OperatorStateStore

    查看OperatorStateStore接口可以看到OperatorState只提供了ListState一种形式的状态接口,OperatorState和KeyedState主要有以下几个区别:

    • keyedState只能应用于KeyedStream,而operatorState都可以
    • keyedState可以理解成一个算子为每个subtask的每个key维护了一个状态namespace,而OperatorState是每个subtask共享一个状态
    • operatorState只提供了ListState,而keyedState提供了ValueState,ListState,ReducingState,MapState
    • operatorStateStore的默认实现只有DefaultOperatorStateBackend可以看到他的状态都是存储在堆内存之中,而keyedState根据backend配置的不同,线上都是存储在rocksdb之中

    snapshot

    这个让我们着眼于两个Operator的snapshot,AbstractStreamOperatorAbstractUdfStreamOperator,这两个基类几乎涵盖了所有相关operator和function在做snapshot的时候会做的处理。

    if (null != operatorStateBackend) {
                    snapshotInProgress.setOperatorStateManagedFuture(
                        operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
                }
    
                if (null != keyedStateBackend) {
                    snapshotInProgress.setKeyedStateManagedFuture(
                        keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
    }
    
    1. 按keyGroup去snapshot各个timerService的状态,包括processingTimer和eventTimer(RawKeyedOperatorState)
    2. 将operatorStateBackend和keyedStateBackend中的状态做snapshot
    3. 如果Operator还包含了userFunction,即是一个UdfStreamOperator,那么可以注意到udfStreamOperator覆写了父类的snapshotState(StateSnapshotContext context)方法,其主要目的就是为了将Function中的状态及时的register到相应的backend中,在第二步的时候统一由CheckpointStreamFactory去做快照

    StreamingFunctionUtils#snapshotFunctionState

    if (userFunction instanceof CheckpointedFunction) {
                ((CheckpointedFunction) userFunction).snapshotState(context);
    
                return true;
            }
    
            if (userFunction instanceof ListCheckpointed) {
                @SuppressWarnings("unchecked")
                List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
                    snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
    
                ListState<Serializable> listState = backend.
                    getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
    
                listState.clear();
    
                if (null != partitionableState) {
                    try {
                        for (Serializable statePartition : partitionableState) {
                            listState.add(statePartition);
                        }
                    } catch (Exception e) {
                        listState.clear();
    
                        throw new Exception("Could not write partitionable state to operator " +
                            "state backend.", e);
                    }
                }
    

    可以看到这里就只有以上分析的两种类型的checkpoined接口,CheckpointedFunction,只需要执行相应的snapshot方法,相应的函数就已经将要做snapshot的数据打入了相应的state中,而ListCheckpointed接口由于返回的是个List,所以需要手动的通过getSerializableListState注册一个ListState(这也是ListCheckpointed只能注册一个state的原因),然后将List数据挨个存入ListState中。

    operatorStateBackend#snapshot

    1. 针对所有注册的state作deepCopy,为了防止在checkpoint的时候数据结构又被修改,deepcopy其实是通过序列化和反序列化的过程(参见http://aitozi.com/java-serialization.html
    2. 异步将state以及metainfo的数据写入到hdfs中,使用的是flink的asyncIO(这个也可以后续深入了解下),并返回相应的statehandle用作restore的过程
    3. 在StreamTask触发checkpoint的时候会将一个Task中所有的operator触发一次snapshot,触发部分就是上面1,2两个步骤,其中第二步是会返回一个RunnableFuture,在触发之后会提交一个AsyncCheckpointRunnable异步任务,会阻塞一直等到checkpoint的Future,其实就是去调用这个方法AbstractAsyncIOCallable, 直到完成之后OperatorState会返回一个OperatorStateHandle,这个地方和后文的keyedState返回的handle不一样。
    @Override
        public V call() throws Exception {
    
            synchronized (this) {
                if (isStopped()) {
                    throw new IOException("Task was already stopped. No I/O handle opened.");
                }
    
                ioHandle = openIOHandle();
            }
    
            try {
    
                return performOperation();
    
            } finally {
                closeIOHandle();
            }
    

    在managed keyedState、managed operatorState、raw keyedState、和raw operatorState都完成返回相应的Handle之后,会生成一个SubTaskState来ack jobmanager,这个主要是用在restore的过程中

    SubtaskState subtaskState = createSubtaskStateFromSnapshotStateHandles(
                        chainedNonPartitionedOperatorsState,
                        chainedOperatorStateBackend,
                        chainedOperatorStateStream,
                        keyedStateHandleBackend,
                        keyedStateHandleStream);
                        
    owner.getEnvironment().acknowledgeCheckpoint(
        checkpointMetaData.getCheckpointId(),
        checkpointMetrics,
        subtaskState);
    

    在jm端,ack的时候又将各个handle封装在pendingCheckpoint => operatorStates => operatorState => operatorSubtaskState中,最后无论是savepoint或者是externalCheckpoint都会将相应的handle序列化存储到hdfs,这也就是所谓的checkpoint元数据。这个可以起个任务观察下zk和hdfs上的文件,补充一下相关的验证。

    至此完成operator state的snapshot/checkpoint阶段

    KeyedStateBackend#snapshot

    和operatorStateBackend一样,snapshot也分为了同步和异步两个部分。

    1. rocksDB的keyedStateBackend的snapshot提供了增量和全量两种方式
    2. 利用rocksdb自身的snapshot进行this.snapshot = stateBackend.db.getSnapshot(); 这个过程是同步的,rocksdb这块是怎么snapshot还不是很了解,待后续学习
    3. 之后也是一样异步将数据写入hdfs,返回相应的keyGroupsStateHandle snapshotOperation.closeCheckpointStream();

    不同的地方在于增量返回的是IncrementalKeyedStateHandle,而全量返回的是KeyGroupsStateHandle

    restore / redistribution

    OperatorState的rescale

    void setInitialState(TaskStateHandles taskStateHandles) throws Exception;
    

    一个task在真正的执行任务之前所需要做的事情是把状态inject到task中,如果一个任务是失败之后从上次的checkpoint点恢复的话,他的状态就是非空的。streamTask也就靠是否有这样的一个恢复状态来确认算子是不是在restore来branch他的启动逻辑

    if (null != taskStateHandles) {
            if (invokable instanceof StatefulTask) {
                StatefulTask op = (StatefulTask) invokable;
                op.setInitialState(taskStateHandles);
            } else {
                throw new IllegalStateException("Found operator state for a non-stateful task invokable");
            }
            // be memory and GC friendly - since the code stays in invoke() for a potentially long time,
            // we clear the reference to the state handle
            //noinspection UnusedAssignment
            taskStateHandles = null;
    }
    

    那么追根究底一下这个Handle是怎么带入的呢?

    FixedDelayRestartStrategy => triggerFullRecovery => Execution#restart => Execution#scheduleForExecution => Execution#deployToSlot => ExecutionVertex => TaskDeploymentDescriptor => taskmanger => task

    当然还有另一个途径就是通过向jobmanager submitJob的时候带入restore的checkpoint path, 这两种方式最终都会通过checkpointCoordinator#restoreLatestCheckpointedState来恢复hdfs中的状态来获取到snapshot时候存入的StateHandle。

    恢复的过程如何进行redistribution呢? 也就是大家关心的并发度变了我的状态的行为是怎么样的。

    // re-assign the task states
    final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
    
    StateAssignmentOperation stateAssignmentOperation =
            new StateAssignmentOperation(tasks, operatorStates, allowNonRestoredState);
    
    stateAssignmentOperation.assignStates();
    
    1. 如果并发度没变那么不做重新的assign,除非state的模式是broadcast,会将一个task的state广播给所有的task
    2. 对于operator state会针对每一个name的state计算出每个subtask中的element个数之和(这就要求每个element之间相互独立)进行roundrobin分配
    3. keyedState的重新分配相对简单,就是根据新的并发度和最大并发度计算新的keygroupRange,然后根据subtaskIndex获取keyGroupRange,然后获取到相应的keyStateHandle完成状态的切分。

    这里补充关于raw state和managed state在rescale上的差别,由于operator state在reassign的时候是根据metaInfo来计算出所有的List<element>来重新分配,operatorbackend中注册的状态是会保存相应的metainfo,最终也会在snapshot的时候存入OperatorHandle,那raw state的metainfo是在哪里呢?

    其实会在写入hdfs返回相应的handle的时候构建一个默认的,OperatorStateCheckpointOutputStream#closeAndGetHandle,其中状态各个partition的构建来自startNewPartition方法,引擎中我所看到的rawstate仅有timerservice的raw keyedState

    OperatorStateHandle closeAndGetHandle() throws IOException {
            StreamStateHandle streamStateHandle = delegate.closeAndGetHandle();
    
            if (null == streamStateHandle) {
                return null;
            }
    
            if (partitionOffsets.isEmpty() && delegate.getPos() > initialPosition) {
                startNewPartition();
            }
    
            Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>(1);
    
            OperatorStateHandle.StateMetaInfo metaInfo =
                    new OperatorStateHandle.StateMetaInfo(
                            partitionOffsets.toArray(),
                            OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
    
            offsetsMap.put(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, metaInfo);
    
            return new OperatorStateHandle(offsetsMap, streamStateHandle);
        }
    

    KeyedState的keyGroup

    keyedState重新分配里引入了一个keyGroup的概念,那么这里为什么要引入keygroup这个概念呢?

    1. hash(key) = key(identity)
    2. key_group(key) = hash(key) % number_of_key_groups (等于最大并发),默认flink任务会设置一个max parallel
    3. subtask(key) = key_greoup(key) * parallel / number_of_key_groups
    • 避免在恢复的时候带来随机IO
    • 避免每个subtask需要将所有的状态数据读取出来pick和自己subtask相关的浪费了很多io资源
    • 减少元数据的量,不再需要保存每次的key,每一个keygroup组只需保留一个range
    int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
            int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
            return new KeyGroupRange(start, end);
    
    • 每一个backend(subtask)上只有一个keygroup range
    • 每一个subtask在restore的时候就接收到了已经分配好的和重启后当前这个并发相绑定的keyStateHandle
    subManagedKeyedState = getManagedKeyedStateHandles(operatorState, keyGroupPartitions.get(subTaskIndex));
                subRawKeyedState = getRawKeyedStateHandles(operatorState, keyGroupPartitions.get(subTaskIndex));
    

    这里面关键的一步在于,根据新的subtask上的keyGroupRange,从原来的operator的keyGroupsStateHandle中求取本subtask所关心的一部分Handle,可以看到每个KeyGroupsStateHandle都维护了KeyGroupRangeOffsets这样一个变量,来标记这个handle所覆盖的keygrouprange,以及keygrouprange在stream中offset的位置,可以看下再snapshot的时候会记录offset到这个对象中来

    keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
    
    public KeyGroupRangeOffsets getIntersection(KeyGroupRange keyGroupRange) {
            Preconditions.checkNotNull(keyGroupRange);
            KeyGroupRange intersection = this.keyGroupRange.getIntersection(keyGroupRange);
            long[] subOffsets = new long[intersection.getNumberOfKeyGroups()];
            if(subOffsets.length > 0) {
                System.arraycopy(
                        offsets,
                        computeKeyGroupIndex(intersection.getStartKeyGroup()),
                        subOffsets,
                        0,
                        subOffsets.length);
            }
            return new KeyGroupRangeOffsets(intersection, subOffsets);
        }
    

    KeyGroupsStateHandle是一个subtask的所有state的一个handle
    KeyGroupsStateHandle维护一个KeyGroupRangeOffsets,
    KeyGroupRangeOffsets维护一个KeyGroupRange和offsets
    KeyGroupRange维护多个KeyGroup
    KeyGroup维护多个key

    KeyGroupsStateHandle和operatorStateHandle还有一个不同点,operatorStateHandle维护了metainfo中的offset信息用在restore时的reassign,原因在于KeyGroupsStateHandle的reassign不依赖这些信息,当然在restore的时候也需要keygroupOffset中的offset信息来重新构建keyGroupsStateHandle来进行各个task的状态分配。

    参考:

    https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html

    http://chenyuzhao.me/2017/12/24/Flink-%E5%88%86%E5%B8%83%E5%BC%8F%E5%BF%AB%E7%85%A7%E7%9A%84%E8%AE%BE%E8%AE%A1-%E5%AD%98%E5%82%A8/

    相关文章

      网友评论

        本文标题:深入理解Flink中的状态实现

        本文链接:https://www.haomeiwen.com/subject/iigzrqtx.html