What Happens When a Snapshot Fails

Author: shengjk1 | Published 2020-10-10 11:59

    At work I ran into a problem related to snapshot exceptions, so here is a summary. The snapshot-related call flow is shown in the flowchart below:


    [Figure: flowchart of the snapshot call path]

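    When AbstractUdfStreamOperator.snapshotState is called, it delegates to StreamingFunctionUtils.snapshotFunctionState, which keeps unwrapping WrappingFunction layers until it finds a CheckpointedFunction whose snapshotState it can call: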

        public static void snapshotFunctionState(
                StateSnapshotContext context,
                OperatorStateBackend backend,
                Function userFunction) throws Exception {
    
            Preconditions.checkNotNull(context);
            Preconditions.checkNotNull(backend);
    
            while (true) {
    
                if (trySnapshotFunctionState(context, backend, userFunction)) {
                    break;
                }
    
                // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
                if (userFunction instanceof WrappingFunction) {
                    userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
                } else {
                    break;
                }
            }
        }
    
        private static boolean trySnapshotFunctionState(
                StateSnapshotContext context,
                OperatorStateBackend backend,
                Function userFunction) throws Exception {
    
            // call the snapshotState method of the user's CheckpointedFunction
            if (userFunction instanceof CheckpointedFunction) {
                ((CheckpointedFunction) userFunction).snapshotState(context);
    
                return true;
            }
    ......
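    To make the unwrap-and-retry loop above concrete, here is a minimal, self-contained sketch that does not depend on Flink. Function, CheckpointedFunction and WrappingFunction are simplified stand-ins defined inside the sketch (the real Flink CheckpointedFunction.snapshotState takes a FunctionSnapshotContext, not a bare long), and the class name SnapshotUnwrapSketch is hypothetical. It shows two things: a CheckpointedFunction hidden behind two wrapper layers is still found and snapshotted, and an exception thrown by the user's snapshotState is not caught by this loop but escapes to the caller.

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotUnwrapSketch {
    // Hypothetical stand-ins for Flink's Function / CheckpointedFunction / WrappingFunction.
    interface Function {}

    interface CheckpointedFunction extends Function {
        void snapshotState(long checkpointId) throws Exception;
    }

    static class WrappingFunction implements Function {
        private final Function wrapped;

        WrappingFunction(Function wrapped) {
            this.wrapped = wrapped;
        }

        Function getWrappedFunction() {
            return wrapped;
        }
    }

    // Same loop shape as snapshotFunctionState: try the function, unwrap one layer, retry.
    static void snapshotFunctionState(long checkpointId, Function userFunction) throws Exception {
        while (true) {
            if (trySnapshotFunctionState(checkpointId, userFunction)) {
                break;
            }
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction) userFunction).getWrappedFunction();
            } else {
                break; // nothing checkpointable found: silently do nothing
            }
        }
    }

    static boolean trySnapshotFunctionState(long checkpointId, Function userFunction) throws Exception {
        if (userFunction instanceof CheckpointedFunction) {
            // No try/catch here: any exception from user code propagates to the caller.
            ((CheckpointedFunction) userFunction).snapshotState(checkpointId);
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        List<Long> snapshots = new ArrayList<>();
        CheckpointedFunction recorder = id -> snapshots.add(id);
        // Wrapped twice: the loop unwraps two layers before it reaches the CheckpointedFunction.
        snapshotFunctionState(1L, new WrappingFunction(new WrappingFunction(recorder)));
        System.out.println("snapshots taken: " + snapshots);

        CheckpointedFunction failing = id -> {
            throw new IllegalStateException("snapshot failed for checkpoint " + id);
        };
        try {
            snapshotFunctionState(2L, failing);
        } catch (IllegalStateException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```

    A function that is neither checkpointed nor wrapped simply ends the loop without a snapshot, which is why a missing CheckpointedFunction never raises an error on this path.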
    

    When a user-defined snapshotState method throws an exception, the exception is propagated all the way up to Task.triggerCheckpointBarrier:

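        public void triggerCheckpointBarrier(
            final long checkpointID,
            long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {

            // This is in fact the StreamTask: Task delegates the checkpoint to a concrete class, and StreamTask
            // in turn delegates to ever more specific classes, all the way down to the user's business code
            // (e.g. a chain such as source -> flatMap).
            // In other words, the invokable effectively stands for the operator chain.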
            final AbstractInvokable invokable = this.invokable;
            final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
            
            if (executionState == ExecutionState.RUNNING && invokable != null) {
                
                // build a local closure
                final String taskName = taskNameWithSubtask;
                final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                    FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
                
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        // set safety net from the task's context for checkpointing thread
                        LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
                        
                        try {
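                            // invokable is in fact the StreamTask, which delegates the checkpoint down to the user's business code
                            // this try/catch only covers exceptions thrown while taking the checkpoint
                            // if the checkpoint throws while the task is RUNNING, ExecutionState becomes FAILED, which leads to a restart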
                            boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                            if (!success) {
                                checkpointResponder.declineCheckpoint(
                                    getJobID(), getExecutionId(), checkpointID,
                                    new CheckpointDeclineTaskNotReadyException(taskName));
                            }
                        } catch (Throwable t) {
                            if (getExecutionState() == ExecutionState.RUNNING) {
                                failExternally(new Exception(
                                    "Error while triggering checkpoint " + checkpointID + " for " +
                                        taskNameWithSubtask, t));
                            } else {
                                LOG.debug("Encountered error while triggering checkpoint {} for " +
                                        "{} ({}) while being not in state running.", checkpointID,
                                    taskNameWithSubtask, executionId, t);
                            }
                        } finally {
                            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                        }
                    }
                };
                executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
            } else {
                LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
                
                // send back a message that we did not do the checkpoint
                checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                    new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
            }
        }
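    The control flow of triggerCheckpointBarrier can be modelled in a few dozen lines of plain Java. This is a simplified sketch, not Flink code: the class TriggerCheckpointSketch, the Outcome enum, the Invokable interface and the trigger helper are all hypothetical; a single-thread ExecutorService plays the role of executeAsyncCallRunnable, and setting the state to FAILED stands in for failExternally. It covers the paths of the original method: successful trigger, decline because the task is not ready, task failure when the checkpoint throws while RUNNING, debug-log only when the task has already left RUNNING, and decline when the task is not running at all.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TriggerCheckpointSketch {
    enum ExecutionState { CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED, FINISHED, FAILED }

    // Hypothetical summary of what the task did; Flink itself calls
    // checkpointResponder.declineCheckpoint or failExternally instead of returning a value.
    enum Outcome { TRIGGERED, DECLINED_NOT_READY, DECLINED_NOT_RUNNING, TASK_FAILED, ERROR_ONLY_LOGGED }

    // Hypothetical stand-in for AbstractInvokable.triggerCheckpoint.
    interface Invokable {
        boolean triggerCheckpoint(long checkpointId) throws Exception;
    }

    final AtomicReference<ExecutionState> state = new AtomicReference<>(ExecutionState.RUNNING);
    final AtomicReference<Outcome> outcome = new AtomicReference<>();
    final AtomicReference<Throwable> failureCause = new AtomicReference<>();

    void triggerCheckpointBarrier(long checkpointId, Invokable invokable, ExecutorService asyncCalls) {
        if (state.get() == ExecutionState.RUNNING && invokable != null) {
            // Like executeAsyncCallRunnable: the checkpoint runs off the caller's thread.
            asyncCalls.execute(() -> {
                try {
                    boolean success = invokable.triggerCheckpoint(checkpointId);
                    outcome.set(success ? Outcome.TRIGGERED : Outcome.DECLINED_NOT_READY);
                } catch (Throwable t) {
                    // The state is re-read here, when the error is caught.
                    if (state.get() == ExecutionState.RUNNING) {
                        // Stand-in for failExternally(...): the whole task goes to FAILED.
                        failureCause.set(new Exception("Error while triggering checkpoint " + checkpointId, t));
                        state.set(ExecutionState.FAILED);
                        outcome.set(Outcome.TASK_FAILED);
                    } else {
                        // The task already left RUNNING: the error is only logged.
                        outcome.set(Outcome.ERROR_ONLY_LOGGED);
                    }
                }
            });
        } else {
            outcome.set(Outcome.DECLINED_NOT_RUNNING);
        }
    }

    // Hypothetical helper: trigger checkpoint 42 on a fresh task and wait for the async call.
    static TriggerCheckpointSketch trigger(ExecutionState initial, Invokable invokable)
            throws InterruptedException {
        TriggerCheckpointSketch task = new TriggerCheckpointSketch();
        task.state.set(initial);
        ExecutorService asyncCalls = Executors.newSingleThreadExecutor();
        task.triggerCheckpointBarrier(42L, invokable, asyncCalls);
        asyncCalls.shutdown();
        asyncCalls.awaitTermination(5, TimeUnit.SECONDS);
        return task;
    }

    public static void main(String[] args) throws InterruptedException {
        TriggerCheckpointSketch failed = trigger(ExecutionState.RUNNING,
                id -> { throw new IllegalStateException("user snapshotState failed"); });
        System.out.println(failed.outcome.get() + " -> " + failed.state.get());
        System.out.println(trigger(ExecutionState.RUNNING, id -> false).outcome.get());
        System.out.println(trigger(ExecutionState.DEPLOYING, id -> true).outcome.get());
    }
}
```

    Whether the task fails or the error is merely logged depends on the state observed when the exception is caught, not when the barrier arrived; that is why the sketch checks the state again inside the catch block, mirroring getExecutionState() in the original.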
    

    The key part is this branch in the catch block:

    if (getExecutionState() == ExecutionState.RUNNING) {
        failExternally(new Exception(
            "Error while triggering checkpoint " + checkpointID + " for " +
                taskNameWithSubtask, t));
    } else {
        LOG.debug("Encountered error while triggering checkpoint {} for " +
                "{} ({}) while being not in state running.", checkpointID,
            taskNameWithSubtask, executionId, t);
    }
    

    failExternally, in turn, calls:

    cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
    

    Here is that method in detail:

        private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
            while (true) {
                ExecutionState current = executionState;
                
                // if the task is already canceled (or canceling) or finished or failed,
                // then we need not do anything
                if (current.isTerminal() || current == ExecutionState.CANCELING) {
                    LOG.info("Task {} is already in state {}", taskNameWithSubtask, current);
                    return;
                }
                
                if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
                    if (transitionState(current, targetState, cause)) {
                        // if we manage this state transition, then the invokable gets never called
                        // we need not call cancel on it
                        this.failureCause = cause;
                        return;
                    }
                } else if (current == ExecutionState.RUNNING) {
                    if (transitionState(ExecutionState.RUNNING, targetState, cause)) {
                        // we are canceling / failing out of the running state
                        // we need to cancel the invokable
                        
                        // copy reference to guard against concurrent null-ing out the reference
                        final AbstractInvokable invokable = this.invokable;
                        
                        if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
                            this.failureCause = cause;
                            
                            LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
                            
                            // because the canceling may block on user code, we cancel from a separate thread
                            // we do not reuse the async call handler, because that one may be blocked, in which
                            // case the canceling could not continue
                            
                            // The canceller calls cancel and interrupts the executing thread once
                            Runnable canceler = new TaskCanceler(
                                LOG,
                                invokable,
                                executingThread,
                                taskNameWithSubtask,
                                producedPartitions,
                                inputGates);
                            
                            Thread cancelThread = new Thread(
                                executingThread.getThreadGroup(),
                                canceler,
                                String.format("Canceler for %s (%s).", taskNameWithSubtask, executionId));
                            cancelThread.setDaemon(true);
                            cancelThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
                            cancelThread.start();
                            
                            // the periodic interrupting thread - a different thread than the canceller, in case
                            // the application code does blocking stuff in its cancellation paths.
                            if (invokable.shouldInterruptOnCancel()) {
                                Runnable interrupter = new TaskInterrupter(
                                    LOG,
                                    invokable,
                                    executingThread,
                                    taskNameWithSubtask,
                                    taskCancellationInterval);
                                
                                Thread interruptingThread = new Thread(
                                    executingThread.getThreadGroup(),
                                    interrupter,
                                    String.format("Canceler/Interrupts for %s (%s).", taskNameWithSubtask, executionId));
                                interruptingThread.setDaemon(true);
                                interruptingThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
                                interruptingThread.start();
                            }
                            
                            // if a cancellation timeout is set, the watchdog thread kills the process
                            // if graceful cancellation does not succeed
                            if (taskCancellationTimeout > 0) {
                                Runnable cancelWatchdog = new TaskCancelerWatchDog(
                                    executingThread,
                                    taskManagerActions,
                                    taskCancellationTimeout,
                                    LOG);
                                
                                Thread watchDogThread = new Thread(
                                    executingThread.getThreadGroup(),
                                    cancelWatchdog,
                                    String.format("Cancellation Watchdog for %s (%s).",
                                        taskNameWithSubtask, executionId));
                                watchDogThread.setDaemon(true);
                                watchDogThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
                                watchDogThread.start();
                            }
                        }
                        return;
                    }
                } else {
                    throw new IllegalStateException(String.format("Unexpected state: %s of task %s (%s).",
                        current, taskNameWithSubtask, executionId));
                }
            }
        }
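    The heart of cancelOrFailAndCancelInvokable is a compare-and-set loop over ExecutionState. The sketch below keeps only that state machine and is not Flink code: the enum is a reduced subset of Flink's ExecutionState values, an AtomicReference replaces Flink's transitionState, and the hypothetical cancelCalls counter stands in for starting the canceler, interrupter and watchdog threads. It shows that a RUNNING task is moved to FAILED and cancelled exactly once, that a DEPLOYING or CREATED task is moved to FAILED without cancelling anything, and that calls on a terminal or CANCELING task are no-ops.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class CancelOrFailSketch {
    // Reduced subset of Flink's ExecutionState; FINISHED, CANCELED and FAILED are terminal, as in Flink.
    enum ExecutionState {
        CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED;

        boolean isTerminal() {
            return this == FINISHED || this == CANCELED || this == FAILED;
        }
    }

    final AtomicReference<ExecutionState> executionState;
    final AtomicBoolean invokableHasBeenCanceled = new AtomicBoolean(false);
    // Hypothetical counter standing in for "start canceler / interrupter / watchdog threads".
    final AtomicInteger cancelCalls = new AtomicInteger();
    volatile Throwable failureCause;

    CancelOrFailSketch(ExecutionState initial) {
        this.executionState = new AtomicReference<>(initial);
    }

    void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
        while (true) {
            ExecutionState current = executionState.get();

            // Already canceled, canceling, finished or failed: nothing to do.
            if (current.isTerminal() || current == ExecutionState.CANCELING) {
                return;
            }

            if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
                // User code never started, so only the state changes; nothing needs cancelling.
                if (executionState.compareAndSet(current, targetState)) {
                    failureCause = cause;
                    return;
                }
            } else if (current == ExecutionState.RUNNING) {
                if (executionState.compareAndSet(ExecutionState.RUNNING, targetState)) {
                    // Cancel the running code at most once, even if several callers race here.
                    if (invokableHasBeenCanceled.compareAndSet(false, true)) {
                        failureCause = cause;
                        cancelCalls.incrementAndGet();
                    }
                    return;
                }
            } else {
                throw new IllegalStateException("Unexpected state: " + current);
            }
            // The CAS lost a race with another transition: loop, re-read the state and decide again.
        }
    }

    public static void main(String[] args) {
        Exception cause = new Exception("Error while triggering checkpoint 42");

        CancelOrFailSketch running = new CancelOrFailSketch(ExecutionState.RUNNING);
        running.cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
        running.cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause); // no-op: FAILED is terminal
        System.out.println(running.executionState.get() + ", cancelCalls=" + running.cancelCalls.get());

        CancelOrFailSketch deploying = new CancelOrFailSketch(ExecutionState.DEPLOYING);
        deploying.cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
        System.out.println(deploying.executionState.get() + ", cancelCalls=" + deploying.cancelCalls.get());
    }
}
```

    Running main prints "FAILED, cancelCalls=1" followed by "FAILED, cancelCalls=0". The retry loop matters because another thread (for example a concurrent cancel request) may change the state between the read and the compare-and-set; when the CAS fails, the loop re-reads the state instead of overwriting it.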
    

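    In short, the method transitions ExecutionState to FAILED and then runs a series of cancellation steps: a canceler thread calls cancel on the invokable and interrupts the executing thread once, an interrupter thread (when the invokable asks for it) keeps interrupting periodically in case user code blocks, and, if a cancellation timeout is set, a watchdog thread kills the process when graceful cancellation does not succeed. Because ExecutionState becomes FAILED, Flink's restart strategy kicks in; if no restart strategy is in effect, the job fails outright.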
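    Cancellation is spread over several helper threads because user code may block or swallow interrupts. The following sketch imitates that arrangement with plain threads; it is not Flink's TaskCanceler, TaskInterrupter or TaskCancelerWatchDog, and the class name, the cancel flag and the interval/timeout values are hypothetical. The "user code" deliberately swallows the first interrupt, the canceler thread sets the cancel flag and interrupts once, an interrupter thread keeps interrupting at a fixed interval, and the watchdog only records whether the task outlived its timeout (in Flink the watchdog kills the process instead).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class CancellationThreadsSketch {
    static final AtomicInteger interruptsSeen = new AtomicInteger();

    /** Cancels a blocked "task"; returns true if it stopped before the watchdog timeout. */
    static boolean cancelBlockedTask(long interruptIntervalMillis, long watchdogTimeoutMillis)
            throws InterruptedException {
        interruptsSeen.set(0);
        AtomicBoolean cancelRequested = new AtomicBoolean(false); // hypothetical cancel hook
        AtomicBoolean watchdogFired = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        // Executing thread: stubborn "user code" that swallows the first interrupt and stops on the second.
        Thread executingThread = new Thread(() -> {
            started.countDown();
            while (interruptsSeen.get() < 2) {
                try {
                    Thread.sleep(TimeUnit.MINUTES.toMillis(10));
                } catch (InterruptedException e) {
                    interruptsSeen.incrementAndGet();
                }
            }
        }, "task-thread");
        executingThread.setDaemon(true); // daemon only so the sketch can never keep the JVM alive
        executingThread.start();
        started.await();

        // Canceler: runs the cancel hook, then interrupts the executing thread once.
        Thread canceler = new Thread(() -> {
            cancelRequested.set(true);
            executingThread.interrupt();
        }, "canceler");

        // Interrupter: keeps interrupting at a fixed interval while the executing thread is alive.
        Thread interrupter = new Thread(() -> {
            try {
                while (executingThread.isAlive()) {
                    executingThread.join(interruptIntervalMillis);
                    if (executingThread.isAlive()) {
                        executingThread.interrupt();
                    }
                }
            } catch (InterruptedException ignored) {
                // stop interrupting if this helper itself is interrupted
            }
        }, "interrupter");

        // Watchdog: if the task outlives the timeout, Flink kills the process; the sketch only records it.
        Thread watchdog = new Thread(() -> {
            try {
                executingThread.join(watchdogTimeoutMillis);
                watchdogFired.set(executingThread.isAlive());
            } catch (InterruptedException ignored) {
                // give up silently in the sketch
            }
        }, "watchdog");

        for (Thread helper : new Thread[] {canceler, interrupter, watchdog}) {
            helper.setDaemon(true);
            helper.start();
        }
        canceler.join();
        watchdog.join();
        return cancelRequested.get() && !watchdogFired.get() && !executingThread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        boolean stopped = cancelBlockedTask(50, 5_000);
        System.out.println("stopped=" + stopped + ", interrupts=" + interruptsSeen.get());
    }
}
```

    A single interrupt from the canceler would not stop this "user code"; the periodic interrupter is what eventually gets through, and the watchdog is the last resort when even that fails.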


        Original link: https://www.haomeiwen.com/subject/kzwdpktx.html