Flink Checkpoint Mechanism Explained: A Code Walkthrough


Author: biggeng | Published 2019-03-18 10:04

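    Flink's checkpoint mechanism is the foundation of Flink's fault tolerance. It saves the runtime state of a streaming job so that, when a failure occurs, the job can be restored from the saved state. For example, when Flink reads from Kafka it saves the consumed Kafka offsets, so that if the job fails it can resume consuming right after the last saved offset.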

    We will look at Flink checkpoints from the following angles.

    Barrier

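    A barrier is a lightweight record that is inserted into the original data stream according to certain (scheduling) rules. It does not affect the performance of normal data processing and does not change the order of the original data. Barriers cut the stream into segments, somewhat like the batches in Spark Streaming's micro-batch model.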
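    To make "barriers cut the stream into segments" concrete, here is a minimal sketch. It is not Flink code. A stream is modeled as a list of records and barriers, and the type names `StreamElement`, `DataRecord`, `Barrier` and `BarrierSegments` are hypothetical. The helper groups records by the barrier that closes them and never reorders anything.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a stream: data records interleaved with checkpoint barriers.
abstract class StreamElement {}

final class DataRecord extends StreamElement {
    final String value;
    DataRecord(String value) { this.value = value; }
}

final class Barrier extends StreamElement {
    final long checkpointId;
    Barrier(long checkpointId) { this.checkpointId = checkpointId; }
}

final class BarrierSegments {
    // Each barrier closes the segment of records that arrived since the previous barrier;
    // the final segment holds records that no barrier covers yet.
    static List<List<String>> split(List<StreamElement> stream) {
        List<List<String>> segments = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (StreamElement e : stream) {
            if (e instanceof Barrier) {
                segments.add(current);               // close the segment this barrier ends
                current = new ArrayList<>();
            } else {
                current.add(((DataRecord) e).value); // record order is preserved
            }
        }
        segments.add(current);
        return segments;
    }
}
```

    Consider the stream a, b, barrier 1, c, barrier 2, d. The sketch yields [[a, b], [c], [d]]: barrier 1 covers a and b, barrier 2 covers c, and d belongs to no completed segment yet.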


    [Figure: barriers in the data stream]

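    Barriers flow through the tasks together with the data. When an operator receives a barrier, it considers all data before that barrier to have been processed, and at that point it triggers a checkpoint.
    If an operator has multiple parallel input streams, then after receiving a checkpoint's barrier on one input it must wait for that checkpoint's barrier to arrive on all its other inputs before proceeding. This barrier alignment works as follows.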

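    • As soon as an operator receives barrier n from one upstream input, it may no longer process the data that follows barrier n on that stream, until it has received barrier n from all of its other upstream inputs.

    • Meanwhile, the streams that have already delivered barrier n are put on hold: data read from them is not processed but cached in an input buffer.

    • When barrier n from the last upstream input arrives, the operator sends barrier n downstream.

    • After that, the operator resumes processing data from all upstream inputs, handling the data in the input buffer before any new upstream data.

    The whole process is shown in the figure below.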


      [Figure: barrier alignment]
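    The four alignment steps can be modeled as a small state machine. The sketch below is a simplified, single-threaded illustration and not Flink's BarrierBuffer API. The names `AlignmentModel`, `onRecord`, `onBarrier` and the `output` log are hypothetical, and it assumes each channel delivers exactly one barrier per checkpoint. Records from a channel that has already delivered barrier n are buffered. Once barrier n has arrived on every channel, the barrier is forwarded and the buffer is drained before new input is accepted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical, single-threaded model of barrier alignment for one operator with N inputs.
final class AlignmentModel {
    private final boolean[] blocked;               // channels that already delivered the current barrier
    private int barriersReceived = 0;
    private final Deque<String> inputBuffer = new ArrayDeque<>();
    final List<String> output = new ArrayList<>(); // what the operator processed or emitted, in order

    AlignmentModel(int numChannels) { this.blocked = new boolean[numChannels]; }

    void onRecord(int channel, String record) {
        if (blocked[channel]) {
            inputBuffer.add(record);               // step 2: hold data from channels past the barrier
        } else {
            output.add(record);                    // channels still before the barrier are processed
        }
    }

    // Assumes one barrier per channel per checkpoint (no overlapping checkpoints in this model).
    void onBarrier(int channel, long checkpointId) {
        blocked[channel] = true;                   // step 1: stop processing this channel
        barriersReceived++;
        if (barriersReceived == blocked.length) {  // step 3: the last barrier has arrived
            output.add("barrier-" + checkpointId); // forward the barrier downstream
            Arrays.fill(blocked, false);
            barriersReceived = 0;
            while (!inputBuffer.isEmpty()) {       // step 4: drain buffered data first
                output.add(inputBuffer.poll());
            }
        }
    }
}
```

    Suppose channel 0 delivers a1, then barrier 7, then a2, while channel 1 delivers b1 and then barrier 7. The model processes a1 and b1, emits barrier-7, and only then processes the buffered a2.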

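    Barrier generation

    A Flink checkpoint is initiated by the JobMaster, which periodically triggers the source tasks to emit barriers.

    Periodic checkpoint triggering in the JobMaster

    This starts in the startCheckpointScheduler() method of the CheckpointCoordinator class: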

        public void startCheckpointScheduler() {
            synchronized (lock) {
                if (shutdown) {
                    throw new IllegalArgumentException("Checkpoint coordinator is shut down");
                }
    
                // make sure all prior timers are cancelled
                stopCheckpointScheduler();
                // trigger a checkpoint periodically, every baseInterval milliseconds
                periodicScheduling = true;
                long initialDelay = ThreadLocalRandom.current().nextLong(
                    minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
                currentPeriodicTrigger = timer.scheduleAtFixedRate(
                        new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
            }
        }
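    The scheduling pattern in startCheckpointScheduler() can be reproduced with the JDK alone. This is a sketch under assumptions and not Flink code. `PeriodicCheckpointScheduler` and its `trigger` callback are hypothetical names, and a plain `ScheduledExecutorService` stands in for the coordinator's timer. Like the excerpt, it cancels any earlier timer and picks a random initial delay in [minPause, baseInterval]. It then fires at a fixed rate.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the coordinator's periodic trigger (not Flink code).
final class PeriodicCheckpointScheduler {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> currentPeriodicTrigger;

    // Random initial delay in [minPause, baseInterval] milliseconds, mirroring the excerpt.
    static long initialDelayMillis(long minPauseMillis, long baseIntervalMillis) {
        // nextLong(origin, bound) requires origin < bound, so clamp the lower end
        long origin = Math.min(minPauseMillis, baseIntervalMillis);
        return ThreadLocalRandom.current().nextLong(origin, baseIntervalMillis + 1L);
    }

    synchronized void start(long minPauseMillis, long baseIntervalMillis, Runnable trigger) {
        stop(); // make sure all prior timers are cancelled
        long delay = initialDelayMillis(minPauseMillis, baseIntervalMillis);
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                trigger, delay, baseIntervalMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void stop() {
        if (currentPeriodicTrigger != null) {
            currentPeriodicTrigger.cancel(false);
            currentPeriodicTrigger = null;
        }
    }

    void shutdown() {
        stop();
        timer.shutdownNow();
    }
}
```

    With scheduleAtFixedRate, a trigger that runs longer than the period delays later runs rather than overlapping them. The real trigger path also checks further conditions, such as the minimum pause between checkpoints, before it actually starts a checkpoint.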
    
    

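    Following ScheduledTrigger down, we reach triggerCheckpoint, the JobMaster-side method that triggers a checkpoint. After a series of setup steps, this method generates a unique checkpoint ID and creates a pending checkpoint. It then calls the RPC method execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions) to make the task side emit barriers and take the checkpoint.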

    checkpointID = checkpointIdCounter.getAndIncrement();

    final PendingCheckpoint checkpoint = new PendingCheckpoint(
            job,
            checkpointID,
            timestamp,
            ackTasks,
            props,
            checkpointStorageLocation,
            executor);

    // send the messages to the tasks that trigger their checkpoint
    for (Execution execution: executions) {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }

    // Execution.triggerCheckpoint:
    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            // RPC call to the TaskManager: taskManagerGateway.triggerCheckpoint
            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
        }
    }
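    The trigger path reduces to three steps: take the next ID from a counter, register a pending checkpoint, and send the trigger message to the source tasks. Below is a minimal sketch of that sequence. `TriggerSketch`, `TaskGateway` and the map of pending checkpoints are hypothetical names. The real coordinator performs many more checks and uses an RPC gateway rather than a local interface.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical task endpoint; in Flink the call goes through the TaskManager RPC gateway.
interface TaskGateway {
    void triggerCheckpoint(long checkpointId, long timestamp);
}

final class TriggerSketch {
    private final AtomicLong checkpointIdCounter = new AtomicLong(1); // unique, increasing IDs
    // checkpoint ID -> tasks that still have to acknowledge it
    final Map<Long, Set<String>> pendingCheckpoints = new TreeMap<>();

    // Only the source tasks are triggered; every task in ackTasks must later acknowledge.
    long triggerCheckpoint(List<TaskGateway> sourceTasks, Set<String> ackTasks, long timestamp) {
        long checkpointId = checkpointIdCounter.getAndIncrement();
        // register the pending checkpoint first, so no acknowledgement arrives for an unknown ID
        pendingCheckpoints.put(checkpointId, new HashSet<>(ackTasks));
        // send the messages to the tasks that trigger their checkpoint
        for (TaskGateway task : sourceTasks) {
            task.triggerCheckpoint(checkpointId, timestamp);
        }
        return checkpointId;
    }
}
```

    The set of tasks that must acknowledge is kept separate from the set of tasks that are triggered. Only sources receive the trigger, while downstream tasks react to barriers instead.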
    
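    The source generates barriers on command

    When the TaskManager receives the RPC that triggers a checkpoint, it makes the source task create a checkpoint barrier. triggerCheckpointBarrier wraps this work in a Runnable and hands it to a separate thread.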

    public void triggerCheckpointBarrier(
            final long checkpointID,
            long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {

        // ... (omitted: checkpointMetaData, taskName and safetyNetCloseableRegistry are prepared here)

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // set safety net from the task's context for checkpointing thread
                LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                try {
                    boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                    if (!success) {
                        checkpointResponder.declineCheckpoint(
                                getJobID(), getExecutionId(), checkpointID,
                                new CheckpointDeclineTaskNotReadyException(taskName));
                    }
                } catch (Throwable t) {
                    // ... (error handling omitted in this excerpt)
                }
            }
        };
        // ... (the runnable is then executed asynchronously, off the RPC thread)
    }
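    The key point of triggerCheckpointBarrier is that the RPC thread does not run the checkpoint itself. It hands the work to another thread, and the checkpoint is declined if the task reports that it is not ready. Here is a hedged sketch of that hand-off using a JDK executor. `AsyncTriggerSketch`, `Invokable` and the `declined` list are hypothetical stand-ins for the task, its invokable and the checkpoint responder.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the task's invokable (e.g. a StreamTask).
interface Invokable {
    boolean triggerCheckpoint(long checkpointId) throws Exception;
}

final class AsyncTriggerSketch {
    private final ExecutorService asyncCallDispatcher = Executors.newSingleThreadExecutor();
    // stands in for checkpointResponder.declineCheckpoint(...)
    final List<Long> declined = new CopyOnWriteArrayList<>();

    // Returns immediately; the checkpoint itself runs on the dispatcher thread.
    Future<?> triggerCheckpointBarrier(Invokable invokable, long checkpointId) {
        Runnable runnable = () -> {
            try {
                boolean success = invokable.triggerCheckpoint(checkpointId);
                if (!success) {
                    declined.add(checkpointId); // task not ready: decline this checkpoint
                }
            } catch (Exception e) {
                declined.add(checkpointId);     // simplified: in this sketch any error declines
            }
        };
        return asyncCallDispatcher.submit(runnable);
    }

    void shutdown() { asyncCallDispatcher.shutdownNow(); }
}
```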
    
    

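    In the source task, the method that does the actual work is performCheckpoint in StreamTask. It does two things. First, it creates a barrier carrying the checkpoint ID and sends it to all downstream tasks. Then it saves the state of the task itself. This is how barriers start propagating through the whole streaming pipeline.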

        private boolean performCheckpoint(
                CheckpointMetaData checkpointMetaData,
                CheckpointOptions checkpointOptions,
                CheckpointMetrics checkpointMetrics) throws Exception {
    
            LOG.debug("Starting checkpoint ({}) {} on task {}",
                checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
    
            synchronized (lock) {
                if (isRunning) {
                    // we can do a checkpoint
    
                    // All of the following steps happen as an atomic step from the perspective of barriers and
                    // records/watermarks/timers/callbacks.
                    // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                    // checkpoint alignments
    
                    // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                    //           The pre-barrier work should be nothing or minimal in the common case.
                    operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
    
                    // Step (2): Send the checkpoint barrier downstream
                    operatorChain.broadcastCheckpointBarrier(
                            checkpointMetaData.getCheckpointId(),
                            checkpointMetaData.getTimestamp(),
                            checkpointOptions);
    
                    // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                    //           impact progress of the streaming topology
                    checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                    return true;
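                } else {
                    // not running: downstream tasks are told via a CancelCheckpointMarker
                    // not to wait for this checkpoint (details omitted in this excerpt)
                    return false;
                }
            }
        }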
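    The ordering inside performCheckpoint matters. The barrier is sent downstream before the (mostly asynchronous) snapshot, and all three steps run under the task's lock, so no record, watermark or timer can slip in between. The sketch below shows only that ordering. `CheckpointStep` and its `events` log are hypothetical stand-ins for operatorChain.prepareSnapshotPreBarrier, broadcastCheckpointBarrier and checkpointState.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the three steps of performCheckpoint and their order.
final class CheckpointStep {
    private final Object lock = new Object();
    private volatile boolean isRunning = true;
    final List<String> events = new ArrayList<>();

    boolean performCheckpoint(long checkpointId) {
        synchronized (lock) {
            if (!isRunning) {
                events.add("cancel-marker-" + checkpointId); // tell downstream not to wait
                return false;
            }
            events.add("prepare-" + checkpointId);  // (1) pre-barrier work, usually nothing
            events.add("barrier-" + checkpointId);  // (2) emit the barrier as early as possible
            events.add("snapshot-" + checkpointId); // (3) take (start) the state snapshot
            return true;
        }
    }

    void processRecord(String record) {
        synchronized (lock) {                       // records use the same lock, so they
            if (isRunning) events.add(record);      // never interleave with steps (1) to (3)
        }
    }

    void stop() { isRunning = false; }
}
```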
    

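    Barrier propagation

    As described above, source tasks insert barriers into the original data periodically, on the JobMaster's command, and pass them on to downstream operators.
    Non-source tasks do not trigger checkpoints periodically. Instead, they trigger a checkpoint each time they encounter a barrier in the data they process.
    In the code, this is driven by getNextNonBlocked in BarrierBuffer.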

        public BufferOrEvent getNextNonBlocked() throws Exception {
            while (true) {
                // process buffered BufferOrEvents before grabbing new ones
                Optional<BufferOrEvent> next;
                if (currentBuffered == null) {
                    next = inputGate.getNextBufferOrEvent();
                }
                else {
                    next = Optional.ofNullable(currentBuffered.getNext());
                    if (!next.isPresent()) {
                        completeBufferedSequence();
                        return getNextNonBlocked();
                    }
                }
    
                if (!next.isPresent()) {
                    if (!endOfStream) {
                        // end of input stream. stream continues with the buffered data
                        endOfStream = true;
                        releaseBlocksAndResetBarriers();
                        return getNextNonBlocked();
                    }
                    else {
                        // final end of both input and buffered data
                        return null;
                    }
                }
    
                BufferOrEvent bufferOrEvent = next.get();
                if (isBlocked(bufferOrEvent.getChannelIndex())) {
                    // if the channel is blocked, we just store the BufferOrEvent
                    bufferBlocker.add(bufferOrEvent);
                    checkSizeLimit();
                }
                else if (bufferOrEvent.isBuffer()) {
                    return bufferOrEvent;
                }
                else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
                    if (!endOfStream) {
                        // process barriers only if there is a chance of the checkpoint completing
                        processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
                    }
                }
                else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
                    processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
                }
                else {
                    if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
                        processEndOfPartition();
                    }
                    return bufferOrEvent;
                }
            }
        }
    

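    If the stream has not yet ended, the barrier is handled in processBarrier. Depending on whether the task has multiple inputs, processBarrier decides whether the barriers need to be aligned. Once the barriers are aligned (or no alignment is needed), it calls notifyCheckpoint to trigger the checkpoint. This leads to triggerCheckpointOnBarrier, and from there the process is the same as for a source task.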
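    Here is a simplified sketch of the decision processBarrier makes, loosely modeled on BarrierBuffer and with channel blocking and buffering left out. With a single input channel, a newer barrier triggers the checkpoint immediately. With several channels, the checkpoint is triggered only after the same barrier ID has been seen on all of them. A newer barrier that arrives mid-alignment abandons the older checkpoint. `ProcessBarrierSketch` and its `events` log are hypothetical names. The abort rule is an assumption of this sketch rather than a quote of Flink's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of processBarrier's decision logic (no buffering shown).
final class ProcessBarrierSketch {
    private final int totalNumberOfInputChannels;
    private final boolean[] barrierSeen;
    private long currentCheckpointId = -1L;
    private int numBarriersReceived = 0;
    final List<String> events = new ArrayList<>(); // "trigger-N" (notifyCheckpoint) or "abort-N"

    ProcessBarrierSketch(int totalNumberOfInputChannels) {
        this.totalNumberOfInputChannels = totalNumberOfInputChannels;
        this.barrierSeen = new boolean[totalNumberOfInputChannels];
    }

    void processBarrier(long barrierId, int channelIndex) {
        if (totalNumberOfInputChannels == 1) {
            // single input: nothing to align, trigger as soon as a newer barrier arrives
            if (barrierId > currentCheckpointId) {
                currentCheckpointId = barrierId;
                events.add("trigger-" + barrierId);
            }
            return;
        }
        if (numBarriersReceived > 0) {                  // an alignment is in progress
            if (barrierId == currentCheckpointId) {
                markChannel(channelIndex);
            } else if (barrierId > currentCheckpointId) {
                events.add("abort-" + currentCheckpointId); // newer checkpoint overtook the old one
                beginNewAlignment(barrierId, channelIndex);
            } else {
                return;                                 // trailing barrier of an older checkpoint
            }
        } else if (barrierId > currentCheckpointId) {
            beginNewAlignment(barrierId, channelIndex); // first barrier of a new checkpoint
        } else {
            return;                                     // outdated barrier: ignore
        }
        if (numBarriersReceived == totalNumberOfInputChannels) {
            numBarriersReceived = 0;                    // aligned: reset and trigger
            Arrays.fill(barrierSeen, false);
            events.add("trigger-" + currentCheckpointId);
        }
    }

    private void beginNewAlignment(long barrierId, int channelIndex) {
        currentCheckpointId = barrierId;
        numBarriersReceived = 0;
        Arrays.fill(barrierSeen, false);
        markChannel(channelIndex);
    }

    private void markChannel(int channelIndex) {
        if (!barrierSeen[channelIndex]) {
            barrierSeen[channelIndex] = true;
            numBarriersReceived++;
        }
    }
}
```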

    Operator checkpoint

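    As mentioned above, when a task performs a checkpoint it first creates and broadcasts the barrier for that checkpoint, and only then takes its own checkpoint. The executeCheckpointing method calls each operator's snapshotState to save its state, and each operator implements snapshotState according to its own needs.
    For example, in the Kafka consumer operator shipped with Flink, FlinkKafkaConsumerBase.snapshotState saves the offset consumed so far for each partition of the subscribed topics.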

        public final void snapshotState(FunctionSnapshotContext context) throws Exception {
            if (!running) {
                LOG.debug("snapshotState() called on closed source");
            } else {
                unionOffsetStates.clear();
    
                final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
                if (fetcher == null) {
                    // the fetcher has not yet been initialized, which means we need to return the
                    // originally restored offsets or the assigned partitions
                    for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                        unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
                    }
    
                    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                        // the map cannot be asynchronously updated, because only one checkpoint call can happen
                        // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                        pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                    }
                } else {
                    HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
    
                    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                        // the map cannot be asynchronously updated, because only one checkpoint call can happen
                        // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                    }
    
                    for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                        unionOffsetStates.add(
                                Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
                    }
                }
    
                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // truncate the map of pending offsets to commit, to prevent infinite growth
                    while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                        pendingOffsetsToCommit.remove(0);
                    }
                }
            }
        }
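    The essential pattern in snapshotState has three parts. The current offsets are copied into the checkpointed state, they are remembered under the checkpoint ID, and with OffsetCommitMode.ON_CHECKPOINTS they are committed back to Kafka only when that checkpoint is reported complete. The following is a minimal, Kafka-free sketch of that bookkeeping. `OffsetBookkeeping`, the `String` partition keys and the `committed` map (standing in for committing to Kafka) are hypothetical. MAX_NUM_PENDING_CHECKPOINTS is set to an arbitrary 100 here.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, Kafka-free model of the offset bookkeeping in a checkpointed source.
final class OffsetBookkeeping {
    static final int MAX_NUM_PENDING_CHECKPOINTS = 100; // arbitrary bound for this sketch

    final Map<String, Long> currentOffsets = new HashMap<>(); // partition -> last consumed offset
    // checkpoint ID -> offsets captured by that checkpoint, in insertion (= ID) order
    final LinkedHashMap<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();
    final Map<String, Long> committed = new HashMap<>();      // stands in for offsets committed to Kafka

    void consume(String partition, long offset) {
        currentOffsets.put(partition, offset);
    }

    Map<String, Long> snapshotState(long checkpointId) {
        // copy, so that later consumption cannot change what this checkpoint captured
        Map<String, Long> snapshot = new HashMap<>(currentOffsets);
        pendingOffsetsToCommit.put(checkpointId, snapshot);
        // truncate the map of pending offsets to commit, to prevent unbounded growth
        while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            Long oldest = pendingOffsetsToCommit.keySet().iterator().next();
            pendingOffsetsToCommit.remove(oldest);
        }
        return snapshot; // this is what would be written into the operator state
    }

    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> offsets = pendingOffsetsToCommit.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already truncated checkpoint: nothing to commit
        }
        // this checkpoint and all older ones are no longer pending
        pendingOffsetsToCommit.keySet().removeIf(id -> id <= checkpointId);
        committed.putAll(offsets);
    }
}
```

    Committing only on completion means that offsets committed to Kafka never run ahead of a checkpoint the job could actually restore from.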
    

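    From pending checkpoint to completed checkpoint

    When a task finishes its checkpoint, it reports completion to the job master. On receiving the report, the job master moves that task from the set of tasks that have not yet acknowledged the checkpoint to the set of tasks that have. Once every task has reported, the job master changes the checkpoint from pending to completed.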
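    This bookkeeping can be sketched as a pending checkpoint that holds the set of tasks that have not acknowledged yet. Each acknowledgement moves a task to the acknowledged side, and an empty set means the checkpoint can be finalized. The names `PendingCheckpointSketch` and `AckResult` and the string task IDs are hypothetical. Flink's PendingCheckpoint.acknowledgeTask also records state handles and metrics, which are reduced to a plain string here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of a pending checkpoint's acknowledgement bookkeeping.
final class PendingCheckpointSketch {
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN }

    final long checkpointId;
    private final Set<String> notYetAcknowledgedTasks;
    private final Map<String, String> acknowledgedTasks = new HashMap<>(); // task -> reported state

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToAck) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledgedTasks = new HashSet<>(tasksToAck);
    }

    AckResult acknowledgeTask(String taskId, String stateHandle) {
        if (notYetAcknowledgedTasks.remove(taskId)) {
            acknowledgedTasks.put(taskId, stateHandle); // move the task to the acknowledged side
            return AckResult.SUCCESS;
        }
        return acknowledgedTasks.containsKey(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledgedTasks.isEmpty(); // every task reported: ready to complete
    }
}
```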

    Task side:

    executeCheckpointing -> asyncCheckpointRunnable -> reportCompletedSnapshotStates -> taskStateManager.reportTaskStateSnapshots, which in TaskStateManagerImpl finally calls:

        public void reportTaskStateSnapshots(
            @Nonnull CheckpointMetaData checkpointMetaData,
            @Nonnull CheckpointMetrics checkpointMetrics,
            @Nullable TaskStateSnapshot acknowledgedState,
            @Nullable TaskStateSnapshot localState) {
    
            long checkpointId = checkpointMetaData.getCheckpointId();
    
            localStateStore.storeLocalState(checkpointId, localState);
    
            checkpointResponder.acknowledgeCheckpoint(
                jobId,
                executionAttemptID,
                checkpointId,
                checkpointMetrics,
                acknowledgedState);
        }
    

    The job master is then notified through an actor (RPC) message.

    Job master side:

    acknowledgeCheckpoint -> checkpointCoordinator.receiveAcknowledgeMessage(ackMessage) -> checkpoint.acknowledgeTask -> completePendingCheckpoint(checkpoint);

        private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
            final long checkpointId = pendingCheckpoint.getCheckpointId();
            final CompletedCheckpoint completedCheckpoint;
    
            // As a first step to complete the checkpoint, we register its state with the registry
            Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
            sharedStateRegistry.registerAll(operatorStates.values());
    
            try {
                try {
                    completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
                }
                catch (Exception e1) {
                    // abort the current pending checkpoint if we fails to finalize the pending checkpoint.
                    if (!pendingCheckpoint.isDiscarded()) {
                        pendingCheckpoint.abortError(e1);
                    }
    
                    throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', e1);
                }
    
                // the pending checkpoint must be discarded after the finalization
                Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);
    
                try {
                    completedCheckpointStore.addCheckpoint(completedCheckpoint);
                } catch (Exception exception) {
                    // we failed to store the completed checkpoint. Let's clean up
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                completedCheckpoint.discardOnFailedStoring();
                            } catch (Throwable t) {
                                LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                            }
                        }
                    });
    
                    throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
                }
            } finally {
                pendingCheckpoints.remove(checkpointId);
    
                triggerQueuedRequests();
            }
    
            rememberRecentCheckpointId(checkpointId);
    
            // drop those pending checkpoints that are at prior to the completed one
            dropSubsumedCheckpoints(checkpointId);
    
            // record the time when this was completed, to calculate
            // the 'min delay between checkpoints'
            lastCheckpointCompletionNanos = System.nanoTime();
    
            LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
                completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
    
            if (LOG.isDebugEnabled()) {
                StringBuilder builder = new StringBuilder();
                builder.append("Checkpoint state: ");
                for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
                    builder.append(state);
                    builder.append(", ");
                }
                // Remove last two chars ", "
                builder.setLength(builder.length() - 2);
    
                LOG.debug(builder.toString());
            }
    
            // send the "notify complete" call to all vertices
            final long timestamp = completedCheckpoint.getTimestamp();
    
            for (ExecutionVertex ev : tasksToCommitTo) {
                Execution ee = ev.getCurrentExecutionAttempt();
                if (ee != null) {
                    ee.notifyCheckpointComplete(checkpointId, timestamp);
                }
            }
        }
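    After finalization, completePendingCheckpoint does three things. It stores the completed checkpoint and removes it from the pending ones. It drops every older pending checkpoint that the new one subsumes (dropSubsumedCheckpoints), and then it notifies the tasks. Here is a minimal sketch of that step. It assumes a `TreeMap` keyed by checkpoint ID for the pending checkpoints and a deque as the completed-checkpoint store. `CompletionSketch` and `maxRetained` are hypothetical names. Keeping only the latest `maxRetained` completed checkpoints is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of what happens once a pending checkpoint is fully acknowledged.
final class CompletionSketch {
    final TreeMap<Long, String> pendingCheckpoints = new TreeMap<>(); // ID -> description
    final Deque<Long> completedCheckpointStore = new ArrayDeque<>();  // oldest first
    final List<String> notifications = new ArrayList<>();
    private final int maxRetained;

    CompletionSketch(int maxRetained) { this.maxRetained = maxRetained; }

    void completePendingCheckpoint(long checkpointId, List<String> tasksToCommitTo) {
        if (!pendingCheckpoints.containsKey(checkpointId)) {
            throw new IllegalStateException("Unknown pending checkpoint " + checkpointId);
        }
        completedCheckpointStore.addLast(checkpointId);
        while (completedCheckpointStore.size() > maxRetained) {
            completedCheckpointStore.removeFirst(); // discard the oldest completed checkpoint
        }
        pendingCheckpoints.remove(checkpointId);
        // drop those pending checkpoints that are prior to the completed one
        pendingCheckpoints.headMap(checkpointId).clear();
        // send the "notify complete" call to all tasks
        for (String task : tasksToCommitTo) {
            notifications.add(task + ":" + checkpointId);
        }
    }
}
```

    Suppose checkpoints 1 to 4 are pending and checkpoint 3 completes. Then 1 and 2 are dropped as subsumed while 4 stays pending, because a newer completed checkpoint makes the older ones useless for recovery.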
    
