Flink Source Code Reading (5) --- checkpoint / sav

Author: sj_91d7 | Published 2021-03-02 19:03

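    The previous article, Flink Source Code Reading (4) --- Checkpoint Creation, explained how checkpoints are produced. Building on it, this article describes how a job recovers from a checkpoint/savepoint. Everything here is based on Flink 1.9.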

    1. Overview

    In short, a job restores its state from a checkpoint / savepoint in two main situations:

    • The job is restarted manually and restores from a savepoint.
    • A task fails while the job is running, and the job restores from a checkpoint.

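    A savepoint is essentially a checkpoint triggered manually by the user, so recovery from a checkpoint and recovery from a savepoint work the same way. The rest of this article uses the most common case in practice, a single failing task, to show how a job recovers.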

    2. State Assignment

    First, the possible execution states of a Task, as defined in ExecutionState.java:

        CREATED,
        
        SCHEDULED,
        
        DEPLOYING,
        
        RUNNING,
    
        /**
         * This state marks "successfully completed". It can only be reached when a
         * program reaches the "end of its input". The "end of input" can be reached
         * when consuming a bounded input (fix set of files, bounded query, etc) or
         * when stopping a program (not cancelling!) which make the input look like
         * it reached its end at a specific point.
         */
        FINISHED,
        
        CANCELING,
        
        CANCELED,
        
        FAILED,
    
        RECONCILING;
    

    The transitions between these states are as follows:

     *     CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
     *        |            |            |          |
     *        |            |            |   +------+
     *        |            |            V   V
     *        |            |         CANCELLING -----+----> CANCELED
     *        |            |                         |
     *        |            +-------------------------+
     *        |
     *        |                                   ... -> FAILED
     *        V
     *    RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED
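    To make the diagram easier to check, here is a minimal sketch that encodes it as a lookup table. It is an illustration only: it assumes the arrows above are the complete set of edges and reads "... -> FAILED" as "any non-terminal state may fail". The class ExecutionStateTransitions and its methods are hypothetical; this is not Flink's own code.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Hypothetical sketch: the transition diagram above as a lookup table (not Flink code).
class ExecutionStateTransitions {

    enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED, RECONCILING }

    private static final Map<State, EnumSet<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        // edges read off the diagram
        ALLOWED.put(State.CREATED, EnumSet.of(State.SCHEDULED, State.RECONCILING));
        ALLOWED.put(State.SCHEDULED, EnumSet.of(State.DEPLOYING, State.CANCELED));
        ALLOWED.put(State.DEPLOYING, EnumSet.of(State.RUNNING, State.CANCELING));
        ALLOWED.put(State.RUNNING, EnumSet.of(State.FINISHED, State.CANCELING));
        ALLOWED.put(State.CANCELING, EnumSet.of(State.CANCELED));
        ALLOWED.put(State.RECONCILING, EnumSet.of(State.RUNNING, State.FINISHED, State.CANCELED, State.FAILED));
        // terminal states have no outgoing edges
        ALLOWED.put(State.FINISHED, EnumSet.noneOf(State.class));
        ALLOWED.put(State.CANCELED, EnumSet.noneOf(State.class));
        ALLOWED.put(State.FAILED, EnumSet.noneOf(State.class));
    }

    static boolean isTerminal(State s) {
        return s == State.FINISHED || s == State.CANCELED || s == State.FAILED;
    }

    // "... -> FAILED" is interpreted as: every non-terminal state may move to FAILED
    static boolean canTransition(State from, State to) {
        if (to == State.FAILED) {
            return !isTerminal(from);
        }
        return ALLOWED.get(from).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(State.RUNNING, State.FINISHED)); // true
        System.out.println(canTransition(State.FINISHED, State.RUNNING)); // false
        System.out.println(canTransition(State.DEPLOYING, State.FAILED)); // true
    }
}
```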
    

    A Task's state transition goes through the following call chain:

    Execution#transitionState --> vertex.notifyStateTransition --> getExecutionGraph().notifyExecutionChange
    

    If a task transitions to FAILED, the following is called:

    failoverStrategy.onTaskFailure --> AdaptedRestartPipelinedRegionStrategyNG#onTaskFailure --> restartTasks --> resetAndRescheduleTasks --> createResetAndRescheduleTasksCallback
    

    The argument passed to restartTasks is the set of all tasks in the failed task's pipelined region that need to be restarted.
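    A pipelined region groups tasks that exchange data through pipelined partitions; such tasks must be restarted together, while blocking exchanges act as region boundaries. The sketch below illustrates that grouping with a union-find over a toy graph. The class PipelinedRegionSketch and the integer vertex ids are hypothetical, and the real failover strategy may restart additional regions (for example, to regenerate lost blocking results), which is not modeled here.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: union-find over pipelined edges, returning the region of a failed vertex.
class PipelinedRegionSketch {
    private final int[] parent;

    PipelinedRegionSketch(int numVertices) {
        parent = new int[numVertices];
        for (int i = 0; i < numVertices; i++) {
            parent[i] = i; // every vertex starts in its own region
        }
    }

    private int find(int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]]; // path halving
            x = parent[x];
        }
        return x;
    }

    // only pipelined edges merge regions; blocking edges separate them
    void addEdge(int producer, int consumer, boolean pipelined) {
        if (pipelined) {
            parent[find(producer)] = find(consumer);
        }
    }

    // all vertices in the same region as the failed one
    Set<Integer> tasksToRestart(int failedVertex) {
        int root = find(failedVertex);
        Set<Integer> region = new TreeSet<>();
        for (int v = 0; v < parent.length; v++) {
            if (find(v) == root) {
                region.add(v);
            }
        }
        return region;
    }

    public static void main(String[] args) {
        // 0 -> 1 pipelined, 1 -> 2 blocking, 2 -> 3 pipelined
        PipelinedRegionSketch g = new PipelinedRegionSketch(4);
        g.addEdge(0, 1, true);
        g.addEdge(1, 2, false);
        g.addEdge(2, 3, true);
        System.out.println(g.tasksToRestart(3)); // [2, 3]
    }
}
```

    A failure of vertex 3 restarts only {2, 3}; vertices 0 and 1 keep running because the blocking edge separates the two regions.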

    Let's focus on what createResetAndRescheduleTasksCallback does. The relevant source:

    LOG.info("Finally restart {} tasks to recover from task failure.", unmodifiedVertices.size());
    
    // reset tasks to CREATED state and reload state
    resetTasks(unmodifiedVertices, globalModVersion);
    
    // re-schedule tasks
    rescheduleTasks(unmodifiedVertices, globalModVersion);
    

    It does two things: it resets the tasks (which includes state assignment), and it reschedules them. The reset step is described first.

    2.1 Resetting Tasks

    Step 1: reset the Execution of every vertex.

            for (ExecutionVertex ev : vertices) {
                CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup();
                if (cgroup != null && !colGroups.contains(cgroup)){
                    cgroup.resetConstraints();
                    colGroups.add(cgroup);
                }
    
                ev.resetForNewExecution(restartTimestamp, globalModVersion);
            }
    

    Step 2: abort every in-progress checkpoint held in the pendingCheckpoints map.

    executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
                    new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
    

    Step 3: restore state from the most recently completed checkpoint.

    executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
                    involvedExecutionJobVertices, false, true);
    

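    Next, let's look at how step 3 restores state from the checkpoint. In the call above, the second argument is errorIfNoCheckpoint (false) and the third is allowNonRestoredState (true).
    2.1.1 First, find the most recently completed checkpoint, latestCheckpoint.
       If latestCheckpoint == null:
          if errorIfNoCheckpoint is true, an IllegalStateException is thrown;
          if errorIfNoCheckpoint is false, the method simply returns without restoring anything.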
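    Steps 2 and 3, together with the no-checkpoint handling in 2.1.1, can be sketched with a toy coordinator. This is a heavily simplified model that assumes checkpoints are identified by increasing long ids. CoordinatorSketch is hypothetical; its method names only echo the calls above and do not reproduce Flink's CheckpointCoordinator.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified model of steps 2 and 3 (not Flink's CheckpointCoordinator).
class CoordinatorSketch {
    // checkpointId -> label of a checkpoint that is still in progress
    final Map<Long, String> pendingCheckpoints = new HashMap<>();
    // ids of completed checkpoints, oldest first
    final Deque<Long> completedCheckpoints = new ArrayDeque<>();

    // Step 2: every in-progress checkpoint is failed and forgotten; returns how many were aborted
    int abortPendingCheckpoints() {
        int aborted = pendingCheckpoints.size();
        pendingCheckpoints.clear();
        return aborted;
    }

    // Step 3 / 2.1.1: returns the checkpoint id restored from, or -1 if nothing is restored
    long restoreLatestCheckpointedState(boolean errorIfNoCheckpoint) {
        Long latest = completedCheckpoints.peekLast();
        if (latest == null) {
            if (errorIfNoCheckpoint) {
                throw new IllegalStateException("No completed checkpoint available");
            }
            return -1L;
        }
        return latest;
    }

    public static void main(String[] args) {
        CoordinatorSketch c = new CoordinatorSketch();
        c.pendingCheckpoints.put(7L, "in progress");
        System.out.println(c.abortPendingCheckpoints());             // 1
        System.out.println(c.restoreLatestCheckpointedState(false)); // -1
        c.completedCheckpoints.addLast(5L);
        c.completedCheckpoints.addLast(6L);
        System.out.println(c.restoreLatestCheckpointedState(true));  // 6
    }
}
```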
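    2.1.2 Assign states to the tasks with stateAssignmentOperation.assignStates(), which mainly does the following:
       1. For every entry in the checkpoint's operatorStates, check that its operatorID exists among the new tasks. If an operatorID from operatorStates has no counterpart in the new tasks: (i) if allowNonRestoredState == true, that operatorID is skipped; (ii) if allowNonRestoredState == false, an IllegalStateException is thrown.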

    checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks);
    

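       2. Iterate over all tasks.
    (1) For every operator of each task: (i) if the checkpoint contains state for it, that state is added to the operatorStates list; (ii) if not, a new, empty OperatorState is created for the operatorID and added to the list; (iii) if the parallelism has changed (scale-in or scale-out), the state is redistributed; see the separate article on state rescaling for details.
       Finally, the state assigned to each task is wrapped in a JobManagerTaskRestore and attached to the Execution via Execution.setInitialState(). The JobManagerTaskRestore is shipped to the TaskExecutor as a field of the TaskDeploymentDescriptor. State redistribution on rescaling, in brief:
       Operator State: the state is exposed as a list through the ListCheckpointed interface, so its elements can easily be redistributed across the new number of parallel instances. Users can also override the restore logic.
       Keyed State: Flink introduces the concept of a Key Group, the basic unit in which Keyed State is assigned. When the parallelism changes, the key-group assignment is recomputed and the key groups are redistributed among the operator instances.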

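       (iv) In addition, when redistributing state, Flink checks the parallelism of the newly submitted tasks against the MaxParallelism recorded in operatorStates; see StateAssignmentOperation#checkParallelismPreconditions:
       1. If the task's parallelism is greater than the max parallelism of the operatorState in the checkpoint, an exception is thrown.
       2. If the task's max parallelism != the operatorState's max parallelism:
         2.1 if the task's max parallelism was not configured explicitly, it is set to the operatorState's max parallelism;
         2.2 if it was configured explicitly, an exception is thrown.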

            if (operatorState.getMaxParallelism() < executionJobVertex.getParallelism()) {
                throw new IllegalStateException(
                        "The state for task "
                                + executionJobVertex.getJobVertexId()
                                + " can not be restored. The maximum parallelism ("
                                + operatorState.getMaxParallelism()
                                + ") of the restored state is lower than the configured parallelism ("
                                + executionJobVertex.getParallelism()
                                + "). Please reduce the parallelism of the task to be lower or equal to the maximum parallelism.");
            }
    
            // check that the number of key groups have not changed or if we need to override it to
            // satisfy the restored state
            if (operatorState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
    
                if (!executionJobVertex.isMaxParallelismConfigured()) {
                    // if the max parallelism was not explicitly specified by the user, we derive it
                    // from the state
    
                    LOG.debug(
                            "Overriding maximum parallelism for JobVertex {} from {} to {}",
                            executionJobVertex.getJobVertexId(),
                            executionJobVertex.getMaxParallelism(),
                            operatorState.getMaxParallelism());
    
                    executionJobVertex.setMaxParallelism(operatorState.getMaxParallelism());
                } else {
                    // if the max parallelism was explicitly specified, we complain on mismatch
                    throw new IllegalStateException(
                            "The maximum parallelism ("
                                    + operatorState.getMaxParallelism()
                                    + ") with which the latest "
                                    + "checkpoint of the execution job vertex "
                                    + executionJobVertex
                                    + " has been taken and the current maximum parallelism ("
                                    + executionJobVertex.getMaxParallelism()
                                    + ") changed. This "
                                    + "is currently not supported.");
                }
            }
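    Key groups are what make the rescaling and max-parallelism checks above work: the number of key groups equals the max parallelism and is fixed once state exists, and each subtask owns a contiguous range of key groups. The sketch below reproduces, to the best of my reading, the range and index arithmetic of KeyGroupRangeAssignment (computeKeyGroupRangeForOperatorIndex and computeOperatorIndexForKeyGroup). The class name is hypothetical, and keyGroupFor is a simplification: Flink applies a murmur hash to key.hashCode() before taking the modulo.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of key-group based state redistribution.
class KeyGroupSketch {

    // key -> key group (simplified: Flink murmur-hashes key.hashCode() first)
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // key group -> index of the subtask that owns it
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // subtask -> inclusive range [start, end] of key groups it owns
    static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        if (parallelism > maxParallelism) {
            // same idea as the parallelism precondition shown above
            throw new IllegalStateException(
                    "parallelism " + parallelism + " > max parallelism " + maxParallelism);
        }
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // number of key groups
        for (int parallelism : new int[] {2, 3}) {
            List<String> ranges = new ArrayList<>();
            for (int i = 0; i < parallelism; i++) {
                int[] r = keyGroupRange(maxParallelism, parallelism, i);
                ranges.add(r[0] + "-" + r[1]);
            }
            System.out.println("parallelism " + parallelism + ": " + ranges);
        }
    }
}
```

    With 128 key groups, main prints [0-63, 64-127] for parallelism 2 and [0-42, 43-85, 86-127] for parallelism 3. Rescaling only moves whole key groups between subtasks, and a parallelism above the number of key groups cannot be served, which is why the first check above throws.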
    

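    As for how an operatorState's max parallelism is determined: it is the max parallelism of the ExecutionJobVertex the operator belonged to, recorded in the checkpoint. The vertex's max parallelism is set in the ExecutionJobVertex constructor:
       1. If the task has an explicitly configured max parallelism, that value is used.
       2. Otherwise it is derived from the operator's parallelism p by KeyGroupRangeAssignment#computeDefaultMaxParallelism: min(max(roundUpToPowerOfTwo(p + p/2), 2^7), 2^15), i.e. roughly 1.5 × p rounded up to the next power of two, clamped to the range [128, 32768].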

        public static int computeDefaultMaxParallelism(int operatorParallelism) {
    
            checkParallelismPreconditions(operatorParallelism);
    
            return Math.min(
                    Math.max(
                            MathUtils.roundUpToPowerOfTwo(
                                    operatorParallelism + (operatorParallelism / 2)),
                            DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
                    UPPER_BOUND_MAX_PARALLELISM);
        }
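    To see what the formula yields in practice, here is a self-contained restatement of computeDefaultMaxParallelism. The roundUpToPowerOfTwo helper is my own stand-in for MathUtils.roundUpToPowerOfTwo, the range check is assumed to match checkParallelismPreconditions, and the class name is hypothetical.

```java
// Hypothetical, self-contained restatement of computeDefaultMaxParallelism.
class DefaultMaxParallelismSketch {
    static final int LOWER_BOUND = 1 << 7;  // 128, like DEFAULT_LOWER_BOUND_MAX_PARALLELISM
    static final int UPPER_BOUND = 1 << 15; // 32768, like UPPER_BOUND_MAX_PARALLELISM

    // smallest power of two >= x, for x >= 1
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    static int computeDefaultMaxParallelism(int parallelism) {
        if (parallelism <= 0 || parallelism > UPPER_BOUND) {
            throw new IllegalArgumentException("parallelism must be in (0, " + UPPER_BOUND + "]");
        }
        // roughly 1.5 * parallelism, rounded up to a power of two, clamped to [128, 32768]
        return Math.min(
                Math.max(roundUpToPowerOfTwo(parallelism + parallelism / 2), LOWER_BOUND),
                UPPER_BOUND);
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 100, 1000, 30000}) {
            System.out.println(p + " -> " + computeDefaultMaxParallelism(p));
        }
    }
}
```

    For p = 1, 100, 1000 and 30000 this gives 128, 256, 2048 and 32768. This ties back to 2.1.2: if max parallelism was never set and a job is rescaled from 100 to 200, the derived value would change from 256 to 512, but because it was not configured explicitly, the restore simply overrides it with the 256 recorded in the checkpoint instead of failing.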
    

    That covers the main logic of resetting tasks.

    2.2 Scheduling Tasks

    The entry point is AdaptedRestartPipelinedRegionStrategyNG#rescheduleTasks; the actual scheduling is performed by SchedulingUtils.schedule.

    For details on task scheduling, see my earlier article Flink Job Submission (3) --- Job Execution. Scheduling consists of two steps: requesting slots and deploying tasks.

    When a task is deployed, StreamTask.invoke() is called first, and inside invoke() initializeState() is called for every operator in the task. Here is the source of StreamTask#initializeState:

        private void initializeState() throws Exception {
    
            StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
    
            for (StreamOperator<?> operator : allOperators) {
                if (null != operator) {
                    operator.initializeState();
                }
            }
        }
    

    This in turn calls AbstractStreamOperator#initializeState:

        @Override
        public final void initializeState() throws Exception {
    
            final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
    
            final StreamTask<?, ?> containingTask =
                Preconditions.checkNotNull(getContainingTask());
            final CloseableRegistry streamTaskCloseableRegistry =
                Preconditions.checkNotNull(containingTask.getCancelables());
            final StreamTaskStateInitializer streamTaskStateManager =
                Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
    
            final StreamOperatorStateContext context =
                streamTaskStateManager.streamOperatorStateContext(
                    getOperatorID(),
                    getClass().getSimpleName(),
                    this,
                    keySerializer,
                    streamTaskCloseableRegistry,
                    metrics);
    
            this.operatorStateBackend = context.operatorStateBackend();
            this.keyedStateBackend = context.keyedStateBackend();
    
            if (keyedStateBackend != null) {
                this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
            }
    
            timeServiceManager = context.internalTimerServiceManager();
    
            CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
            CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
    
            try {
                StateInitializationContext initializationContext = new StateInitializationContextImpl(
                    context.isRestored(), // information whether we restore or start for the first time
                    operatorStateBackend, // access to operator state backend
                    keyedStateStore, // access to keyed state backend
                    keyedStateInputs, // access to keyed state stream
                    operatorStateInputs); // access to operator state stream
    
                initializeState(initializationContext);
            } finally {
                closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
                closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
            }
        }
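    Besides handles to the already restored state backends, the context tells each operator whether it is being restored (context.isRestored()). In user code the same flag surfaces as FunctionInitializationContext#isRestored() inside CheckpointedFunction#initializeState. The mock below, with hypothetical classes InitContext and CountingOperator and a plain map standing in for state, shows the pattern of walking the operator chain and branching on that flag; it does not use any Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mock of the initialization flow above; names are illustrative only.
class OperatorInitSketch {

    // stands in for StateInitializationContext: a restore flag plus (mock) restored state
    static final class InitContext {
        final boolean restored;
        final Map<String, Long> state;

        InitContext(boolean restored, Map<String, Long> state) {
            this.restored = restored;
            this.state = state;
        }
    }

    // stands in for a stateful operator keeping a running count
    static final class CountingOperator {
        final String id;
        long count;

        CountingOperator(String id) {
            this.id = id;
        }

        void initializeState(InitContext ctx) {
            // use restored state only when the context says this is a restore
            count = ctx.restored ? ctx.state.getOrDefault(id, 0L) : 0L;
        }
    }

    // mirrors the loop in StreamTask#initializeState: initialize every operator in the chain
    static void initializeChain(List<CountingOperator> chain, InitContext ctx) {
        for (CountingOperator op : chain) {
            if (op != null) {
                op.initializeState(ctx);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Long> snapshot = new HashMap<>();
        snapshot.put("map-1", 42L);
        List<CountingOperator> chain = new ArrayList<>();
        chain.add(new CountingOperator("source-1"));
        chain.add(new CountingOperator("map-1"));
        initializeChain(chain, new InitContext(true, snapshot));
        System.out.println(chain.get(1).count); // 42
    }
}
```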
    
    • On the TaskExecutor side, a TaskStateManager manages the state of the current task. It is created from the assigned JobManagerTaskRestore and the local state store TaskLocalStateStore.

    • The key step of state initialization is StreamTaskStateInitializer.streamOperatorStateContext(), which builds a StreamOperatorStateContext. From it the operator obtains the operatorStateBackend, the keyedStateBackend, the raw state streams and the timeServiceManager, after which state can be restored.

    • Next, let's see how the StreamOperatorStateContext is built, in StreamTaskStateInitializerImpl#streamOperatorStateContext:

            TaskInfo taskInfo = environment.getTaskInfo();
            OperatorSubtaskDescriptionText operatorSubtaskDescription =
                new OperatorSubtaskDescriptionText(
                    operatorID,
                    operatorClassName,
                    taskInfo.getIndexOfThisSubtask(),
                    taskInfo.getNumberOfParallelSubtasks());
    
            final String operatorIdentifierText = operatorSubtaskDescription.toString();
    
            final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
                taskStateManager.prioritizedOperatorState(operatorID);
    
            AbstractKeyedStateBackend<?> keyedStatedBackend = null;
            OperatorStateBackend operatorStateBackend = null;
            CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
            CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
            InternalTimeServiceManager<?> timeServiceManager;
    
            try {
    
                // -------------- Keyed State Backend --------------
                keyedStatedBackend = keyedStatedBackend(
                    keySerializer,
                    operatorIdentifierText,
                    prioritizedOperatorSubtaskStates,
                    streamTaskCloseableRegistry,
                    metricGroup);
    
                // -------------- Operator State Backend --------------
                operatorStateBackend = operatorStateBackend(
                    operatorIdentifierText,
                    prioritizedOperatorSubtaskStates,
                    streamTaskCloseableRegistry);
    
                // -------------- Raw State Streams --------------
                rawKeyedStateInputs = rawKeyedStateInputs(
                    prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
                streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
    
                rawOperatorStateInputs = rawOperatorStateInputs(
                    prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
                streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
    
                // -------------- Internal Timer Service Manager --------------
                timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);
    
                // -------------- Preparing return value --------------
    
                return new StreamOperatorStateContextImpl(
                    prioritizedOperatorSubtaskStates.isRestored(),
                    operatorStateBackend,
                    keyedStatedBackend,
                    timeServiceManager,
                    rawOperatorStateInputs,
                    rawKeyedStateInputs);
    
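    • To build the StreamOperatorStateContext:
         1. TaskStateManager.prioritizedOperatorState() returns the state handles that each operator needs to restore.
         2. The returned handles are used to create and restore the state backends and timers. This is where PrioritizedOperatorSubtaskState comes in: it wraps several alternative OperatorSubtaskState snapshots that can (partially) substitute for one another, ordered by priority.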
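    The point of having prioritized alternatives is fallback: restore from the highest-priority snapshot (typically a local copy, which is fastest) and, if that fails, try the next one. Flink's BackendRestorerProcedure follows this pattern when creating state backends. The generic helper below is only a sketch of the idea: restoreWithFallback is a hypothetical name, and "local" and "remote" are placeholder labels rather than real state handles.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: try restore alternatives in priority order, falling back on failure.
class PrioritizedRestoreSketch {

    static <S, B> B restoreWithFallback(List<S> alternativesByPriority, Function<S, B> restore) {
        Exception last = null;
        for (S alternative : alternativesByPriority) {
            try {
                return restore.apply(alternative);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next, lower-priority alternative
            }
        }
        throw new IllegalStateException("Could not restore from any alternative", last);
    }

    public static void main(String[] args) {
        // highest priority first: a local copy, then the remote copy
        List<String> alternatives = Arrays.asList("local", "remote");
        String backend = restoreWithFallback(alternatives, alt -> {
            if (alt.equals("local")) {
                throw new RuntimeException("local copy missing");
            }
            return "backend restored from " + alt;
        });
        System.out.println(backend); // backend restored from remote
    }
}
```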

    Summary

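    This article described how, after some tasks of a job fail, state is assigned to the tasks being restarted, and how the rescheduled tasks use that state to recover.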

        Article link: https://www.haomeiwen.com/subject/xbsdfltx.html