A Look at Flink's CheckpointScheduler


Author: go4it | Published: 2018-12-07 15:24

    This article takes a look at Flink's CheckpointScheduler.

    CheckpointCoordinatorDeActivator

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java

    /**
     * This actor listens to changes in the JobStatus and activates or deactivates the periodic
     * checkpoint scheduler.
     */
    public class CheckpointCoordinatorDeActivator implements JobStatusListener {
    
        private final CheckpointCoordinator coordinator;
    
        public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
            this.coordinator = checkNotNull(coordinator);
        }
    
        @Override
        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
            if (newJobStatus == JobStatus.RUNNING) {
                // start the checkpoint scheduler
                coordinator.startCheckpointScheduler();
            } else {
                // anything else should stop the trigger for now
                coordinator.stopCheckpointScheduler();
            }
        }
    }
    
    • CheckpointCoordinatorDeActivator implements the JobStatusListener interface. In jobStatusChanges it calls coordinator.startCheckpointScheduler when the new status is JobStatus.RUNNING, and coordinator.stopCheckpointScheduler for any other status.
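
    The activation rule above can be sketched without any Flink dependency. This is only an illustration: SketchJobStatus, SketchCoordinator and SketchDeActivator are hypothetical stand-ins rather than Flink classes, and the coordinator merely records which method was called.

```java
import java.util.ArrayList;
import java.util.List;

class DeActivatorSketch {

    // Hypothetical stand-in for Flink's JobStatus (only a few states are listed).
    enum SketchJobStatus { CREATED, RUNNING, FAILING, CANCELED, FINISHED }

    // Hypothetical coordinator that only records which method was called.
    static class SketchCoordinator {
        final List<String> calls = new ArrayList<>();

        void startCheckpointScheduler() { calls.add("start"); }

        void stopCheckpointScheduler() { calls.add("stop"); }
    }

    // Same decision as in the listener above: RUNNING starts the scheduler,
    // every other status stops it.
    static class SketchDeActivator {
        private final SketchCoordinator coordinator;

        SketchDeActivator(SketchCoordinator coordinator) {
            this.coordinator = coordinator;
        }

        void jobStatusChanges(SketchJobStatus newJobStatus) {
            if (newJobStatus == SketchJobStatus.RUNNING) {
                coordinator.startCheckpointScheduler();
            } else {
                coordinator.stopCheckpointScheduler();
            }
        }
    }
}
```

    Note that a status change to anything other than RUNNING stops the scheduler, so a job that is restarting stops checkpointing until it reaches RUNNING again.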

    CheckpointCoordinator.ScheduledTrigger

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

    /**
     * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
     * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
     * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
     * reported by the tasks that acknowledge the checkpoint.
     */
    public class CheckpointCoordinator {
    
        /** Map from checkpoint ID to the pending checkpoint */
        private final Map<Long, PendingCheckpoint> pendingCheckpoints;
    
        /** The number of consecutive failed trigger attempts */
        private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
    
        //......
    
        public void startCheckpointScheduler() {
            synchronized (lock) {
                if (shutdown) {
                    throw new IllegalArgumentException("Checkpoint coordinator is shut down");
                }
    
                // make sure all prior timers are cancelled
                stopCheckpointScheduler();
    
                periodicScheduling = true;
                long initialDelay = ThreadLocalRandom.current().nextLong(
                    minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
                currentPeriodicTrigger = timer.scheduleAtFixedRate(
                        new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
            }
        }
    
        public void stopCheckpointScheduler() {
            synchronized (lock) {
                triggerRequestQueued = false;
                periodicScheduling = false;
    
                if (currentPeriodicTrigger != null) {
                    currentPeriodicTrigger.cancel(false);
                    currentPeriodicTrigger = null;
                }
    
                for (PendingCheckpoint p : pendingCheckpoints.values()) {
                    p.abortError(new Exception("Checkpoint Coordinator is suspending."));
                }
    
                pendingCheckpoints.clear();
                numUnsuccessfulCheckpointsTriggers.set(0);
            }
        }
    
        private final class ScheduledTrigger implements Runnable {
    
            @Override
            public void run() {
                try {
                    triggerCheckpoint(System.currentTimeMillis(), true);
                }
                catch (Exception e) {
                    LOG.error("Exception while triggering checkpoint for job {}.", job, e);
                }
            }
        }
    
        //......
    }
    
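    • CheckpointCoordinator.startCheckpointScheduler first calls stopCheckpointScheduler to cancel any existing timer and abort the pending checkpoints, then uses timer.scheduleAtFixedRate to schedule a new ScheduledTrigger every baseInterval milliseconds, with a random initial delay between the minimum pause (converted to milliseconds) and baseInterval.
    • stopCheckpointScheduler cancels currentPeriodicTrigger, calls PendingCheckpoint.abortError on every entry in pendingCheckpoints, then clears pendingCheckpoints (Map<Long, PendingCheckpoint>) and resets numUnsuccessfulCheckpointsTriggers (AtomicInteger) to 0.
    • ScheduledTrigger implements Runnable. Its run method calls triggerCheckpoint with isPeriodic set to true, and logs any exception instead of rethrowing it.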
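
    The start/stop pattern can be tried out with a plain ScheduledExecutorService. The following is a minimal sketch, not Flink code: PeriodicTriggerSketch and its members are hypothetical names, the trigger only increments a counter, and the clamping in initialDelayMillis is a guard added for the sketch so that the random range is never empty.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicTriggerSketch {
    private final Object lock = new Object();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final long minPauseNanos;
    private final long baseIntervalMillis;
    private ScheduledFuture<?> currentPeriodicTrigger;

    // Counts how often the (hypothetical) trigger has fired.
    final AtomicInteger triggerCount = new AtomicInteger();

    PeriodicTriggerSketch(long minPauseNanos, long baseIntervalMillis) {
        this.minPauseNanos = minPauseNanos;
        this.baseIntervalMillis = baseIntervalMillis;
    }

    // Random delay in [minPause in ms, baseInterval], as in startCheckpointScheduler.
    // The Math.max guard is an assumption of this sketch.
    static long initialDelayMillis(long minPauseNanos, long baseIntervalMillis) {
        long minPauseMillis = minPauseNanos / 1_000_000L;
        long upper = Math.max(minPauseMillis, baseIntervalMillis);
        return ThreadLocalRandom.current().nextLong(minPauseMillis, upper + 1L);
    }

    void start() {
        synchronized (lock) {
            stop(); // make sure all prior timers are cancelled
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    triggerCount::incrementAndGet,
                    initialDelayMillis(minPauseNanos, baseIntervalMillis),
                    baseIntervalMillis,
                    TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        synchronized (lock) {
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false); // do not interrupt a running trigger
                currentPeriodicTrigger = null;
            }
        }
    }

    boolean isScheduled() {
        synchronized (lock) {
            return currentPeriodicTrigger != null;
        }
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

    The random initial delay presumably spreads out the first checkpoint of jobs that start at the same moment; that reading is an interpretation, not something the source states.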

    CheckpointCoordinator.triggerCheckpoint

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

    /**
     * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
     * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
     * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
     * reported by the tasks that acknowledge the checkpoint.
     */
    public class CheckpointCoordinator {
    
        /** Tasks who need to be sent a message when a checkpoint is started */
        private final ExecutionVertex[] tasksToTrigger;
    
        /** Tasks who need to acknowledge a checkpoint before it succeeds */
        private final ExecutionVertex[] tasksToWaitFor;
    
        /** Map from checkpoint ID to the pending checkpoint */
        private final Map<Long, PendingCheckpoint> pendingCheckpoints;
    
        /** The maximum number of checkpoints that may be in progress at the same time */
        private final int maxConcurrentCheckpointAttempts;
    
        /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
         * enforce minimum processing time between checkpoint attempts */
        private final long minPauseBetweenCheckpointsNanos;
    
        /**
         * Triggers a new standard checkpoint and uses the given timestamp as the checkpoint
         * timestamp.
         *
         * @param timestamp The timestamp for the checkpoint.
         * @param isPeriodic Flag indicating whether this triggered checkpoint is
         * periodic. If this flag is true, but the periodic scheduler is disabled,
         * the checkpoint will be declined.
         * @return <code>true</code> if triggering the checkpoint succeeded.
         */
        public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
            return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic).isSuccess();
        }
    
        @VisibleForTesting
        public CheckpointTriggerResult triggerCheckpoint(
                long timestamp,
                CheckpointProperties props,
                @Nullable String externalSavepointLocation,
                boolean isPeriodic) {
    
            // make some eager pre-checks
            synchronized (lock) {
                // abort if the coordinator has been shutdown in the meantime
                if (shutdown) {
                    return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                }
    
                // Don't allow periodic checkpoint if scheduling has been disabled
                if (isPeriodic && !periodicScheduling) {
                    return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
                }
    
                // validate whether the checkpoint can be triggered, with respect to the limit of
                // concurrent checkpoints, and the minimum time between checkpoints.
                // these checks are not relevant for savepoints
                if (!props.forceCheckpoint()) {
                    // sanity check: there should never be more than one trigger request queued
                    if (triggerRequestQueued) {
                        LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
                        return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                    }
    
                    // if too many checkpoints are currently in progress, we need to mark that a request is queued
                    if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                        triggerRequestQueued = true;
                        if (currentPeriodicTrigger != null) {
                            currentPeriodicTrigger.cancel(false);
                            currentPeriodicTrigger = null;
                        }
                        return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                    }
    
                    // make sure the minimum interval between checkpoints has passed
                    final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                    final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
    
                    if (durationTillNextMillis > 0) {
                        if (currentPeriodicTrigger != null) {
                            currentPeriodicTrigger.cancel(false);
                            currentPeriodicTrigger = null;
                        }
                        // Reassign the new trigger to the currentPeriodicTrigger
                        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                                new ScheduledTrigger(),
                                durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
    
                        return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                    }
                }
            }
    
            // check if all tasks that we need to trigger are running.
            // if not, abort the checkpoint
            Execution[] executions = new Execution[tasksToTrigger.length];
            for (int i = 0; i < tasksToTrigger.length; i++) {
                Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
                if (ee == null) {
                    LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                            tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                            job);
                    return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                } else if (ee.getState() == ExecutionState.RUNNING) {
                    executions[i] = ee;
                } else {
                    LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                            tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                            job,
                            ExecutionState.RUNNING,
                            ee.getState());
                    return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                }
            }
    
            // next, check if all tasks that need to acknowledge the checkpoint are running.
            // if not, abort the checkpoint
            Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
    
            for (ExecutionVertex ev : tasksToWaitFor) {
                Execution ee = ev.getCurrentExecutionAttempt();
                if (ee != null) {
                    ackTasks.put(ee.getAttemptId(), ev);
                } else {
                    LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                            ev.getTaskNameWithSubtaskIndex(),
                            job);
                    return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                }
            }
    
            // we will actually trigger this checkpoint!
    
            // we lock with a special lock to make sure that trigger requests do not overtake each other.
            // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
            // may issue blocking operations. Using a different lock than the coordinator-wide lock,
            // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
            synchronized (triggerLock) {
    
                final CheckpointStorageLocation checkpointStorageLocation;
                final long checkpointID;
    
                try {
                    // this must happen outside the coordinator-wide lock, because it communicates
                    // with external services (in HA mode) and may block for a while.
                    checkpointID = checkpointIdCounter.getAndIncrement();
    
                    checkpointStorageLocation = props.isSavepoint() ?
                            checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
                            checkpointStorage.initializeLocationForCheckpoint(checkpointID);
                }
                catch (Throwable t) {
                    int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                    LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
                            job,
                            numUnsuccessful,
                            t);
                    return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                }
    
                final PendingCheckpoint checkpoint = new PendingCheckpoint(
                    job,
                    checkpointID,
                    timestamp,
                    ackTasks,
                    props,
                    checkpointStorageLocation,
                    executor);
    
                if (statsTracker != null) {
                    PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
                        checkpointID,
                        timestamp,
                        props);
    
                    checkpoint.setStatsCallback(callback);
                }
    
                // schedule the timer that will clean up the expired checkpoints
                final Runnable canceller = () -> {
                    synchronized (lock) {
                        // only do the work if the checkpoint is not discarded anyways
                        // note that checkpoint completion discards the pending checkpoint object
                        if (!checkpoint.isDiscarded()) {
                            LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
    
                            checkpoint.abortExpired();
                            pendingCheckpoints.remove(checkpointID);
                            rememberRecentCheckpointId(checkpointID);
    
                            triggerQueuedRequests();
                        }
                    }
                };
    
                try {
                    // re-acquire the coordinator-wide lock
                    synchronized (lock) {
                        // since we released the lock in the meantime, we need to re-check
                        // that the conditions still hold.
                        if (shutdown) {
                            return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                        }
                        else if (!props.forceCheckpoint()) {
                            if (triggerRequestQueued) {
                                LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
                                return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                            }
    
                            if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                                triggerRequestQueued = true;
                                if (currentPeriodicTrigger != null) {
                                    currentPeriodicTrigger.cancel(false);
                                    currentPeriodicTrigger = null;
                                }
                                return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                            }
    
                            // make sure the minimum interval between checkpoints has passed
                            final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                            final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
    
                            if (durationTillNextMillis > 0) {
                                if (currentPeriodicTrigger != null) {
                                    currentPeriodicTrigger.cancel(false);
                                    currentPeriodicTrigger = null;
                                }
    
                                // Reassign the new trigger to the currentPeriodicTrigger
                                currentPeriodicTrigger = timer.scheduleAtFixedRate(
                                        new ScheduledTrigger(),
                                        durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
    
                                return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                            }
                        }
    
                        LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
    
                        pendingCheckpoints.put(checkpointID, checkpoint);
    
                        ScheduledFuture<?> cancellerHandle = timer.schedule(
                                canceller,
                                checkpointTimeout, TimeUnit.MILLISECONDS);
    
                        if (!checkpoint.setCancellerHandle(cancellerHandle)) {
                            // checkpoint is already disposed!
                            cancellerHandle.cancel(false);
                        }
    
                        // trigger the master hooks for the checkpoint
                        final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
                                checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
                        for (MasterState s : masterStates) {
                            checkpoint.addMasterState(s);
                        }
                    }
                    // end of lock scope
    
                    final CheckpointOptions checkpointOptions = new CheckpointOptions(
                            props.getCheckpointType(),
                            checkpointStorageLocation.getLocationReference());
    
                    // send the messages to the tasks that trigger their checkpoint
                    for (Execution execution: executions) {
                        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                    }
    
                    numUnsuccessfulCheckpointsTriggers.set(0);
                    return new CheckpointTriggerResult(checkpoint);
                }
                catch (Throwable t) {
                    // guard the map against concurrent modifications
                    synchronized (lock) {
                        pendingCheckpoints.remove(checkpointID);
                    }
    
                    int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                    LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
                            checkpointID, job, numUnsuccessful, t);
    
                    if (!checkpoint.isDiscarded()) {
                        checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
                    }
    
                    try {
                        checkpointStorageLocation.disposeOnFailure();
                    }
                    catch (Throwable t2) {
                        LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
                    }
    
                    return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                }
    
            } // end trigger lock
        }
    
        //......
    }
    
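    • First come the eager pre-checks under the coordinator lock: the call is declined with COORDINATOR_SHUTDOWN if the coordinator has been shut down, and with PERIODIC_SCHEDULER_SHUTDOWN if isPeriodic is true but periodic scheduling is disabled. If props.forceCheckpoint() is false, three more checks follow. If a trigger request is already queued, it returns ALREADY_QUEUED. If pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts, it sets triggerRequestQueued to true, cancels the periodic trigger, and returns CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS). If less than minPauseBetweenCheckpointsNanos has elapsed since lastCheckpointCompletionNanos, it reschedules the ScheduledTrigger to fire after the remaining time and returns CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS), so that checkpoints are not triggered too frequently.
    • Next it checks that every task in tasksToTrigger (the tasks that must be notified when a checkpoint starts) has a current execution attempt in state RUNNING. If not, it fails fast and returns CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING).
    • Then it checks the tasks in tasksToWaitFor (the tasks that must acknowledge the checkpoint before it succeeds). Here the code only requires each task to have a non-null current execution attempt; it does not check for the RUNNING state. If any attempt is missing, it fails fast and returns CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING).
    • Only after these checks pass does the actual triggering start, under triggerLock. It first allocates a checkpointID and initializes the checkpointStorageLocation; on an exception it returns CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION). It then creates the PendingCheckpoint and prepares the canceller (which aborts the checkpoint once checkpointTimeout expires). After re-acquiring the coordinator lock it re-checks shutdown and, unless forceCheckpoint is set, repeats the ALREADY_QUEUED, TOO_MANY_CONCURRENT_CHECKPOINTS and MINIMUM_TIME_BETWEEN_CHECKPOINTS checks. It then registers the PendingCheckpoint, schedules the canceller, and triggers the master hooks.
    • Finally it calls triggerCheckpoint on each Execution in turn. On success it resets numUnsuccessfulCheckpointsTriggers and returns CheckpointTriggerResult(checkpoint); on an exception it removes and aborts the pending checkpoint, disposes the storage location, and returns CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION).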
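
    The non-forced pre-checks boil down to a small decision function plus one piece of arithmetic. Below is a minimal sketch under simplifying assumptions: TriggerPreCheckSketch, Decline and the method names are hypothetical, the current time is passed in as a parameter instead of calling System.nanoTime(), and the side effects (queuing the request, cancelling and rescheduling the timer) are left out.

```java
class TriggerPreCheckSketch {

    // Hypothetical subset of the decline reasons, plus NONE for "may proceed".
    enum Decline {
        NONE,
        ALREADY_QUEUED,
        TOO_MANY_CONCURRENT_CHECKPOINTS,
        MINIMUM_TIME_BETWEEN_CHECKPOINTS
    }

    // Milliseconds until the next checkpoint is allowed; a value <= 0 means "now".
    // Same arithmetic as durationTillNextMillis in the listing above.
    static long millisUntilNextAllowed(long lastCompletionNanos, long minPauseNanos, long nowNanos) {
        long earliestNext = lastCompletionNanos + minPauseNanos;
        return (earliestNext - nowNanos) / 1_000_000L;
    }

    // The checks are evaluated in the same order as in triggerCheckpoint.
    static Decline preCheck(
            boolean triggerRequestQueued,
            int pendingCheckpoints,
            int maxConcurrentCheckpointAttempts,
            long lastCompletionNanos,
            long minPauseNanos,
            long nowNanos) {

        if (triggerRequestQueued) {
            return Decline.ALREADY_QUEUED;
        }
        if (pendingCheckpoints >= maxConcurrentCheckpointAttempts) {
            return Decline.TOO_MANY_CONCURRENT_CHECKPOINTS;
        }
        if (millisUntilNextAllowed(lastCompletionNanos, minPauseNanos, nowNanos) > 0) {
            return Decline.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
        }
        return Decline.NONE;
    }
}
```

    For example, with the last completion at 1.0 s, a minimum pause of 0.5 s and the current time at 1.2 s (all in nanoseconds), the remaining wait is (1.5e9 - 1.2e9) / 1e6 = 300 ms, so the request is declined. Because of the integer division, a remaining wait of less than one millisecond counts as zero.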

    Execution.triggerCheckpoint

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java

    public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {
    
        /**
         * Trigger a new checkpoint on the task of this execution.
         *
         * @param checkpointId of the checkpoint to trigger
         * @param timestamp of the checkpoint to trigger
         * @param checkpointOptions of the checkpoint to trigger
         */
        public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
            final LogicalSlot slot = assignedResource;
    
            if (slot != null) {
                final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
    
                taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
            } else {
                LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                    "no longer running.");
            }
        }
    
        //......
    }
    
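    • Execution.triggerCheckpoint mainly delegates to taskManagerGateway.triggerCheckpoint; the taskManagerGateway here is an RpcTaskManagerGateway. If the execution has no slot assigned, it only logs a debug message.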

    RpcTaskManagerGateway

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java

    /**
     * Implementation of the {@link TaskManagerGateway} for Flink's RPC system.
     */
    public class RpcTaskManagerGateway implements TaskManagerGateway {
    
        private final TaskExecutorGateway taskExecutorGateway;
    
        public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
            taskExecutorGateway.triggerCheckpoint(
                executionAttemptID,
                checkpointId,
                timestamp,
                checkpointOptions);
        }
    
        //......
    }
    
    • RpcTaskManagerGateway.triggerCheckpoint calls taskExecutorGateway.triggerCheckpoint without passing on the jobId. The taskExecutorGateway here is a proxy backed by AkkaInvocationHandler, which notifies the TaskExecutor via RPC.
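
    The idea of a gateway interface whose calls are turned into messages by an invocation handler can be illustrated with the JDK's java.lang.reflect.Proxy. This is a generic sketch, not Flink's RPC code: RpcProxySketch and CheckpointGateway are hypothetical names, and the "remote call" just appends a string to a list.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;

class RpcProxySketch {

    // Hypothetical, much reduced gateway interface.
    interface CheckpointGateway {
        void triggerCheckpoint(long checkpointId, long timestamp);
    }

    // Returns a proxy whose method calls are recorded in the given outbox.
    // A real RPC handler would serialize the call and send it to the remote endpoint.
    static CheckpointGateway connect(List<String> outbox) {
        InvocationHandler handler = (proxy, method, args) -> {
            outbox.add(method.getName() + Arrays.toString(args));
            return null; // the sketched method returns void
        };
        return (CheckpointGateway) Proxy.newProxyInstance(
                CheckpointGateway.class.getClassLoader(),
                new Class<?>[] {CheckpointGateway.class},
                handler);
    }
}
```

    The caller only sees the CheckpointGateway interface, which is why the calling code in RpcTaskManagerGateway looks like an ordinary local method call.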

    TaskExecutor.triggerCheckpoint

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java

    /**
     * TaskExecutor implementation. The task executor is responsible for the execution of multiple
     * {@link Task}.
     */
    public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
    
        public CompletableFuture<Acknowledge> triggerCheckpoint(
                ExecutionAttemptID executionAttemptID,
                long checkpointId,
                long checkpointTimestamp,
                CheckpointOptions checkpointOptions) {
            log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
    
            final Task task = taskSlotTable.getTask(executionAttemptID);
    
            if (task != null) {
                task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
    
                return CompletableFuture.completedFuture(Acknowledge.get());
            } else {
                final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
    
                log.debug(message);
                return FutureUtils.completedExceptionally(new CheckpointException(message));
            }
        }
    
        //......
    }
    
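    • TaskExecutor.triggerCheckpoint looks up the Task for the executionAttemptID in taskSlotTable. If the task exists, it calls task.triggerCheckpointBarrier and returns a completed Acknowledge future; otherwise it returns a future completed exceptionally with a CheckpointException.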
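
    The lookup-then-acknowledge shape of this method can be sketched with a map and CompletableFuture. All names here (TaskLookupSketch, register, the "ACK" string, the use of IllegalStateException) are assumptions of the sketch and stand in for taskSlotTable, Acknowledge and CheckpointException.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class TaskLookupSketch {

    // Hypothetical stand-in for the task slot table: attempt id -> trigger action.
    private final Map<String, Runnable> tasks = new ConcurrentHashMap<>();

    void register(String attemptId, Runnable onTrigger) {
        tasks.put(attemptId, onTrigger);
    }

    // Known task: run the trigger and acknowledge.
    // Unknown task: return a future that is already completed exceptionally.
    CompletableFuture<String> triggerCheckpoint(String attemptId) {
        Runnable task = tasks.get(attemptId);
        if (task != null) {
            task.run();
            return CompletableFuture.completedFuture("ACK");
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(
                new IllegalStateException("Received a checkpoint request for unknown task " + attemptId));
        return failed;
    }
}
```

    The acknowledgement only confirms that the request reached a known task, not that the checkpoint itself succeeded.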

    Task.triggerCheckpointBarrier

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

    public class Task implements Runnable, TaskActions, CheckpointListener {
    
        /** The invokable of this task, if initialized. All accesses must copy the reference and
         * check for null, as this field is cleared as part of the disposal logic. */
        @Nullable
        private volatile AbstractInvokable invokable;
    
        /**
         * Calls the invokable to trigger a checkpoint.
         *
         * @param checkpointID The ID identifying the checkpoint.
         * @param checkpointTimestamp The timestamp associated with the checkpoint.
         * @param checkpointOptions Options for performing this checkpoint.
         */
        public void triggerCheckpointBarrier(
                final long checkpointID,
                long checkpointTimestamp,
                final CheckpointOptions checkpointOptions) {
    
            final AbstractInvokable invokable = this.invokable;
            final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
    
            if (executionState == ExecutionState.RUNNING && invokable != null) {
    
                // build a local closure
                final String taskName = taskNameWithSubtask;
                final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                    FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
    
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        // set safety net from the task's context for checkpointing thread
                        LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
    
                        try {
                            boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                            if (!success) {
                                checkpointResponder.declineCheckpoint(
                                        getJobID(), getExecutionId(), checkpointID,
                                        new CheckpointDeclineTaskNotReadyException(taskName));
                            }
                        }
                        catch (Throwable t) {
                            if (getExecutionState() == ExecutionState.RUNNING) {
                                failExternally(new Exception(
                                    "Error while triggering checkpoint " + checkpointID + " for " +
                                        taskNameWithSubtask, t));
                            } else {
                                LOG.debug("Encountered error while triggering checkpoint {} for " +
                                    "{} ({}) while being not in state running.", checkpointID,
                                    taskNameWithSubtask, executionId, t);
                            }
                        } finally {
                            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                        }
                    }
                };
                executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
            }
            else {
                LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
    
                // send back a message that we did not do the checkpoint
                checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                        new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
            }
        }
    
        //......
    }
    
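    • Task.triggerCheckpointBarrier first checks that executionState is RUNNING and that invokable is not null. If either condition fails, it calls checkpointResponder.declineCheckpoint.
    • If both conditions hold, it calls executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId)).
    • The runnable calls invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions) and declines the checkpoint if that returns false. If it throws while the task is still RUNNING, the task is failed via failExternally. For a source task, the invokable here is a SourceStreamTask.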
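
    The "decline immediately or trigger asynchronously" structure can be sketched as follows. This is an illustration only: AsyncTriggerSketch and its members are hypothetical, the invokable is reduced to a BooleanSupplier, declined checkpoint IDs are collected in a list instead of being reported through a checkpointResponder, and the exception handling of the original is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BooleanSupplier;

class AsyncTriggerSketch {

    // IDs of declined checkpoints (stand-in for checkpointResponder.declineCheckpoint).
    final List<Long> declined = Collections.synchronizedList(new ArrayList<>());

    // Single thread that runs the asynchronous trigger calls.
    private final ExecutorService asyncCallDispatcher = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> triggerCheckpointBarrier(
            long checkpointId, boolean running, BooleanSupplier invokable) {

        if (running && invokable != null) {
            // Trigger off the calling thread; decline if the invokable reports failure.
            return CompletableFuture.runAsync(() -> {
                if (!invokable.getAsBoolean()) {
                    declined.add(checkpointId);
                }
            }, asyncCallDispatcher);
        }

        // Task is not running: decline right away on the calling thread.
        declined.add(checkpointId);
        return CompletableFuture.completedFuture(null);
    }

    void shutdown() {
        asyncCallDispatcher.shutdown();
    }
}
```

    Running the trigger on a separate thread keeps the RPC thread that delivered the request from being blocked by the checkpoint work.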

    SourceStreamTask.triggerCheckpoint

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

    @Internal
    public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
        extends StreamTask<OUT, OP> {
    
        private volatile boolean externallyInducedCheckpoints;
    
        @Override
        public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
            if (!externallyInducedCheckpoints) {
                return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
            }
            else {
                // we do not trigger checkpoints here, we simply state whether we can trigger them
                synchronized (getCheckpointLock()) {
                    return isRunning();
                }
            }
        }
    
        //......
    }
    
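    • SourceStreamTask.triggerCheckpoint first checks externallyInducedCheckpoints. If it is false, it calls triggerCheckpoint of the parent class StreamTask. If it is true, it does not trigger a checkpoint itself and only returns isRunning() while holding the checkpoint lock.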

    StreamTask.triggerCheckpoint

    @Internal
    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
            extends AbstractInvokable
            implements AsyncExceptionHandler {
    
        @Override
        public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
            try {
                // No alignment if we inject a checkpoint
                CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
                        .setBytesBufferedInAlignment(0L)
                        .setAlignmentDurationNanos(0L);
    
                return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
            }
            catch (Exception e) {
                // propagate exceptions only if the task is still in "running" state
                if (isRunning) {
                    throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
                        " for operator " + getName() + '.', e);
                } else {
                    LOG.debug("Could not perform checkpoint {} for operator {} while the " +
                        "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
                    return false;
                }
            }
        }
    
        private boolean performCheckpoint(
                CheckpointMetaData checkpointMetaData,
                CheckpointOptions checkpointOptions,
                CheckpointMetrics checkpointMetrics) throws Exception {
    
            LOG.debug("Starting checkpoint ({}) {} on task {}",
                checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
    
            synchronized (lock) {
                if (isRunning) {
                    // we can do a checkpoint
    
                    // All of the following steps happen as an atomic step from the perspective of barriers and
                    // records/watermarks/timers/callbacks.
                    // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                    // checkpoint alignments
    
                    // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                    //           The pre-barrier work should be nothing or minimal in the common case.
                    operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
    
                    // Step (2): Send the checkpoint barrier downstream
                    operatorChain.broadcastCheckpointBarrier(
                            checkpointMetaData.getCheckpointId(),
                            checkpointMetaData.getTimestamp(),
                            checkpointOptions);
    
                    // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                    //           impact progress of the streaming topology
                    checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                    return true;
                }
                else {
                    // we cannot perform our checkpoint - let the downstream operators know that they
                    // should not wait for any input from this operator
    
                    // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
                    // yet be created
                    final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                    Exception exception = null;
    
                    for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
                        try {
                            streamRecordWriter.broadcastEvent(message);
                        } catch (Exception e) {
                            exception = ExceptionUtils.firstOrSuppressed(
                                new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                                exception);
                        }
                    }
    
                    if (exception != null) {
                        throw exception;
                    }
    
                    return false;
                }
            }
        }
    
        private void checkpointState(
                CheckpointMetaData checkpointMetaData,
                CheckpointOptions checkpointOptions,
                CheckpointMetrics checkpointMetrics) throws Exception {
    
            CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
                    checkpointMetaData.getCheckpointId(),
                    checkpointOptions.getTargetLocation());
    
            CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
                this,
                checkpointMetaData,
                checkpointOptions,
                storage,
                checkpointMetrics);
    
            checkpointingOperation.executeCheckpointing();
        }
    
        //......
    }
    
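    • The main logic of StreamTask.triggerCheckpoint lives in performCheckpoint, which branches on the task's isRunning flag while holding the lock
    • When isRunning is true, the work is done in three steps: step 1 calls operatorChain.prepareSnapshotPreBarrier, step 2 calls operatorChain.broadcastCheckpointBarrier, and step 3 calls checkpointState, which creates a CheckpointingOperation and then calls checkpointingOperation.executeCheckpointing()
    • When isRunning is false, it calls streamRecordWriter.broadcastEvent(message) on every entry in streamRecordWriters, where message is a CancelCheckpointMarker, so that downstream operators do not wait for input from this task, and then returns false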
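
    The two branches of performCheckpoint can be sketched as a small stand-alone model. This is only an illustration under simplifying assumptions, not Flink code: the class PerformCheckpointSketch, its nested Output class, and the string events are hypothetical stand-ins for the operator chain and record writers, and the exception aggregation of the real cancel path is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the branch structure in StreamTask.performCheckpoint; not Flink code. */
final class PerformCheckpointSketch {

    /** Hypothetical stand-in for one downstream record writer. */
    static final class Output {
        final List<String> events = new ArrayList<>();

        void broadcastEvent(String event) {
            events.add(event);
        }
    }

    private final Object lock = new Object();
    private final List<Output> outputs;
    private final List<String> steps = new ArrayList<>();
    private final boolean running;

    PerformCheckpointSketch(List<Output> outputs, boolean running) {
        this.outputs = outputs;
        this.running = running;
    }

    List<String> steps() {
        return steps;
    }

    boolean performCheckpoint(long checkpointId) {
        synchronized (lock) {
            if (running) {
                // Step 1: let operators do (ideally minimal) pre-barrier work.
                steps.add("prepareSnapshotPreBarrier(" + checkpointId + ")");
                // Step 2: emit the barrier before snapshotting so downstream alignment is not delayed.
                steps.add("broadcastCheckpointBarrier(" + checkpointId + ")");
                for (Output out : outputs) {
                    out.broadcastEvent("barrier-" + checkpointId);
                }
                // Step 3: take the state snapshot.
                steps.add("checkpointState(" + checkpointId + ")");
                return true;
            }
            // Not running: tell downstream tasks not to wait for a barrier from this task.
            for (Output out : outputs) {
                out.broadcastEvent("cancel-" + checkpointId);
            }
            return false;
        }
    }

    public static void main(String[] args) {
        Output out = new Output();
        PerformCheckpointSketch live = new PerformCheckpointSketch(List.of(out), true);
        System.out.println(live.performCheckpoint(1L) + " " + live.steps() + " " + out.events);

        Output out2 = new Output();
        PerformCheckpointSketch stopped = new PerformCheckpointSketch(List.of(out2), false);
        System.out.println(stopped.performCheckpoint(2L) + " " + stopped.steps() + " " + out2.events);
    }
}
```

    The point of the ordering is visible in the model: the barrier leaves the task before any snapshot work starts, and a task that is not running still sends something downstream (the cancel marker) instead of staying silent.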

    OperatorChain.prepareSnapshotPreBarrier

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

    @Internal
    public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
    
        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            // go forward through the operator chain and tell each operator
            // to prepare the checkpoint
            final StreamOperator<?>[] operators = this.allOperators;
            for (int i = operators.length - 1; i >= 0; --i) {
                final StreamOperator<?> op = operators[i];
                if (op != null) {
                    op.prepareSnapshotPreBarrier(checkpointId);
                }
            }
        }
    
        //......
    }
    
    • OperatorChain.prepareSnapshotPreBarrier iterates over allOperators (from the last index down to 0) and calls prepareSnapshotPreBarrier on each non-null StreamOperator

    OperatorChain.broadcastCheckpointBarrier

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

    @Internal
    public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
    
        public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
            CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
            for (RecordWriterOutput<?> streamOutput : streamOutputs) {
                streamOutput.broadcastEvent(barrier);
            }
        }
    
        //......
    }
    
    • OperatorChain.broadcastCheckpointBarrier creates a CheckpointBarrier, then iterates over streamOutputs and calls broadcastEvent(barrier) on each of them

    CheckpointingOperation.executeCheckpointing

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

        private static final class CheckpointingOperation {
    
            private final StreamTask<?, ?> owner;
    
            private final CheckpointMetaData checkpointMetaData;
            private final CheckpointOptions checkpointOptions;
            private final CheckpointMetrics checkpointMetrics;
            private final CheckpointStreamFactory storageLocation;
    
            private final StreamOperator<?>[] allOperators;
    
            private long startSyncPartNano;
            private long startAsyncPartNano;
    
            // ------------------------
    
            private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
    
            public CheckpointingOperation(
                    StreamTask<?, ?> owner,
                    CheckpointMetaData checkpointMetaData,
                    CheckpointOptions checkpointOptions,
                    CheckpointStreamFactory checkpointStorageLocation,
                    CheckpointMetrics checkpointMetrics) {
    
                this.owner = Preconditions.checkNotNull(owner);
                this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
                this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);
                this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
                this.storageLocation = Preconditions.checkNotNull(checkpointStorageLocation);
                this.allOperators = owner.operatorChain.getAllOperators();
                this.operatorSnapshotsInProgress = new HashMap<>(allOperators.length);
            }
    
            public void executeCheckpointing() throws Exception {
                startSyncPartNano = System.nanoTime();
    
                try {
                    for (StreamOperator<?> op : allOperators) {
                        checkpointStreamOperator(op);
                    }
    
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                            checkpointMetaData.getCheckpointId(), owner.getName());
                    }
    
                    startAsyncPartNano = System.nanoTime();
    
                    checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
    
                    // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
                    AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                        owner,
                        operatorSnapshotsInProgress,
                        checkpointMetaData,
                        checkpointMetrics,
                        startAsyncPartNano);
    
                    owner.cancelables.registerCloseable(asyncCheckpointRunnable);
                    owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
    
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                                "Alignment duration: {} ms, snapshot duration {} ms",
                            owner.getName(), checkpointMetaData.getCheckpointId(),
                            checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                            checkpointMetrics.getSyncDurationMillis());
                    }
                } catch (Exception ex) {
                    // Cleanup to release resources
                    for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
                        if (null != operatorSnapshotResult) {
                            try {
                                operatorSnapshotResult.cancel();
                            } catch (Exception e) {
                                LOG.warn("Could not properly cancel an operator snapshot result.", e);
                            }
                        }
                    }
    
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                                "Alignment duration: {} ms, snapshot duration {} ms",
                            owner.getName(), checkpointMetaData.getCheckpointId(),
                            checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                            checkpointMetrics.getSyncDurationMillis());
                    }
    
                    owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);
                }
            }
    
            @SuppressWarnings("deprecation")
            private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
                if (null != op) {
    
                    OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
                            checkpointMetaData.getCheckpointId(),
                            checkpointMetaData.getTimestamp(),
                            checkpointOptions,
                            storageLocation);
                    operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
                }
            }
    
            private enum AsyncCheckpointState {
                RUNNING,
                DISCARDED,
                COMPLETED
            }
        }
    
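    • CheckpointingOperation is defined inside the StreamTask class. Its executeCheckpointing method first runs checkpointStreamOperator for every StreamOperator (which calls StreamOperator.snapshotState and stores the returned OperatorSnapshotFutures), then creates an AsyncCheckpointRunnable and submits it to asyncOperationsThreadPool for asynchronous execution. If the synchronous part throws, the snapshot futures collected so far are cancelled and the exception is handed to synchronousCheckpointExceptionHandler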
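
    The split between a synchronous part and an asynchronous part can be illustrated with plain java.util.concurrent. The following is a minimal sketch under stated assumptions, not the Flink implementation: the Operator interface, the operator(...) factory, and demo() are hypothetical, and the async task simply runs each collected FutureTask, which is only assumed to be in the spirit of AsyncCheckpointRunnable (its internals are not shown in this article).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/** Toy model of "collect snapshot futures synchronously, finish them asynchronously". */
final class ExecuteCheckpointingSketch {

    /** Hypothetical operator: snapshot() is the sync part and returns a not-yet-run task. */
    interface Operator {
        String id();

        FutureTask<String> snapshot(long checkpointId);
    }

    static Operator operator(String id) {
        return new Operator() {
            public String id() {
                return id;
            }

            public FutureTask<String> snapshot(long checkpointId) {
                // The callable is the deferred (asynchronous) part of this operator's snapshot.
                return new FutureTask<>(() -> id + "@" + checkpointId);
            }
        };
    }

    static Future<Map<String, String>> executeCheckpointing(
            List<Operator> operators, long checkpointId, ExecutorService asyncPool) {
        // Synchronous part: ask every operator for its snapshot future.
        Map<String, FutureTask<String>> inProgress = new LinkedHashMap<>();
        try {
            for (Operator op : operators) {
                inProgress.put(op.id(), op.snapshot(checkpointId));
            }
        } catch (RuntimeException e) {
            // Cleanup: cancel whatever was collected before the failure.
            for (FutureTask<String> f : inProgress.values()) {
                f.cancel(true);
            }
            throw e;
        }
        // Asynchronous part: hand the futures over to a pool thread and return immediately.
        return asyncPool.submit(() -> {
            Map<String, String> results = new LinkedHashMap<>();
            for (Map.Entry<String, FutureTask<String>> e : inProgress.entrySet()) {
                e.getValue().run();
                results.put(e.getKey(), e.getValue().get());
            }
            return results;
        });
    }

    /** Runs the sketch once and waits for the async part; wraps checked exceptions. */
    static Map<String, String> demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return executeCheckpointing(List.of(operator("map"), operator("sink")), 3L, pool).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    As in the original method, the caller is blocked only while the futures are collected; the potentially slow work happens on the pool thread.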

    AbstractStreamOperator.snapshotState

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

    @PublicEvolving
    public abstract class AbstractStreamOperator<OUT>
            implements StreamOperator<OUT>, Serializable {
    
        @Override
        public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
                CheckpointStreamFactory factory) throws Exception {
    
            KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                    keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
    
            OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
    
            try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
                    checkpointId,
                    timestamp,
                    factory,
                    keyGroupRange,
                    getContainingTask().getCancelables())) {
    
                snapshotState(snapshotContext);
    
                snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
                snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
    
                if (null != operatorStateBackend) {
                    snapshotInProgress.setOperatorStateManagedFuture(
                        operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
                }
    
                if (null != keyedStateBackend) {
                    snapshotInProgress.setKeyedStateManagedFuture(
                        keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
                }
            } catch (Exception snapshotException) {
                try {
                    snapshotInProgress.cancel();
                } catch (Exception e) {
                    snapshotException.addSuppressed(e);
                }
    
                String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
                    getOperatorName() + ".";
    
                if (!getContainingTask().isCanceled()) {
                    LOG.info(snapshotFailMessage, snapshotException);
                }
                throw new Exception(snapshotFailMessage, snapshotException);
            }
    
            return snapshotInProgress;
        }
    
        /**
         * Stream operators with state, which want to participate in a snapshot need to override this hook method.
         *
         * @param context context that provides information and means required for taking a snapshot
         */
        public void snapshotState(StateSnapshotContext context) throws Exception {
            final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
            //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
            if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
                ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {
    
                KeyedStateCheckpointOutputStream out;
    
                try {
                    out = context.getRawKeyedOperatorStateOutput();
                } catch (Exception exception) {
                    throw new Exception("Could not open raw keyed operator state stream for " +
                        getOperatorName() + '.', exception);
                }
    
                try {
                    KeyGroupsList allKeyGroups = out.getKeyGroupList();
                    for (int keyGroupIdx : allKeyGroups) {
                        out.startNewKeyGroup(keyGroupIdx);
    
                        timeServiceManager.snapshotStateForKeyGroup(
                            new DataOutputViewStreamWrapper(out), keyGroupIdx);
                    }
                } catch (Exception exception) {
                    throw new Exception("Could not write timer service of " + getOperatorName() +
                        " to checkpoint state stream.", exception);
                } finally {
                    try {
                        out.close();
                    } catch (Exception closeException) {
                        LOG.warn("Could not close raw keyed operator state stream for {}. This " +
                            "might have prevented deleting some state data.", getOperatorName(), closeException);
                    }
                }
            }
        }
    
        //......
    }
    
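    • The final overload AbstractStreamOperator.snapshotState(checkpointId, timestamp, checkpointOptions, factory) calls the hook snapshotState(StateSnapshotContext) and then sets the managed-state futures from operatorStateBackend.snapshot and keyedStateBackend.snapshot. The hook itself only does work when keyedStateBackend is an AbstractKeyedStateBackend and requiresLegacySynchronousTimerSnapshots() returns true; in that case it calls timeServiceManager.snapshotStateForKeyGroup(new DataOutputViewStreamWrapper(out), keyGroupIdx) for each key group. Subclasses such as AbstractUdfStreamOperator may override the hook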

    AbstractUdfStreamOperator

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java

    @PublicEvolving
    public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
            extends AbstractStreamOperator<OUT>
            implements OutputTypeConfigurable<OUT> {
    
        @Override
        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
            StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
        }
    
            //......
    }
    
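    • AbstractUdfStreamOperator overrides snapshotState(StateSnapshotContext) of its parent class AbstractStreamOperator: it first calls super.snapshotState(context) and then adds a call to StreamingFunctionUtils.snapshotFunctionState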
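
    The relationship between the final snapshotState entry point and the overridable hook is a template-method pattern. A minimal sketch follows, with hypothetical class and method names (BaseOperator, UdfOperator, snapshotHook) and recorded strings in place of real state backends; it only mirrors the call order visible in the listings above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a final entry point that calls an overridable snapshot hook. */
final class SnapshotHookSketch {

    /** Hypothetical base operator, loosely modelled on AbstractStreamOperator. */
    static class BaseOperator {
        final List<String> calls = new ArrayList<>();

        /** Final entry point: the hook runs first, then the managed state is snapshotted. */
        final List<String> snapshotState(long checkpointId) {
            snapshotHook(checkpointId);
            calls.add("managed-state-snapshot(" + checkpointId + ")");
            return calls;
        }

        /** Overridable hook; the real base hook only acts for legacy timer snapshots. */
        void snapshotHook(long checkpointId) {
            calls.add("base-hook(" + checkpointId + ")");
        }
    }

    /** Hypothetical subclass, loosely modelled on AbstractUdfStreamOperator. */
    static class UdfOperator extends BaseOperator {
        @Override
        void snapshotHook(long checkpointId) {
            super.snapshotHook(checkpointId);
            // Stands in for StreamingFunctionUtils.snapshotFunctionState(...).
            calls.add("user-function-snapshot(" + checkpointId + ")");
        }
    }

    public static void main(String[] args) {
        System.out.println(new UdfOperator().snapshotState(5L));
    }
}
```

    Because the entry point is final, a subclass can add work to the hook but cannot skip the managed-state snapshot that follows it.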

    StreamingFunctionUtils.snapshotFunctionState

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java

    @Internal
    public final class StreamingFunctionUtils {
    
        public static void snapshotFunctionState(
                StateSnapshotContext context,
                OperatorStateBackend backend,
                Function userFunction) throws Exception {
    
            Preconditions.checkNotNull(context);
            Preconditions.checkNotNull(backend);
    
            while (true) {
    
                if (trySnapshotFunctionState(context, backend, userFunction)) {
                    break;
                }
    
                // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
                if (userFunction instanceof WrappingFunction) {
                    userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
                } else {
                    break;
                }
            }
        }
    
        private static boolean trySnapshotFunctionState(
                StateSnapshotContext context,
                OperatorStateBackend backend,
                Function userFunction) throws Exception {
    
            if (userFunction instanceof CheckpointedFunction) {
                ((CheckpointedFunction) userFunction).snapshotState(context);
    
                return true;
            }
    
            if (userFunction instanceof ListCheckpointed) {
                @SuppressWarnings("unchecked")
                List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
                        snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
    
                ListState<Serializable> listState = backend.
                        getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
    
                listState.clear();
    
                if (null != partitionableState) {
                    try {
                        for (Serializable statePartition : partitionableState) {
                            listState.add(statePartition);
                        }
                    } catch (Exception e) {
                        listState.clear();
    
                        throw new Exception("Could not write partitionable state to operator " +
                            "state backend.", e);
                    }
                }
    
                return true;
            }
    
            return false;
        }
    
        //......
    }
    
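    • snapshotFunctionState calls trySnapshotFunctionState in a loop, unwrapping the function via getWrappedFunction() as long as it is a WrappingFunction and no snapshot has been taken yet. trySnapshotFunctionState dispatches on the type of userFunction: if it implements CheckpointedFunction, CheckpointedFunction.snapshotState is called; if it implements ListCheckpointed, ListCheckpointed.snapshotState is called, the ListState is cleared, and each element of the returned List is then added to the ListState via ListState.add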
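
    The unwrap-and-retry loop can be tried out in isolation. The sketch below is an assumption-laden model, not the Flink API: the interfaces UserFunction, SnapshotAware, ListSnapshotAware and Wrapper are hypothetical stand-ins for Function, CheckpointedFunction, ListCheckpointed and WrappingFunction, a plain List<String> stands in for ListState, and the method returns a boolean (the original returns void) so the outcome is easy to check.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the unwrap loop in StreamingFunctionUtils.snapshotFunctionState. */
final class UnwrapSnapshotSketch {

    interface UserFunction {}

    /** Hypothetical stand-in for CheckpointedFunction. */
    interface SnapshotAware extends UserFunction {
        void snapshotState(long checkpointId);
    }

    /** Hypothetical stand-in for ListCheckpointed. */
    interface ListSnapshotAware extends UserFunction {
        List<String> snapshotState(long checkpointId);
    }

    /** Hypothetical stand-in for WrappingFunction. */
    interface Wrapper extends UserFunction {
        UserFunction wrapped();
    }

    static boolean snapshotFunctionState(UserFunction fn, long checkpointId, List<String> listState) {
        while (true) {
            if (trySnapshot(fn, checkpointId, listState)) {
                return true;
            }
            // Not snapshot-capable itself: unwrap one level and try again, or give up.
            if (fn instanceof Wrapper) {
                fn = ((Wrapper) fn).wrapped();
            } else {
                return false;
            }
        }
    }

    private static boolean trySnapshot(UserFunction fn, long checkpointId, List<String> listState) {
        if (fn instanceof SnapshotAware) {
            ((SnapshotAware) fn).snapshotState(checkpointId);
            return true;
        }
        if (fn instanceof ListSnapshotAware) {
            List<String> parts = ((ListSnapshotAware) fn).snapshotState(checkpointId);
            // Replace, do not append: state from the previous checkpoint is dropped first.
            listState.clear();
            if (parts != null) {
                listState.addAll(parts);
            }
            return true;
        }
        return false;
    }

    /** Wraps a list-style function once and snapshots it into a list holding stale data. */
    static List<String> demo() {
        ListSnapshotAware inner = id -> List.of("offset-" + id);
        Wrapper outer = () -> inner;
        List<String> listState = new ArrayList<>(List.of("stale"));
        snapshotFunctionState(outer, 9L, listState);
        return listState;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    The demo shows both behaviours described above: the wrapper itself cannot be snapshotted, so the loop unwraps it, and the previous list contents are cleared before the new elements are added.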

    Summary

    • Flink's CheckpointCoordinatorDeActivator calls CheckpointCoordinator.startCheckpointScheduler when the job status becomes RUNNING, and CheckpointCoordinator.stopCheckpointScheduler for any other status
    • CheckpointCoordinator.startCheckpointScheduler mainly registers a ScheduledTrigger task whose run method calls triggerCheckpoint. Before actually triggering a checkpoint, triggerCheckpoint performs a series of checks and fails fast if any of them is not satisfied; possible reasons include CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS, CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS and NOT_ALL_REQUIRED_TASKS_RUNNING. If the checks pass, it iterates over the executions and calls Execution.triggerCheckpoint on each, which uses taskManagerGateway.triggerCheckpoint to invoke TaskExecutor.triggerCheckpoint via RPC
    • TaskExecutor.triggerCheckpoint mainly calls Task.triggerCheckpointBarrier, which asynchronously executes a runnable whose run method calls invokable.triggerCheckpoint. The invokable here is a SourceStreamTask, which mainly delegates to its parent class method StreamTask.triggerCheckpoint, whose main logic is in performCheckpoint. When isRunning is true, performCheckpoint works in three steps: step 1 calls operatorChain.prepareSnapshotPreBarrier, step 2 calls operatorChain.broadcastCheckpointBarrier, and step 3 calls checkpointState, which creates a CheckpointingOperation and then calls checkpointingOperation.executeCheckpointing()
    • CheckpointingOperation.executeCheckpointing runs checkpointStreamOperator for every StreamOperator, and checkpointStreamOperator calls StreamOperator.snapshotState. The snapshotState(StateSnapshotContext) hook of AbstractStreamOperator only does work when keyedStateBackend is an AbstractKeyedStateBackend and requiresLegacySynchronousTimerSnapshots is true
    • AbstractUdfStreamOperator overrides snapshotState of its parent class AbstractStreamOperator and adds a call to StreamingFunctionUtils.snapshotFunctionState, which dispatches on the type of userFunction (if it implements CheckpointedFunction, CheckpointedFunction.snapshotState is called; if it implements ListCheckpointed, ListCheckpointed.snapshotState is called)

          Article title: 聊聊flink的CheckpointScheduler (A look at Flink's CheckpointScheduler)

          Article link: https://www.haomeiwen.com/subject/aebthqtx.html