Flink Source Code: Snapshots

Author: AlienPaul | Published: 2020-01-15 16:58

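Flink blog series

Flink QuickStart
Flink Dual-Stream Operations
Configuring Kerberos for Flink on Yarn
Flink on Yarn: Deployment and Job Submission
Configuring Prometheus Monitoring for Flink
Deploying Flink in Docker
Flink HA Deployment
Summary of Common Flink Tuning Parameters
Flink Source Code: Job Submission Flow
Flink Source Code: Basic Operators
Flink Source Code: Trigger
Flink Source Code: Evictor
Flink Source Code: Window
Flink Source Code: WindowOperator
Flink Source Code: StreamGraph Generation
Flink Source Code: JobGraph Generation
Flink Source Code: Two-Phase Commit
Flink Source Code: Distributed Snapshots
Flink Source Code: Time Handling
Flink Source Code: Inter-Node Communication
Flink Source Code: Credit-Based Backpressure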

Call chain for periodically triggered checkpoints

  • JobMaster.triggerSavepoint
  • SchedulerBase.startCheckpointScheduler
  • CheckpointCoordinator.startCheckpointScheduler
  • CheckpointCoordinator.scheduleTriggerWithDelay
  • CheckpointCoordinator.triggerCheckpoint
  • Execution.triggerCheckpoint
  • Execution.triggerCheckpointHelper
  • RpcTaskManagerGateway.triggerCheckpoint
  • TaskExecutor.triggerCheckpoint
  • Task.triggerCheckpointBarrier
  • invokable.triggerCheckpointAsync (we follow StreamTask here)
  • StreamTask.performCheckpoint
  • OperatorChain.broadcastCheckpointBarrier and CheckpointingOperation.executeCheckpointing

Selecting the vertices that trigger checkpoints

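ExecutionGraph's enableCheckpointing method creates a checkpointCoordinator object. This object runs in the JobManager and coordinates the checkpoint process across the whole distributed system. It is responsible for the following: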

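  • Periodically triggering checkpoints by instructing the data sources to emit checkpoint barriers.
  • Receiving each operator's acknowledgement that a given checkpoint has completed.
  • Once acknowledgements for a checkpoint have arrived from all operators, notifying every operator that the checkpoint is complete.
  • Keeping information about completed and in-progress checkpoints.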
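The acknowledgement bookkeeping in the second and third items can be pictured with a small toy model. This is only a sketch under stated assumptions: `PendingCheckpointSketch` and its methods are hypothetical names chosen for illustration, not Flink's classes, and tasks are identified by plain strings.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: names here are hypothetical and do not mirror Flink's API.
class PendingCheckpointSketch {
    private final long checkpointId;
    // Tasks whose acknowledgement is still outstanding for this checkpoint.
    private final Set<String> notYetAcknowledged;

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToWaitFor) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor);
    }

    // Records one acknowledgement; returns true once every expected task has acknowledged.
    synchronized boolean acknowledge(String taskId) {
        notYetAcknowledged.remove(taskId);
        return notYetAcknowledged.isEmpty();
    }

    long getCheckpointId() {
        return checkpointId;
    }
}
```

When `acknowledge` returns true, a coordinator built on this model would move the checkpoint from "in progress" to "completed" and notify the tasks.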

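Note that constructing the checkpointCoordinator takes a variable named tasksToTrigger: the vertices on which checkpoints must be triggered. This variable is built in StreamingJobGraphGenerator's configureCheckpointing method. The relevant code is shown below (unrelated parts omitted):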

for (JobVertex vertex : jobVertices.values()) {
    if (vertex.isInputVertex()) {
        triggerVertices.add(vertex.getID());
    }
    commitVertices.add(vertex.getID());
    ackVertices.add(vertex.getID());
}

The triggerVertices collection holds every vertex that satisfies isInputVertex. The isInputVertex method looks like this:

public boolean isInputVertex() {
    return this.inputs.isEmpty();
}

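That settles it: only a JobVertex without any inputs is an input vertex. The coordinator therefore triggers checkpoints only on input vertices. The data sources are the first to start a checkpoint; the checkpoint barrier then flows downstream and triggers the checkpoint of each subsequent vertex in turn.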
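To make the selection rule concrete, here is a minimal sketch on a toy graph. `VertexSketch` and `TriggerVertexSelection` are hypothetical stand-ins invented for this example; only the "no inputs means trigger vertex" rule is taken from the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a job vertex; only its inputs matter here.
class VertexSketch {
    final String name;
    final List<VertexSketch> inputs;

    VertexSketch(String name, VertexSketch... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }

    // Same rule as isInputVertex above: a vertex without inputs is an input vertex.
    boolean isInputVertex() {
        return inputs.isEmpty();
    }
}

class TriggerVertexSelection {
    // Collects the names of the vertices on which a checkpoint would be triggered.
    static List<String> triggerVertices(List<VertexSketch> allVertices) {
        List<String> result = new ArrayList<>();
        for (VertexSketch vertex : allVertices) {
            if (vertex.isInputVertex()) {
                result.add(vertex.name);
            }
        }
        return result;
    }
}
```

For a pipeline source → map → sink, only the source is selected; the map and sink vertices still take part in the checkpoint, but they start when the barrier reaches them.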

Analysis of the periodic checkpoint call chain

JobMaster

When JobMaster triggers a savepoint, it starts a checkpoint.
Let's look at JobMaster's triggerSavepoint method:

@Override
public CompletableFuture<String> triggerSavepoint(
        @Nullable final String targetDirectory,
        final boolean cancelJob,
        final Time timeout) {

    return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
}

Next we look at the schedulerNG.triggerSavepoint method.

SchedulerBase

The schedulerNG.triggerSavepoint call in JobMaster's triggerSavepoint resolves to the method in SchedulerBase. The code is:

@Override
public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
    // make sure we are running in the main thread
    mainThreadExecutor.assertRunningInMainThread();

    // get the checkpointCoordinator from the executionGraph
    final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    if (checkpointCoordinator == null) {
        throw new IllegalStateException(
            String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
    } else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
        // make sure a default savepoint directory is configured, or a target directory was passed in
        log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());

        throw new IllegalStateException(
            "No savepoint directory configured. You can either specify a directory " +
                "while cancelling via -s :targetDirectory or configure a cluster-wide " +
                "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
    }

    // if the job is being cancelled, stop the periodic checkpoint scheduler
    if (cancelJob) {
        checkpointCoordinator.stopCheckpointScheduler();
    }

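    // First trigger a savepoint (under the hood this triggers a checkpoint),
    // then map the result to the path where it was stored.
    // Finally: 1. if the job should be cancelled and an earlier step failed, restart the checkpoint scheduler and rethrow;
    // 2. if the job should be cancelled and no earlier step failed, cancel the job.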
    return checkpointCoordinator
        .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
        .thenApply(CompletedCheckpoint::getExternalPointer)
        .handleAsync((path, throwable) -> {
            if (throwable != null) {
                if (cancelJob) {
                    startCheckpointScheduler(checkpointCoordinator);
                }
                throw new CompletionException(throwable);
            } else if (cancelJob) {
                log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
                cancel();
            }
            return path;
        }, mainThreadExecutor);
}
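The "stop the scheduler, take the savepoint, then either cancel or roll back" flow can be reproduced with plain CompletableFuture calls. The sketch below is an illustration under assumptions: `SavepointThenCancelSketch` and its two flags are hypothetical, and it uses `handle` on the completing thread instead of `handleAsync` with a main-thread executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model: two flags stand in for the checkpoint scheduler and the job.
class SavepointThenCancelSketch {
    final AtomicBoolean schedulerRunning = new AtomicBoolean(true);
    final AtomicBoolean jobCancelled = new AtomicBoolean(false);

    CompletableFuture<String> savepointAndMaybeCancel(
            CompletableFuture<String> savepointPath, boolean cancelJob) {
        if (cancelJob) {
            // No periodic checkpoints should start while the job is about to be cancelled.
            schedulerRunning.set(false);
        }
        return savepointPath.handle((path, throwable) -> {
            if (throwable != null) {
                if (cancelJob) {
                    // The savepoint failed, so the job keeps running: resume periodic checkpoints.
                    schedulerRunning.set(true);
                }
                throw new CompletionException(throwable);
            } else if (cancelJob) {
                jobCancelled.set(true);
            }
            return path;
        });
    }
}
```

The point of the design is that a failed savepoint must not leave the job running without periodic checkpoints, which is why the failure branch restarts the scheduler before rethrowing.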

CheckpointCoordinator

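CheckpointCoordinator coordinates the distributed snapshots and state of all operators. It sends messages to the relevant
tasks to trigger their snapshots, and then collects their acknowledgements that the snapshots succeeded.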

CheckpointCoordinator's createActivatorDeactivator method creates a job status listener. Whenever the job's status changes, the listener's jobStatusChanges method is called.

The jobStatusChanges method of CheckpointCoordinatorDeActivator:

@Override
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
    if (newJobStatus == JobStatus.RUNNING) {
        // start the checkpoint scheduler
        coordinator.startCheckpointScheduler();
    } else {
        // anything else should stop the trigger for now
        coordinator.stopCheckpointScheduler();
    }
}

As the code shows, when the job switches to the RUNNING state, CheckpointCoordinator's startCheckpointScheduler method is called. For any other state, stopCheckpointScheduler is called.

Next, the source of the startCheckpointScheduler method:

public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        // make sure all prior timers are cancelled
        // stop any previously created scheduler first
        stopCheckpointScheduler();

        // then create a new scheduler
        periodicScheduling = true;
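        // start the periodic checkpoint trigger after an initial delay
        // the delay is a random value between the minimum pause between checkpoints and the checkpoint interval + 1 (upper bound exclusive)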
        currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
    }
}

The scheduleTriggerWithDelay method starts a timer that triggers checkpoints periodically:

private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
    return timer.scheduleAtFixedRate(
        new ScheduledTrigger(),
        initDelay, baseInterval, TimeUnit.MILLISECONDS);
}

This code schedules a periodic task whose logic lives in ScheduledTrigger:

private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint for job {}.", job, e);
        }
    }
}
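The scheduling pattern can be tried outside Flink with the JDK's ScheduledExecutorService. The following is a sketch under assumptions: `PeriodicTriggerSketch` is a hypothetical class, and the random delay follows the range described in the comment inside startCheckpointScheduler rather than code quoted in this article.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

class PeriodicTriggerSketch {
    // Random initial delay in [minPauseMillis, intervalMillis]; nextLong's upper bound is exclusive.
    // Requires minPauseMillis <= intervalMillis, otherwise nextLong throws IllegalArgumentException.
    static long randomInitDelay(long minPauseMillis, long intervalMillis) {
        return ThreadLocalRandom.current().nextLong(minPauseMillis, intervalMillis + 1L);
    }

    static ScheduledFuture<?> schedule(
            ScheduledExecutorService timer, Runnable trigger, long minPauseMillis, long intervalMillis) {
        // Catch everything inside the task: scheduleAtFixedRate suppresses all later runs
        // once a single execution throws.
        Runnable guarded = () -> {
            try {
                trigger.run();
            } catch (Exception e) {
                System.err.println("Exception while triggering checkpoint: " + e);
            }
        };
        return timer.scheduleAtFixedRate(
                guarded, randomInitDelay(minPauseMillis, intervalMillis), intervalMillis, TimeUnit.MILLISECONDS);
    }
}
```

This also suggests why ScheduledTrigger wraps its call in try/catch: with a fixed-rate timer, an uncaught exception would silently end periodic checkpointing.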

The run method contains a single call, to triggerCheckpoint. We trace that method next:

// the previous step calls this method with the current system time as timestamp and isPeriodic set to true
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(long timestamp, boolean isPeriodic) {
    try {
        return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false);
    } catch (CheckpointException e) {
        long latestGeneratedCheckpointId = getCheckpointIdCounter().get();
        // here we can not get the failed pending checkpoint's id,
        // so we pass the negative latest generated checkpoint id as a special flag
        failureManager.handleJobLevelCheckpointException(e, -1 * latestGeneratedCheckpointId);
        return FutureUtils.completedExceptionally(e);
    }
}

This leads to the actual checkpoint triggering logic. The method is fairly long:

@VisibleForTesting
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
        long timestamp,
        CheckpointProperties props,
        @Nullable String externalSavepointLocation,
        boolean isPeriodic,
        boolean advanceToEndOfTime) throws CheckpointException {

    if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
        throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }

    // make some eager pre-checks
    // pre-checks
    synchronized (lock) {
        preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
    }

    // check if all tasks that we need to trigger are running.
    // if not, abort the checkpoint
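    // tasksToTrigger holds the tasks on which the checkpoint must be triggered (analyzed at the start of this article)
    // make sure every task in tasksToTrigger is in the RUNNING state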
    Execution[] executions = new Execution[tasksToTrigger.length];
    for (int i = 0; i < tasksToTrigger.length; i++) {
        Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
        if (ee == null) {
            LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job);
            throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        } else if (ee.getState() == ExecutionState.RUNNING) {
            executions[i] = ee;
        } else {
            LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job,
                    ExecutionState.RUNNING,
                    ee.getState());
            throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    // next, check if all tasks that need to acknowledge the checkpoint are running.
    // if not, abort the checkpoint
    // check that every task that must acknowledge the checkpoint has a current execution
    Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

    for (ExecutionVertex ev : tasksToWaitFor) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ackTasks.put(ee.getAttemptId(), ev);
        } else {
            LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    ev.getTaskNameWithSubtaskIndex(),
                    job);
            throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    // we will actually trigger this checkpoint!
    // start the checkpoint
    final CheckpointStorageLocation checkpointStorageLocation;
    final long checkpointID;

    try {
        // this must happen outside the coordinator-wide lock, because it communicates
        // with external services (in HA mode) and may block for a while.
        // obtain the checkpoint ID
        // without HA this uses StandaloneCheckpointIDCounter
        // in HA mode it uses ZooKeeperCheckpointIDCounter
        checkpointID = checkpointIdCounter.getAndIncrement();

        // resolve the storage location for the checkpoint
        checkpointStorageLocation = props.isSavepoint() ?
                checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
                checkpointStorage.initializeLocationForCheckpoint(checkpointID);
    }
    catch (Throwable t) {
        int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
        LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
                job,
                numUnsuccessful,
                t);
        throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
    }

    // create a pending checkpoint
    final PendingCheckpoint checkpoint = new PendingCheckpoint(
        job,
        checkpointID,
        timestamp,
        ackTasks,
        masterHooks.keySet(),
        props,
        checkpointStorageLocation,
        executor);

    if (statsTracker != null) {
        // obtain the stats-reporting callback
        PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
            checkpointID,
            timestamp,
            props);

        // attach the stats callback
        checkpoint.setStatsCallback(callback);
    }

    // schedule the timer that will clean up the expired checkpoints
    // cleanup logic for expired checkpoints
    final Runnable canceller = () -> {
        synchronized (lock) {
            // only do the work if the checkpoint is not discarded anyways
            // note that checkpoint completion discards the pending checkpoint object
            // skip checkpoints that were already discarded
            if (!checkpoint.isDiscarded()) {
                LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

                // abort the pending checkpoint
                failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
                // remove it from pendingCheckpoints
                pendingCheckpoints.remove(checkpointID);
                // remember this ID among the recent checkpoint IDs
                rememberRecentCheckpointId(checkpointID);
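                // trigger any queued checkpoint request
                // preCheckBeforeTriggeringCheckpoint (used below) checks the number of concurrent checkpoints
                // when pendingCheckpoints hits the maxConcurrentCheckpointAttempts limit,
                // triggerRequestQueued is set to true
                // in that case a checkpoint is triggered right away here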
                triggerQueuedRequests();
            }
        }
    };

    try {
        // re-acquire the coordinator-wide lock
        synchronized (lock) {
            // run the pre-checks again, now under the lock
            preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());

            LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
            // add the checkpoint to pendingCheckpoints
            pendingCheckpoints.put(checkpointID, checkpoint);

            // schedule a timer task
            // that cancels the checkpoint when it times out
            // the task is the canceller defined above
            ScheduledFuture<?> cancellerHandle = timer.schedule(
                    canceller,
                    checkpointTimeout, TimeUnit.MILLISECONDS);

            // if the canceller handle could not be set (the only possible cause: the checkpoint was already discarded)
            if (!checkpoint.setCancellerHandle(cancellerHandle)) {
                // checkpoint is already disposed!
                // cancel the scheduled canceller task
                cancellerHandle.cancel(false);
            }

            // TODO, asynchronously snapshots master hook without waiting here
            // invoke each master hook in turn
            // MasterTriggerRestoreHook notifies external systems before a checkpoint is taken or restored
            for (MasterTriggerRestoreHook<?> masterHook : masterHooks.values()) {
                final MasterState masterState =
                    MasterHooks.triggerHook(masterHook, checkpointID, timestamp, executor)
                        .get(checkpointTimeout, TimeUnit.MILLISECONDS);
                checkpoint.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
            }
            
            // check that no master state is left unacknowledged
            Preconditions.checkState(checkpoint.areMasterStatesFullyAcknowledged());
        }
        // end of lock scope

        // build the options carrying the checkpoint type and storage location
        final CheckpointOptions checkpointOptions = new CheckpointOptions(
                props.getCheckpointType(),
                checkpointStorageLocation.getLocationReference());

        // send the messages to the tasks that trigger their checkpoint
        // trigger the checkpoint on every task in tasksToTrigger
        for (Execution execution: executions) {
            if (props.isSynchronous()) {
                execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
            } else {
                execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
            }
        }

        // reset the count of consecutive unsuccessful trigger attempts to 0
        numUnsuccessfulCheckpointsTriggers.set(0);
        return checkpoint.getCompletionFuture();
    }
    catch (Throwable t) {
        // guard the map against concurrent modifications
        synchronized (lock) {
            pendingCheckpoints.remove(checkpointID);
        }

        int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
        LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
                checkpointID, job, numUnsuccessful, t);

        if (!checkpoint.isDiscarded()) {
            failPendingCheckpoint(checkpoint, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
        }

        try {
            checkpointStorageLocation.disposeOnFailure();
        }
        catch (Throwable t2) {
            LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
        }

        // rethrow the CheckpointException directly.
        if (t instanceof CheckpointException) {
            throw (CheckpointException) t;
        }
        throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
    }
}

The method is long; most of it checks that the tasks are running, runs the pre-checks, and invokes the master hooks.
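The expiry handling inside it is worth isolating: a canceller is created per pending checkpoint and only acts if the checkpoint has not been discarded in the meantime. The sketch below models just that guard. `CheckpointExpirySketch` and `Pending` are hypothetical names, and the timer is left out so that the canceller can be run by hand.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the expiry guard; a real coordinator would hand the
// returned Runnable to a timer with the checkpoint timeout as its delay.
class CheckpointExpirySketch {
    static final class Pending {
        final long id;
        boolean discarded;
        String failureReason;

        Pending(long id) {
            this.id = id;
        }
    }

    private final Object lock = new Object();
    final Map<Long, Pending> pendingCheckpoints = new HashMap<>();

    // Registers a pending checkpoint and returns its canceller.
    Runnable register(Pending checkpoint) {
        synchronized (lock) {
            pendingCheckpoints.put(checkpoint.id, checkpoint);
        }
        return () -> {
            synchronized (lock) {
                // Only expire checkpoints that were not completed or aborted already.
                if (!checkpoint.discarded) {
                    checkpoint.discarded = true;
                    checkpoint.failureReason = "CHECKPOINT_EXPIRED";
                    pendingCheckpoints.remove(checkpoint.id);
                }
            }
        };
    }

    // Completing a checkpoint discards the pending object, which disarms the canceller.
    void complete(Pending checkpoint) {
        synchronized (lock) {
            checkpoint.discarded = true;
            pendingCheckpoints.remove(checkpoint.id);
        }
    }
}
```

Running the canceller after `complete` is a no-op, which mirrors the isDiscarded check in the canceller above.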

Next we look at execution.triggerCheckpoint, the entry point for triggering the checkpoint on a task.

public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
    triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions, false);
}

The triggerCheckpointHelper method:

private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {

    final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
    if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
        throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }

    // get the slot
    final LogicalSlot slot = assignedResource;

    if (slot != null) {
        // get the TaskManagerGateway
        // TaskManagerGateway is used for communication between the JobManager and the TaskManager
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        // trigger the checkpoint
        taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
    }
}

Here taskManagerGateway is of type RpcTaskManagerGateway. Its triggerCheckpoint method looks like this:

@Override
public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
    taskExecutorGateway.triggerCheckpoint(
        executionAttemptID,
        checkpointId,
        timestamp,
        checkpointOptions,
        advanceToEndOfEventTime);
}

Here taskExecutorGateway is backed by TaskExecutor. We continue with its triggerCheckpoint method:

@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) {
    log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

    final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
    if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
        throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }

    // get the Task running in the slot
    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        // make the task emit a CheckpointBarrier
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

        log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
    }
}

Task's triggerCheckpointBarrier method:

public void triggerCheckpointBarrier(
        final long checkpointID,
        final long checkpointTimestamp,
        final CheckpointOptions checkpointOptions,
        final boolean advanceToEndOfEventTime) {

    final AbstractInvokable invokable = this.invokable;
    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

    if (executionState == ExecutionState.RUNNING && invokable != null) {
        try {
            invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
        }
        catch (RejectedExecutionException ex) {
            // This may happen if the mailbox is closed. It means that the task is shutting down, so we just ignore it.
            LOG.debug(
                "Triggering checkpoint {} for {} ({}) was rejected by the mailbox",
                checkpointID, taskNameWithSubtask, executionId);
        }
        catch (Throwable t) {
            if (getExecutionState() == ExecutionState.RUNNING) {
                failExternally(new Exception(
                    "Error while triggering checkpoint " + checkpointID + " for " +
                        taskNameWithSubtask, t));
            } else {
                LOG.debug("Encountered error while triggering checkpoint {} for " +
                    "{} ({}) while being not in state running.", checkpointID,
                    taskNameWithSubtask, executionId, t);
            }
        }
    }
    else {
        LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

        // send back a message that we did not do the checkpoint
        checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
    }
}

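The core of this method is the single call invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime). Here invokable is the concrete task type, for example SourceStreamTask or StreamTask. SourceStreamTask extends StreamTask, and the triggerCheckpointAsync that runs is the one defined in the parent class. Let's look at StreamTask's triggerCheckpointAsync method: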

@Override
public Future<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) {

    return mailboxProcessor.getMainMailboxExecutor().submit(
            () -> triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),
            "checkpoint %s with %s",
        checkpointMetaData,
        checkpointOptions);
}
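The idea behind submitting the checkpoint to the mailbox is that it then runs on the task's own processing thread, interleaved with record processing instead of racing against it. A rough stand-in using a single-threaded executor is sketched below; `MailboxSketch` is a hypothetical class and a plain ExecutorService is only an approximation of Flink's mailbox.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical approximation: one thread plays the role of the task's mailbox thread.
class MailboxSketch implements AutoCloseable {
    private final ExecutorService mailboxThread = Executors.newSingleThreadExecutor();
    // Written only by the mailbox thread.
    private long lastCheckpointId = -1L;

    // Called from an RPC thread: enqueue the checkpoint instead of running it in place.
    Future<Boolean> triggerCheckpointAsync(long checkpointId) {
        Callable<Boolean> mail = () -> performCheckpoint(checkpointId);
        return mailboxThread.submit(mail);
    }

    private boolean performCheckpoint(long checkpointId) {
        lastCheckpointId = checkpointId;
        return true;
    }

    long lastCheckpointId() {
        return lastCheckpointId;
    }

    // Small helper so callers do not have to handle the checked exceptions of Future.get.
    static boolean await(Future<Boolean> future) {
        try {
            return future.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        mailboxThread.shutdown();
    }
}
```

A caller would write `MailboxSketch.await(task.triggerCheckpointAsync(7L))`; the returned Future plays the same role as the Future<Boolean> returned by triggerCheckpointAsync above.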

We continue with the triggerCheckpoint method:

private boolean triggerCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) throws Exception {
    try {
        // No alignment if we inject a checkpoint
        // initialize the checkpoint metrics
        CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
            .setBytesBufferedInAlignment(0L)
            .setAlignmentDurationNanos(0L);

        // perform the checkpoint
        boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime);
        if (!success) {
            // if it did not succeed, decline this checkpoint
            declineCheckpoint(checkpointMetaData.getCheckpointId());
        }
        return success;
    } catch (Exception e) {
        // propagate exceptions only if the task is still in "running" state
        if (isRunning) {
            Exception exception = new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
                " for operator " + getName() + '.', e);
            handleCheckpointException(exception);
            throw exception;
        } else {
            LOG.debug("Could not perform checkpoint {} for operator {} while the " +
                "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
            return false;
        }
    }
}

The main logic is in the performCheckpoint method:

private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics,
        boolean advanceToEndOfTime) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    final long checkpointId = checkpointMetaData.getCheckpointId();

    if (isRunning) {
        actionExecutor.runThrowing(() -> {

            if (checkpointOptions.getCheckpointType().isSynchronous()) {
                setSynchronousSavepointId(checkpointId);

                if (advanceToEndOfTime) {
                    advanceToEndOfEventTime();
                }
            }

            // All of the following steps happen as an atomic step from the perspective of barriers and
            // records/watermarks/timers/callbacks.
            // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
            // checkpoint alignments

            // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
            //           The pre-barrier work should be nothing or minimal in the common case.
            // let the operatorChain run its pre-barrier preparation logic
            operatorChain.prepareSnapshotPreBarrier(checkpointId);

            // Step (2): Send the checkpoint barrier downstream
            // broadcast the checkpoint barrier downstream
            // operatorChain holds all of the task's outputs
            // iterate over them and send the barrier to each one
            operatorChain.broadcastCheckpointBarrier(
                    checkpointId,
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions);

            // Step (3): Take the state snapshot. This should be largely asynchronous, to not
            //           impact progress of the streaming topology
            // the task takes its own state snapshot
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);

        });

        return true;
    } else {
        actionExecutor.runThrowing(() -> {
            // we cannot perform our checkpoint - let the downstream operators know that they
            // should not wait for any input from this operator

            // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
            // yet be created
            // if the task is not running, send a checkpoint cancellation marker downstream
            final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            recordWriter.broadcastEvent(message);
        });

        return false;
    }
}

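OperatorChain's broadcastCheckpointBarrier method sends the checkpoint barrier to every downstream output of the chain. A downstream task that receives the barrier forwards it again and takes its own checkpoint, until the barrier reaches the sinks. For a dedicated analysis, see: Flink Source Code: Distributed Snapshots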

The code of broadcastCheckpointBarrier:

public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
    CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
    for (RecordWriterOutput<?> streamOutput : streamOutputs) {
        streamOutput.broadcastEvent(barrier);
    }
}
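Taken together with performCheckpoint, the ordering is: pre-barrier preparation, then the barrier to every output, then the task's own snapshot. The toy class below records that order in a log. `BarrierOrderSketch` is a hypothetical name and the outputs are plain strings rather than record writers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model that only records the order of the three steps.
class BarrierOrderSketch {
    final List<String> log = new ArrayList<>();
    private final List<String> outputs;

    BarrierOrderSketch(List<String> outputs) {
        this.outputs = outputs;
    }

    void performCheckpoint(long checkpointId) {
        // Step 1: minimal pre-barrier work.
        log.add("prepare " + checkpointId);
        // Step 2: the barrier goes to every output before any state is written,
        // so downstream tasks can start aligning as early as possible.
        for (String output : outputs) {
            log.add("barrier " + checkpointId + " -> " + output);
        }
        // Step 3: snapshot the task's own state.
        log.add("snapshot " + checkpointId);
    }
}
```

Emitting the barrier before the snapshot keeps the local snapshot time from delaying downstream checkpoint alignment, as the source comment in performCheckpoint points out.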

The checkpoint itself happens in the checkpointState method:

private void checkpointState(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
            checkpointMetaData.getCheckpointId(),
            checkpointOptions.getTargetLocation());

    // create a checkpointing operation
    CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
        this,
        checkpointMetaData,
        checkpointOptions,
        storage,
        checkpointMetrics);

    // execute the checkpoint
    checkpointingOperation.executeCheckpointing();
}

We continue with CheckpointingOperation's executeCheckpointing method:

public void executeCheckpointing() throws Exception {
    startSyncPartNano = System.nanoTime();

    try {
        // checkpoint every operator
        for (StreamOperator<?> op : allOperators) {
            // calls AbstractStreamOperator's snapshotState method
            // snapshotState contains the actual snapshot logic
            // it is fairly involved and not analyzed here; it will get its own article once clarified
            checkpointStreamOperator(op);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                checkpointMetaData.getCheckpointId(), owner.getName());
        }

        startAsyncPartNano = System.nanoTime();

        checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

        // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
        // create the runnable that completes the snapshot (its asynchronous part)
        AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
            owner,
            operatorSnapshotsInProgress,
            checkpointMetaData,
            checkpointMetrics,
            startAsyncPartNano);

        owner.cancelables.registerCloseable(asyncCheckpointRunnable);
        owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                    "Alignment duration: {} ms, snapshot duration {} ms",
                owner.getName(), checkpointMetaData.getCheckpointId(),
                checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                checkpointMetrics.getSyncDurationMillis());
        }
    } catch (Exception ex) {
        // Cleanup to release resources
        for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
            if (null != operatorSnapshotResult) {
                try {
                    operatorSnapshotResult.cancel();
                } catch (Exception e) {
                    LOG.warn("Could not properly cancel an operator snapshot result.", e);
                }
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                    "Alignment duration: {} ms, snapshot duration {} ms",
                owner.getName(), checkpointMetaData.getCheckpointId(),
                checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                checkpointMetrics.getSyncDurationMillis());
        }

        if (checkpointOptions.getCheckpointType().isSynchronous()) {
            // in the case of a synchronous checkpoint, we always rethrow the exception,
            // so that the task fails.
            // this is because the intention is always to stop the job after this checkpointing
            // operation, and without the failure, the task would go back to normal execution.
            throw ex;
        } else {
            owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(), ex);
        }
    }
}

The call owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable) runs AsyncCheckpointRunnable's run method on the thread pool. That method wraps the logic that completes the snapshot:

public void run() {
    FileSystemSafetyNet.initializeSafetyNetForThread();
    try {

        TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
            new TaskStateSnapshot(operatorSnapshotsInProgress.size());

        TaskStateSnapshot localTaskOperatorSubtaskStates =
            new TaskStateSnapshot(operatorSnapshotsInProgress.size());

        for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {

            OperatorID operatorID = entry.getKey();
            // get each operator's in-progress snapshot
            OperatorSnapshotFutures snapshotInProgress = entry.getValue();

            // finalize the async part of all by executing all snapshot runnables
            // create finalizedSnapshots; constructing it
            // runs the asynchronous part of the snapshot
            OperatorSnapshotFinalizer finalizedSnapshots =
                new OperatorSnapshotFinalizer(snapshotInProgress);

            // the copy of the operator's snapshot state that is reported to the JobManager
            jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                operatorID,
                finalizedSnapshots.getJobManagerOwnedState());

            // One copy of the operator snapshot state, kept for fast local recovery of the task
            localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                operatorID,
                finalizedSnapshots.getTaskLocalState());
        }

        final long asyncEndNanos = System.nanoTime();
        final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;

        // Record the duration of the asynchronous part for the metrics system
        checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);

        // Move the snapshot state from RUNNING to COMPLETED
        if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
            CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {

            // Report that the snapshot has completed; this method is analyzed below
            reportCompletedSnapshotStates(
                jobManagerTaskOperatorSubtaskStates,
                localTaskOperatorSubtaskStates,
                asyncDurationMillis);

        } else {
            LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                owner.getName(),
                checkpointMetaData.getCheckpointId());
        }
    } catch (Exception e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - asynchronous part of checkpoint {} could not be completed.",
                owner.getName(),
                checkpointMetaData.getCheckpointId(),
                e);
        }
        handleExecutionException(e);
    } finally {
        owner.cancelables.unregisterCloseable(this);
        FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
    }
}
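The compareAndSet call in run is what keeps a finished snapshot from being reported after the runnable has already been closed. A minimal sketch of that hand-off follows. The class name, the method names and the DISCARDED state are assumptions made for illustration and this is not Flink's actual class.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Sketch only: models the RUNNING -> COMPLETED / DISCARDED race with one atomic reference. */
class AsyncStateSketch {
    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    /** Called by the snapshot thread once all futures are done; true means it may report. */
    boolean tryComplete() {
        return state.compareAndSet(State.RUNNING, State.COMPLETED);
    }

    /** Called when the runnable is closed; true means this call discarded the snapshot. */
    boolean tryDiscard() {
        return state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    State current() {
        return state.get();
    }

    public static void main(String[] args) {
        AsyncStateSketch s = new AsyncStateSketch();
        System.out.println(s.tryDiscard());  // true: the close wins the race
        System.out.println(s.tryComplete()); // false: nothing is reported any more
        System.out.println(s.current());     // DISCARDED
    }
}
```

Because both transitions start from RUNNING, exactly one of them can succeed, so the report and the cleanup never both run for the same snapshot.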

Sending the Snapshot Acknowledgement Message

Call chain of the whole process

  • AsyncCheckpointRunnable.reportCompletedSnapshotStates
  • TaskStateManagerImpl.reportTaskStateSnapshots
  • RpcCheckpointResponder.acknowledgeCheckpoint
  • JobMaster.acknowledgeCheckpoint
  • SchedulerBase.acknowledgeCheckpoint
  • CheckpointCoordinator.receiveAcknowledgeMessage
  • CheckpointCoordinator.completePendingCheckpoint
  • PendingCheckpoint.finalizeCheckpoint

AsyncCheckpointRunnable.reportCompletedSnapshotStates

We start with the reportCompletedSnapshotStates method of AsyncCheckpointRunnable. It reports that the snapshot completed successfully. The code is as follows:

private void reportCompletedSnapshotStates(
    TaskStateSnapshot acknowledgedTaskStateSnapshot,
    TaskStateSnapshot localTaskStateSnapshot,
    long asyncDurationMillis) {

    // Get the task state manager
    TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();

    boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
    boolean hasLocalState = localTaskStateSnapshot.hasState();

    Preconditions.checkState(hasAckState || !hasLocalState,
        "Found cached state but no corresponding primary state is reported to the job " +
            "manager. This indicates a problem.");

    // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
    // to stateless tasks on restore. This enables simple job modifications that only concern
    // stateless without the need to assign them uids to match their (always empty) states.
    // Report the task snapshot states
    taskStateManager.reportTaskStateSnapshots(
        checkpointMetaData,
        checkpointMetrics,
        hasAckState ? acknowledgedTaskStateSnapshot : null,
        hasLocalState ? localTaskStateSnapshot : null);

    LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
        owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);

    LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
        owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
}
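The precondition hasAckState || !hasLocalState rejects exactly one combination: local state present while nothing is reported to the JobManager. The sketch below spells out that truth table and the "report null for stateless tasks" rule. The class and method names are hypothetical and only mirror the two expressions from the method above.

```java
/** Sketch only: the consistency check and the null-for-stateless rule, isolated. */
class ReportStateSketch {

    /** Mirrors the check hasAckState || !hasLocalState. */
    static boolean isConsistent(boolean hasAckState, boolean hasLocalState) {
        return hasAckState || !hasLocalState;
    }

    /** Mirrors hasState ? snapshot : null, which signals a stateless task with null. */
    static <T> T orNullIfEmpty(T snapshot, boolean hasState) {
        return hasState ? snapshot : null;
    }

    public static void main(String[] args) {
        boolean[] values = {true, false};
        for (boolean ack : values) {
            for (boolean local : values) {
                System.out.println("ack=" + ack + " local=" + local
                    + " consistent=" + isConsistent(ack, local));
            }
        }
    }
}
```

Only ack=false with local=true is reported as inconsistent, which matches the error message about cached state without primary state.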

TaskStateManagerImpl.reportTaskStateSnapshots

In the previous snippet, the implementation class of taskStateManager is TaskStateManagerImpl. Next we look at its reportTaskStateSnapshots method.

@Override
public void reportTaskStateSnapshots(
    @Nonnull CheckpointMetaData checkpointMetaData,
    @Nonnull CheckpointMetrics checkpointMetrics,
    @Nullable TaskStateSnapshot acknowledgedState,
    @Nullable TaskStateSnapshot localState) {

    long checkpointId = checkpointMetaData.getCheckpointId();

    // Store the local snapshot state
    localStateStore.storeLocalState(checkpointId, localState);

    // Send the acknowledgement that the snapshot succeeded
    checkpointResponder.acknowledgeCheckpoint(
        jobId,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        acknowledgedState);
}

RpcCheckpointResponder.acknowledgeCheckpoint

The checkpointResponder above is of type RpcCheckpointResponder. Let's look at its acknowledgeCheckpoint method:

@Override
public void acknowledgeCheckpoint(
        JobID jobID,
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        TaskStateSnapshot subtaskState) {

    checkpointCoordinatorGateway.acknowledgeCheckpoint(
        jobID,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        subtaskState);
}

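This method calls the method of the same name on checkpointCoordinatorGateway. Here checkpointCoordinatorGateway refers to the JobMaster.
RpcCheckpointResponder is created when the connection to the JobManager is established, after the JobManager leader has been elected.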

JobMaster.acknowledgeCheckpoint

@Override
public void acknowledgeCheckpoint(
        final JobID jobID,
        final ExecutionAttemptID executionAttemptID,
        final long checkpointId,
        final CheckpointMetrics checkpointMetrics,
        final TaskStateSnapshot checkpointState) {

    schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
}

SchedulerBase.acknowledgeCheckpoint

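Let's look at the source of this method. After several layers of delegation, this is where the interaction with CheckpointCoordinator finally takes place. The code is shown below.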

@Override
public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
    // Ensure this runs in the main thread
    mainThreadExecutor.assertRunningInMainThread();

    // Get the checkpointCoordinator object
    final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    // Create a checkpoint acknowledgement message
    final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
        jobID,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        checkpointState);

    // Get the location of the TaskManager
    final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);

    if (checkpointCoordinator != null) {
        ioExecutor.execute(() -> {
            try {
                // Call the coordinator's method for receiving acknowledgement messages
                checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
            } catch (Throwable t) {
                log.warn("Error while processing checkpoint acknowledgement message", t);
            }
        });
    } else {
        String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
        if (executionGraph.getState() == JobStatus.RUNNING) {
            log.error(errorMessage, jobGraph.getJobID());
        } else {
            log.debug(errorMessage, jobGraph.getJobID());
        }
    }
}
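The method above hands the acknowledgement to ioExecutor and catches every Throwable there, so a failure while processing one acknowledgement is only logged. The following sketch shows that shape under stated assumptions: Coordinator is a hypothetical stand-in for CheckpointCoordinator, the log is a plain list, and an inline executor replaces the real I/O thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/** Sketch only: offload the acknowledgement and log failures instead of propagating them. */
class AckOffloadSketch {

    /** Hypothetical stand-in for CheckpointCoordinator.receiveAcknowledgeMessage. */
    interface Coordinator {
        boolean receive(long checkpointId) throws Exception;
    }

    static void acknowledge(Coordinator coordinator, Executor ioExecutor, long checkpointId, List<String> log) {
        if (coordinator == null) {
            // Corresponds to the branch that logs a missing CheckpointCoordinator
            log.add("no coordinator for checkpoint " + checkpointId);
            return;
        }
        ioExecutor.execute(() -> {
            try {
                coordinator.receive(checkpointId);
            } catch (Throwable t) {
                log.add("error while processing ack: " + t.getMessage());
            }
        });
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Executor direct = Runnable::run; // runs inline; a real setup would use a separate pool
        acknowledge(id -> { throw new IllegalStateException("boom"); }, direct, 1L, log);
        acknowledge(null, direct, 2L, log);
        System.out.println(log);
    }
}
```

A plausible reason for the offloading is that completing a checkpoint can involve blocking I/O, which should not stall the scheduler's main thread. That motivation is our reading and is not stated in the code shown here.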

CheckpointCoordinator.receiveAcknowledgeMessage

Now we return to CheckpointCoordinator and look at its receiveAcknowledgeMessage method.

public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo) throws CheckpointException {
    if (shutdown || message == null) {
        return false;
    }

    // Check that the JobID in the message matches the job that is actually running
    if (!job.equals(message.getJob())) {
        LOG.error("Received wrong AcknowledgeCheckpoint message for job {} from {} : {}", job, taskManagerLocationInfo, message);
        return false;
    }

    // Get the checkpoint ID
    final long checkpointId = message.getCheckpointId();

    synchronized (lock) {
        // we need to check inside the lock for being shutdown as well, otherwise we
        // get races and invalid error log messages
        // Make sure the coordinator has not been shut down
        if (shutdown) {
            return false;
        }

        // Get the checkpoint that is currently in progress
        final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

        if (checkpoint != null && !checkpoint.isDiscarded()) {

            switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
                case SUCCESS:
                    LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
                        checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                    // If the acknowledgement succeeded and every task has now acknowledged
                    // its snapshot, run the logic that completes this PendingCheckpoint
                    if (checkpoint.areTasksFullyAcknowledged()) {
                        completePendingCheckpoint(checkpoint);
                    }
                    break;
                case DUPLICATE:
                    LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}, location {}.",
                        message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                    break;
                case UNKNOWN:
                    LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
                            "because the task's execution attempt id was unknown. Discarding " +
                            "the state handle to avoid lingering state.", message.getCheckpointId(),
                        message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                    discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                    break;
                case DISCARDED:
                    LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
                        "because the pending checkpoint had been discarded. Discarding the " +
                            "state handle tp avoid lingering state.",
                        message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                    discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
            }

            return true;
        }
        else if (checkpoint != null) {
            // this should not happen
            throw new IllegalStateException(
                    "Received message for discarded but non-removed checkpoint " + checkpointId);
        }
        else {
            boolean wasPendingCheckpoint;

            // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
            if (recentPendingCheckpoints.contains(checkpointId)) {
                wasPendingCheckpoint = true;
                LOG.warn("Received late message for now expired checkpoint attempt {} from task " +
                    "{} of job {} at {}.", checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
            }
            else {
                LOG.debug("Received message for an unknown checkpoint {} from task {} of job {} at {}.",
                    checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                wasPendingCheckpoint = false;
            }

            // try to discard the state so that we don't have lingering state lying around
            discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

            return wasPendingCheckpoint;
        }
    }
}
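The last branch distinguishes a late message for an expired checkpoint from a message for a checkpoint the coordinator has never seen, using recentPendingCheckpoints. One way such a bounded memory of recent ids can be kept is sketched below. The class name, the method names and the capacity are assumptions for illustration only.

```java
import java.util.ArrayDeque;

/** Sketch only: a bounded FIFO of recently seen checkpoint ids. */
class RecentCheckpointIdsSketch {
    private final int capacity;
    private final ArrayDeque<Long> recent;

    RecentCheckpointIdsSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.recent = new ArrayDeque<>(capacity);
    }

    /** Remembers an id, evicting the oldest one when the buffer is full. */
    void remember(long checkpointId) {
        if (recent.size() >= capacity) {
            recent.removeFirst();
        }
        recent.addLast(checkpointId);
    }

    /** True for a late message (id was pending recently), false for an unknown id. */
    boolean wasPending(long checkpointId) {
        return recent.contains(checkpointId);
    }

    public static void main(String[] args) {
        RecentCheckpointIdsSketch ids = new RecentCheckpointIdsSketch(2);
        ids.remember(1L);
        ids.remember(2L);
        ids.remember(3L); // evicts 1
        System.out.println(ids.wasPending(1L)); // false
        System.out.println(ids.wasPending(3L)); // true
    }
}
```

In both cases the reported state is discarded. The distinction only affects the log level and the boolean returned to the caller.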

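Before analyzing the method that completes a checkpoint (completePendingCheckpoint), we first look at how a PendingCheckpoint handles an acknowledgement.
The acknowledgeTask method of PendingCheckpoint is shown below: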

public TaskAcknowledgeResult acknowledgeTask(
        ExecutionAttemptID executionAttemptId,
        TaskStateSnapshot operatorSubtaskStates,
        CheckpointMetrics metrics) {

    synchronized (lock) {
        if (discarded) {
            // If the checkpoint has been discarded, return DISCARDED
            return TaskAcknowledgeResult.DISCARDED;
        }

        // Remove the acknowledging task from notYetAcknowledgedTasks,
        // which holds every task that has not acknowledged yet
        final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);

        if (vertex == null) {
            // The task is not in notYetAcknowledgedTasks.
            // If it is in acknowledgedTasks (the already acknowledged tasks),
            // return DUPLICATE
            if (acknowledgedTasks.contains(executionAttemptId)) {
                return TaskAcknowledgeResult.DUPLICATE;
            } else {
                // In all other cases return UNKNOWN
                return TaskAcknowledgeResult.UNKNOWN;
            }
        } else {
            // Add it to the set of acknowledged tasks
            acknowledgedTasks.add(executionAttemptId);
        }

        List<OperatorID> operatorIDs = vertex.getJobVertex().getOperatorIDs();
        int subtaskIndex = vertex.getParallelSubtaskIndex();
        long ackTimestamp = System.currentTimeMillis();

        long stateSize = 0L;

        // Store the snapshot state of each operator
        if (operatorSubtaskStates != null) {
            for (OperatorID operatorID : operatorIDs) {

                OperatorSubtaskState operatorSubtaskState =
                    operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);

                // if no real operatorSubtaskState was reported, we insert an empty state
                if (operatorSubtaskState == null) {
                    operatorSubtaskState = new OperatorSubtaskState();
                }

                OperatorState operatorState = operatorStates.get(operatorID);

                if (operatorState == null) {
                    operatorState = new OperatorState(
                        operatorID,
                        vertex.getTotalNumberOfParallelSubtasks(),
                        vertex.getMaxParallelism());
                    operatorStates.put(operatorID, operatorState);
                }

                operatorState.putState(subtaskIndex, operatorSubtaskState);
                stateSize += operatorSubtaskState.getStateSize();
            }
        }

        ++numAcknowledgedTasks;

        // publish the checkpoint statistics
        // to prevent null-pointers from concurrent modification, copy reference onto stack
        // Report this subtask's checkpoint statistics
        final PendingCheckpointStats statsCallback = this.statsCallback;
        if (statsCallback != null) {
            // Do this in millis because the web frontend works with them
            long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;

            SubtaskStateStats subtaskStateStats = new SubtaskStateStats(
                subtaskIndex,
                ackTimestamp,
                stateSize,
                metrics.getSyncDurationMillis(),
                metrics.getAsyncDurationMillis(),
                metrics.getBytesBufferedInAlignment(),
                alignmentDurationMillis);

            statsCallback.reportSubtaskStats(vertex.getJobvertexId(), subtaskStateStats);
        }

        // Finally, return SUCCESS
        return TaskAcknowledgeResult.SUCCESS;
    }
}

Next, when is completePendingCheckpoint called to complete the checkpoint? It is called when checkpoint.areTasksFullyAcknowledged() returns true. That method is:

public boolean areTasksFullyAcknowledged() {
    return notYetAcknowledgedTasks.isEmpty() && !discarded;
}

As the code shows, completePendingCheckpoint is called only when the pending checkpoint has not been discarded and notYetAcknowledgedTasks is empty (every task has acknowledged).
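The bookkeeping behind acknowledgeTask and areTasksFullyAcknowledged can be condensed into a small model. In the sketch below, execution attempts are plain strings and all state handling and statistics are left out. The class and method names are hypothetical and only the four result values come from the code above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Sketch only: the acknowledge bookkeeping of a pending checkpoint, without any state. */
class PendingAckSketch {
    enum Result { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    private final Map<String, Integer> notYetAcknowledged = new HashMap<>();
    private final Set<String> acknowledged = new HashSet<>();
    private boolean discarded;

    PendingAckSketch(String... attemptIds) {
        for (int i = 0; i < attemptIds.length; i++) {
            notYetAcknowledged.put(attemptIds[i], i); // value: subtask index, never null
        }
    }

    synchronized Result acknowledge(String attemptId) {
        if (discarded) {
            return Result.DISCARDED;
        }
        if (notYetAcknowledged.remove(attemptId) == null) {
            // Not waiting for this task: either it already answered or it was never expected
            return acknowledged.contains(attemptId) ? Result.DUPLICATE : Result.UNKNOWN;
        }
        acknowledged.add(attemptId);
        return Result.SUCCESS;
    }

    synchronized boolean fullyAcknowledged() {
        return notYetAcknowledged.isEmpty() && !discarded;
    }

    synchronized void discard() {
        discarded = true;
    }

    public static void main(String[] args) {
        PendingAckSketch pending = new PendingAckSketch("a", "b");
        System.out.println(pending.acknowledge("a")); // SUCCESS
        System.out.println(pending.acknowledge("a")); // DUPLICATE
        System.out.println(pending.acknowledge("x")); // UNKNOWN
        System.out.println(pending.fullyAcknowledged()); // false
        System.out.println(pending.acknowledge("b")); // SUCCESS
        System.out.println(pending.fullyAcknowledged()); // true
    }
}
```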

CheckpointCoordinator.completePendingCheckpoint

private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;

    // As a first step to complete the checkpoint, we register its state with the registry
    // Register the state of all operators with sharedStateRegistry
    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    sharedStateRegistry.registerAll(operatorStates.values());

    try {
        try {
            // Run the logic that finalizes the pendingCheckpoint; analyzed in detail below
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
            // Reset the count of failed checkpoints
            failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
        }
        catch (Exception e1) {
            // abort the current pending checkpoint if we fails to finalize the pending checkpoint.
            if (!pendingCheckpoint.isDiscarded()) {
                failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }

            throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
        }

        // the pending checkpoint must be discarded after the finalization
        // Check the state: after finalizeCheckpoint, pendingCheckpoint must be discarded
        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

        try {
            // Store the completed checkpoint
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // we failed to store the completed checkpoint. Let's clean up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });

            throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
        }
    } finally {
        // Remove this checkpoint from the set of pending checkpoints
        pendingCheckpoints.remove(checkpointId);

        triggerQueuedRequests();
    }

    // Remember the most recent checkpoint ID
    rememberRecentCheckpointId(checkpointId);

    // drop those pending checkpoints that are at prior to the completed one
    // Drop all pending checkpoints with an id smaller than checkpointId (forced checkpoints are not dropped)
    dropSubsumedCheckpoints(checkpointId);

    // record the time when this was completed, to calculate
    // the 'min delay between checkpoints'
    // Record the time at which this checkpoint completed
    lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();

    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
        completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

    if (LOG.isDebugEnabled()) {
        StringBuilder builder = new StringBuilder();
        builder.append("Checkpoint state: ");
        for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
            builder.append(state);
            builder.append(", ");
        }
        // Remove last two chars ", "
        builder.setLength(builder.length() - 2);

        LOG.debug(builder.toString());
    }

    // send the "notify complete" call to all vertices
    final long timestamp = completedCheckpoint.getTimestamp();

    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
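            // Send the checkpoint-complete message to every node; this call is important.
            // The notification is passed down level by level to every Task and StreamTask,
            // then to every operator and user function. Finally, for each user function that
            // implements the CheckpointListener interface, notifyCheckpointComplete is called.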
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
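The dropSubsumedCheckpoints step is not shown in this post. Going only by the comment above (older pending checkpoints are dropped unless they are forced), its effect can be sketched as follows. The map layout, with the checkpoint id as key and a "forced" flag as value, is a simplification chosen for this example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: drop older, non-forced pending checkpoints once a newer one has completed. */
class SubsumeSketch {

    /** Removes subsumed entries from the map and returns the ids that were dropped. */
    static List<Long> dropSubsumed(Map<Long, Boolean> pendingForcedFlags, long completedId) {
        List<Long> dropped = new ArrayList<>();
        Iterator<Map.Entry<Long, Boolean>> it = pendingForcedFlags.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Boolean> entry = it.next();
            boolean forced = entry.getValue();
            if (entry.getKey() < completedId && !forced) {
                dropped.add(entry.getKey());
                it.remove();
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        Map<Long, Boolean> pending = new LinkedHashMap<>();
        pending.put(1L, false);
        pending.put(2L, true);  // forced, so it survives
        pending.put(3L, false);
        pending.put(5L, false); // newer than the completed checkpoint
        System.out.println(dropSubsumed(pending, 4L)); // [1, 3]
        System.out.println(pending.keySet());          // [2, 5]
    }
}
```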

Note: the call chain that notifies each operator of the completed checkpoint is as follows:

  • Execution.notifyCheckpointComplete
  • RpcTaskManagerGateway.notifyCheckpointComplete
  • TaskExecutor.confirmCheckpoint
  • Task.notifyCheckpointComplete
  • StreamTask.notifyCheckpointCompleteAsync
  • StreamTask.notifyCheckpointComplete
  • AbstractUdfStreamOperator.notifyCheckpointComplete
  • CheckpointListener.notifyCheckpointComplete
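The last two steps of this chain amount to an instanceof check: the operator forwards the notification only if the user function implements CheckpointListener. The sketch below shows that dispatch with a locally declared interface standing in for Flink's CheckpointListener. CommittingSink and notifyOperator are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: how a completion notification reaches a user function. */
class NotifySketch {

    /** Local stand-in for Flink's CheckpointListener interface. */
    interface CheckpointListener {
        void notifyCheckpointComplete(long checkpointId) throws Exception;
    }

    /** A user function that records which checkpoints it was told about. */
    static class CommittingSink implements CheckpointListener {
        final List<Long> committed = new ArrayList<>();

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            committed.add(checkpointId);
        }
    }

    /** Forwards the notification only to user functions that asked for it. */
    static void notifyOperator(Object userFunction, long checkpointId) throws Exception {
        if (userFunction instanceof CheckpointListener) {
            ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
        }
    }

    public static void main(String[] args) throws Exception {
        CommittingSink sink = new CommittingSink();
        notifyOperator(sink, 7L);
        notifyOperator("a function without the interface", 7L); // silently skipped
        System.out.println(sink.committed); // [7]
    }
}
```

A callback of this kind is a natural place for a sink to publish results that should only become visible once the checkpoint is known to be complete.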

PendingCheckpoint.finalizeCheckpoint

Finally, we look at how a PendingCheckpoint is finalized and turned into a CompletedCheckpoint object. The code is shown below:

public CompletedCheckpoint finalizeCheckpoint() throws IOException {

    synchronized (lock) {
        // Ensure every master state has acknowledged
        checkState(areMasterStatesFullyAcknowledged(),
            "Pending checkpoint has not been fully acknowledged by master states yet.");
        // Ensure every task has acknowledged
        checkState(areTasksFullyAcknowledged(),
            "Pending checkpoint has not been fully acknowledged by tasks yet.");

        // make sure we fulfill the promise with an exception if something fails
        try {
            // write out the metadata
            // Create a savepoint object
            final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterStates);
            final CompletedCheckpointStorageLocation finalizedLocation;

            // Write the checkpoint metadata to the checkpoint storage location
            try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
                Checkpoints.storeCheckpointMetadata(savepoint, out);
                finalizedLocation = out.closeAndFinalizeCheckpoint();
            }

            CompletedCheckpoint completed = new CompletedCheckpoint(
                    jobId,
                    checkpointId,
                    checkpointTimestamp,
                    System.currentTimeMillis(),
                    operatorStates,
                    masterStates,
                    props,
                    finalizedLocation);

            // Complete the CompletableFuture with the completed checkpoint
            onCompletionPromise.complete(completed);

            // to prevent null-pointers from concurrent modification, copy reference onto stack
            // Set the discardCallback of the completed checkpoint
            PendingCheckpointStats statsCallback = this.statsCallback;
            if (statsCallback != null) {
                // Finalize the statsCallback and give the completed checkpoint a
                // callback for discards.
                CompletedCheckpointStats.DiscardCallback discardCallback =
                        statsCallback.reportCompletedCheckpoint(finalizedLocation.getExternalPointer());
                completed.setDiscardCallback(discardCallback);
            }

            // mark this pending checkpoint as disposed, but do NOT drop the state
            // Mark this pending checkpoint as disposed
            dispose(false);

            return completed;
        }
        catch (Throwable t) {
            onCompletionPromise.completeExceptionally(t);
            ExceptionUtils.rethrowIOException(t);
            return null; // silence the compiler
        }
    }
}
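finalizeCheckpoint follows a "fulfil the promise either way" pattern: the future is completed with the result on success and completed exceptionally on any failure, before the exception is rethrown. A stripped-down sketch of that pattern follows, with strings in place of checkpoint objects and a plain OutputStream in place of the metadata stream. All names here are assumptions for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

/** Sketch only: write metadata, then complete the promise normally or exceptionally. */
class FinalizeSketch {
    private final CompletableFuture<String> onCompletion = new CompletableFuture<>();

    CompletableFuture<String> getCompletionFuture() {
        return onCompletion;
    }

    String finalizeCheckpoint(long checkpointId, OutputStream target) throws IOException {
        try {
            // try-with-resources closes the stream even if the write fails
            try (OutputStream out = target) {
                out.write(("checkpoint-" + checkpointId).getBytes(StandardCharsets.UTF_8));
            }
            String completed = "completed-" + checkpointId;
            onCompletion.complete(completed);
            return completed;
        } catch (Throwable t) {
            // Anyone waiting on the future learns about the failure as well
            onCompletion.completeExceptionally(t);
            if (t instanceof IOException) {
                throw (IOException) t;
            }
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            }
            if (t instanceof Error) {
                throw (Error) t;
            }
            throw new IOException(t);
        }
    }

    public static void main(String[] args) throws IOException {
        FinalizeSketch sketch = new FinalizeSketch();
        sketch.finalizeCheckpoint(7L, new ByteArrayOutputStream());
        System.out.println(sketch.getCompletionFuture().join()); // completed-7
    }
}
```

Completing the future inside the catch block means that a caller waiting on the future and a caller handling the thrown exception observe the same failure.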

This post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.

    Article title: Flink 源码之快照

    Article link: https://www.haomeiwen.com/subject/ihoxnctx.html