Understanding the Full Flink Checkpoint Barrier Flow in One Article

Author: shengjk1 | Published 2020-06-18 19:36

    In the previous article we went through the full flow of how Flink consumes messages. Next, let's walk through the full flow of the checkpoint barrier.

    First, when the job starts, Flink calls startCheckpointScheduler:

    public void startCheckpointScheduler() {
            synchronized (lock) {
                if (shutdown) {
                    throw new IllegalArgumentException("Checkpoint coordinator is shut down");
                }
    
                // make sure all prior timers are cancelled
                stopCheckpointScheduler();
    
                periodicScheduling = true;
                long initialDelay = ThreadLocalRandom.current().nextLong(
                    minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
                // schedule the periodic trigger task
                currentPeriodicTrigger = timer.scheduleAtFixedRate(
                        new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
            }
        }
    

    This periodic timer task is what triggers each checkpoint.
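
    The scheduling pattern in the listing above (cancel the old timer, pick a random first delay, then fire at a fixed rate) can be sketched with nothing but the JDK. This is a minimal sketch, not Flink's code: the class PeriodicTriggerSketch and its methods are hypothetical names, and the delay is taken directly in milliseconds. It also assumes the minimum pause does not exceed the base interval, since ThreadLocalRandom.nextLong(origin, bound) throws when origin >= bound.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; it is not part of Flink.
class PeriodicTriggerSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> currentPeriodicTrigger;

    // Random first delay in [minPauseMillis, baseIntervalMillis], as in the listing above.
    static long initialDelay(long minPauseMillis, long baseIntervalMillis) {
        return ThreadLocalRandom.current().nextLong(minPauseMillis, baseIntervalMillis + 1L);
    }

    synchronized void start(long minPauseMillis, long baseIntervalMillis, Runnable trigger) {
        stop(); // make sure a previous timer is cancelled first
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
            trigger,
            initialDelay(minPauseMillis, baseIntervalMillis),
            baseIntervalMillis,
            TimeUnit.MILLISECONDS);
    }

    synchronized void stop() {
        if (currentPeriodicTrigger != null) {
            currentPeriodicTrigger.cancel(false);
            currentPeriodicTrigger = null;
        }
    }

    void shutdown() {
        stop();
        timer.shutdownNow();
    }

    // Runs the scheduler until three triggers have fired; returns false on timeout.
    static boolean demo() {
        PeriodicTriggerSketch scheduler = new PeriodicTriggerSketch();
        CountDownLatch fired = new CountDownLatch(3);
        scheduler.start(5L, 20L, fired::countDown);
        try {
            return fired.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("three triggers fired: " + demo());
    }
}
```

    The random first delay spreads out the first checkpoint so that it does not necessarily fire right at job start; every later trigger follows at the base interval.
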
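    The request then travels over RPC to the TaskManager and arrives at TaskExecutor.triggerCheckpoint (the method below looks the task up in its taskSlotTable):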

    @Override
        // forwards the checkpoint trigger to the task that runs the trigger operator chain
        public CompletableFuture<Acknowledge> triggerCheckpoint(
                ExecutionAttemptID executionAttemptID,
                long checkpointId,
                long checkpointTimestamp,
                CheckpointOptions checkpointOptions) {
            log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
    
            final Task task = taskSlotTable.getTask(executionAttemptID);
    
            if (task != null) {
                task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
    
                return CompletableFuture.completedFuture(Acknowledge.get());
            } else {
                final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
    
                log.debug(message);
                return FutureUtils.completedExceptionally(new CheckpointException(message));
            }
        }
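
    The shape of this method, look the task up and answer with either an already completed or an already failed future, can be tried out in isolation. The following is only a sketch under assumed names: TaskLookupSketch, the String attempt IDs, and the "ACK" value are hypothetical stand-ins for the taskSlotTable, ExecutionAttemptID, and Acknowledge used in the listing.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration class; the names below are not Flink APIs.
class TaskLookupSketch {
    private final Map<String, Runnable> tasksByAttemptId = new ConcurrentHashMap<>();

    void register(String attemptId, Runnable onTrigger) {
        tasksByAttemptId.put(attemptId, onTrigger);
    }

    CompletableFuture<String> triggerCheckpoint(String attemptId) {
        Runnable task = tasksByAttemptId.get(attemptId);
        if (task != null) {
            task.run(); // hand the request to the task
            return CompletableFuture.completedFuture("ACK");
        }
        // Unknown task: answer with a future that is already failed.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(
            new IllegalStateException("checkpoint request for unknown task " + attemptId));
        return failed;
    }

    public static void main(String[] args) {
        TaskLookupSketch executor = new TaskLookupSketch();
        executor.register("attempt-1", () -> System.out.println("task triggered"));
        System.out.println(executor.triggerCheckpoint("attempt-1").join());
        System.out.println(executor.triggerCheckpoint("attempt-9").isCompletedExceptionally());
    }
}
```

    Note that the acknowledgement only confirms that the request was handed over; it says nothing about whether the checkpoint itself succeeds.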
    
    

    Then on to Task.triggerCheckpointBarrier:

    // the checkpoint request for the trigger operator chain ends up here, in triggerCheckpointBarrier
        public void triggerCheckpointBarrier(
            final long checkpointID,
            long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {
            
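            // The invokable is in fact a StreamTask. Task delegates the checkpoint to this more specific class,
            // and StreamTask delegates further in turn, until the user code is reached,
            // e.g. source -> flatMap.
            // In effect, the invokable represents an operator chain.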
            final AbstractInvokable invokable = this.invokable;
            final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
            
            if (executionState == ExecutionState.RUNNING && invokable != null) {
                
                // build a local closure
                final String taskName = taskNameWithSubtask;
                final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                    FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
                
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        // set safety net from the task's context for checkpointing thread
                        LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
                        
                        try {
                            // invokable is in fact a StreamTask: Task delegates the checkpoint to it, and StreamTask delegates further until the user code is reached
                            boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                            if (!success) {
                                checkpointResponder.declineCheckpoint(
                                    getJobID(), getExecutionId(), checkpointID,
                                    new CheckpointDeclineTaskNotReadyException(taskName));
                            }
                        } catch (Throwable t) {
                            if (getExecutionState() == ExecutionState.RUNNING) {
                                failExternally(new Exception(
                                    "Error while triggering checkpoint " + checkpointID + " for " +
                                        taskNameWithSubtask, t));
                            } else {
                                LOG.debug("Encountered error while triggering checkpoint {} for " +
                                        "{} ({}) while being not in state running.", checkpointID,
                                    taskNameWithSubtask, executionId, t);
                            }
                        } finally {
                            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                        }
                    }
                };
                executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
            } else {
                LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
                
                // send back a message that we did not do the checkpoint
                checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                    new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
            }
        }
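
    The control flow of triggerCheckpointBarrier boils down to three outcomes: decline immediately if the task is not running, otherwise hand the call to an asynchronous executor and either report the trigger, decline because the invokable returned false, or fail the task on an exception. The sketch below mirrors that flow under assumed names (TriggerDelegationSketch, Invokable, and the log strings are hypothetical; the real method talks to a checkpointResponder instead of writing to a list).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; it is not part of Flink.
class TriggerDelegationSketch {
    interface Invokable {
        boolean triggerCheckpoint(long checkpointId) throws Exception;
    }

    private final ExecutorService asyncCalls = Executors.newSingleThreadExecutor();
    private final List<String> log = Collections.synchronizedList(new ArrayList<>());
    private final Invokable invokable;
    private volatile boolean running = true;

    TriggerDelegationSketch(Invokable invokable) {
        this.invokable = invokable;
    }

    void setRunning(boolean running) {
        this.running = running;
    }

    void triggerCheckpointBarrier(long checkpointId) {
        if (!running || invokable == null) {
            log.add("declined " + checkpointId + " (task not running)");
            return;
        }
        // The actual work runs off the calling (RPC) thread.
        asyncCalls.execute(() -> {
            try {
                if (invokable.triggerCheckpoint(checkpointId)) {
                    log.add("triggered " + checkpointId);
                } else {
                    log.add("declined " + checkpointId + " (invokable returned false)");
                }
            } catch (Throwable t) {
                log.add((running ? "task failed on " : "ignored error on ") + checkpointId);
            }
        });
    }

    List<String> shutdownAndGetLog() {
        asyncCalls.shutdown();
        try {
            asyncCalls.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    static List<String> demo() {
        // This invokable accepts odd checkpoint IDs and refuses even ones.
        TriggerDelegationSketch task = new TriggerDelegationSketch(id -> id % 2 == 1);
        task.triggerCheckpointBarrier(1);
        task.triggerCheckpointBarrier(2);
        task.setRunning(false);
        task.triggerCheckpointBarrier(3);
        return task.shutdownAndGetLog();
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

    Because the first two requests run asynchronously while the third is declined on the calling thread, the order of the log entries is not fixed; only their content is.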
    

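    Taking SourceStreamTask as the example, we step into triggerCheckpoint (the implementation shown here is the one defined in StreamTask):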

    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
            try {
                // No alignment if we inject a checkpoint
                CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
                        .setBytesBufferedInAlignment(0L)
                        .setAlignmentDurationNanos(0L);
    
                return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
            }
            catch (Exception e) {
                // propagate exceptions only if the task is still in "running" state
                if (isRunning) {
                    throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
                        " for operator " + getName() + '.', e);
                } else {
                    LOG.debug("Could not perform checkpoint {} for operator {} while the " +
                        "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
                    return false;
                }
            }
        }
    

    StreamTask.performCheckpoint

    // the call chain from the trigger operator chain arrives here, where the barrier first appears (it originates from the periodic checkpoint timer)
        private boolean performCheckpoint(
                CheckpointMetaData checkpointMetaData,
                CheckpointOptions checkpointOptions,
                CheckpointMetrics checkpointMetrics) throws Exception {
    
            LOG.debug("Starting checkpoint ({}) {} on task {}",
                checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
    
            synchronized (lock) {
                if (isRunning) {
                    // we can do a checkpoint
    
                    // All of the following steps happen as an atomic step from the perspective of barriers and
                    // records/watermarks/timers/callbacks.
                    // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                    // checkpoint alignments
    
                    // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                    //           The pre-barrier work should be nothing or minimal in the common case.
                    // Note: from this point on, barriers start to appear along the execution path
                    operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
    
                    // Step (2): Send the checkpoint barrier downstream
                    /*
                    Send the barrier downstream. A downstream operator triggers its own checkpoint once it has received this barrier on all of its input channels
                     */
                    operatorChain.broadcastCheckpointBarrier(
                            checkpointMetaData.getCheckpointId(),
                            checkpointMetaData.getTimestamp(),
                            checkpointOptions);
    
                    // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                    //           impact progress of the streaming topology
                    // Take the checkpoint. The source task chain (the trigger task) gets here directly via triggerCheckpoint,
                    // whereas non-source task chains get here via processBarrier
                    checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                    return true;
                }
                else {
                    // we cannot perform our checkpoint - let the downstream operators know that they
                    // should not wait for any input from this operator
    
                    // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
                    // yet be created
                    final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                    Exception exception = null;
    
                    for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
                        try {
                            // another kind of event, similar to a barrier
                            recordWriter.broadcastEvent(message);
                        } catch (Exception e) {
                            exception = ExceptionUtils.firstOrSuppressed(
                                new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                                exception);
                        }
                    }
    
                    if (exception != null) {
                        throw exception;
                    }
    
                    return false;
                }
            }
        }
    

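    This is the point where a checkpoint barrier first appears in the Flink graph.
    Note that only the trigger operators actively initiate a checkpoint. (When the ExecutionGraph is built, trigger operators, ack operators, and confirm operators are determined; these tasks are essentially operator chains.) A trigger operator can simply be thought of as the StreamSource operator.
    In other words, the StreamSource operator triggers the checkpoint, broadcasts the barrier downstream, and finally snapshots its own state (the state of the StreamSource operator).
    How the barrier is broadcast downstream is much the same as for ordinary records; see the earlier article on the full flow of how Flink consumes messages.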
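
    The order of the steps matters: the barrier is emitted and the state is snapshotted under the same lock that guards record emission, so the barrier's position in the output stream lines up exactly with the snapshotted state. Here is a minimal sketch of that idea, assuming a source whose only state is an offset; SourceBarrierSketch, the list standing in for the output channel, and the string formats are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; it is not part of Flink.
class SourceBarrierSketch {
    final List<String> output = new ArrayList<>();    // stands in for the downstream channel
    final List<String> snapshots = new ArrayList<>(); // stands in for the state backend
    private final Object lock = new Object();
    private long offset = 0L;                         // the source's only state

    void emitRecord() {
        synchronized (lock) {
            output.add("record-" + offset);
            offset++;
        }
    }

    boolean performCheckpoint(long checkpointId) {
        synchronized (lock) {
            // Step 1 (pre-barrier work) is empty in this sketch.
            // Step 2: send the barrier downstream, in line with the records.
            output.add("barrier-" + checkpointId);
            // Step 3: snapshot the state as it was when the barrier was emitted.
            snapshots.add("chk-" + checkpointId + ":offset=" + offset);
            return true;
        }
    }

    static SourceBarrierSketch demo() {
        SourceBarrierSketch source = new SourceBarrierSketch();
        source.emitRecord();
        source.emitRecord();
        source.performCheckpoint(1L);
        source.emitRecord();
        return source;
    }

    public static void main(String[] args) {
        SourceBarrierSketch source = demo();
        System.out.println(source.output);
        System.out.println(source.snapshots);
    }
}
```

    In the demo, two records precede the barrier and the snapshot records offset 2, so replaying from the snapshot would resume exactly with the first record after the barrier.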

    Downstream operators, flatMap for example, then consume the data in OneInputStreamTask (used as the example here):

    @Override
        protected void run() throws Exception {
            // cache processor reference on the stack, to make the code more JIT friendly
            final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
            // process the incoming records
            while (running && inputProcessor.processInput()) {
                // all the work happens in the "processInput" method
            }
        }
    

    Next, we go straight to BarrierBuffer (used when checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) is set):

    @Override
        // fetches data from the upstream ResultSubPartition
        public BufferOrEvent getNextNonBlocked() throws Exception {
            while (true) {
                // process buffered BufferOrEvents before grabbing new ones
                Optional<BufferOrEvent> next;
                if (currentBuffered == null) {
                    // If no buffered BufferOrEvents are pending, read directly from the InputGate; otherwise read from the buffered data (cached by CachedBufferBlocker)
                    // The InputGate's InputChannels fetch the data from the ResultSubPartition
                    next = inputGate.getNextBufferOrEvent();
                }
                else {
                    next = Optional.ofNullable(currentBuffered.getNext());
                    if (!next.isPresent()) {
                        completeBufferedSequence();
                        return getNextNonBlocked();
                    }
                }
    
                if (!next.isPresent()) {
                    if (!endOfStream) {
                        // end of input stream. stream continues with the buffered data
                        endOfStream = true;
                        releaseBlocksAndResetBarriers();
                        return getNextNonBlocked();
                    }
                    else {
                        // final end of both input and buffered data
                        return null;
                    }
                }
    
                BufferOrEvent bufferOrEvent = next.get();
                if (isBlocked(bufferOrEvent.getChannelIndex())) {
                    // if the channel is blocked, we just store the BufferOrEvent
                    // barrier alignment
                    bufferBlocker.add(bufferOrEvent);
                    checkSizeLimit();
                }
                else if (bufferOrEvent.isBuffer()) {
                    return bufferOrEvent;
                }
                // handle a barrier
                else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
                    if (!endOfStream) {
                        // process barriers only if there is a chance of the checkpoint completing
                        // every operator other than the trigger task takes its checkpoint from here; only once processInput consumes the barrier has it actually passed through the upstream operator
                        processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
                    }
                }
                else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
                    processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
                }
                else {
                    if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
                        processEndOfPartition();
                    }
                    return bufferOrEvent;
                }
            }
        }
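
    The buffering part of this method is easier to follow on a stripped-down model: data from a blocked channel is stashed instead of returned, and once the blocks are released the stash is replayed before any new input is read. The sketch below assumes a simple in-memory queue as the input gate; BlockedChannelSketch and all of its members are hypothetical names, and barriers, events, and size limits are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration class; it is not part of Flink.
class BlockedChannelSketch {
    static final class Item {
        final int channel;
        final String value;

        Item(int channel, String value) {
            this.channel = channel;
            this.value = value;
        }
    }

    private final Deque<Item> input = new ArrayDeque<>(); // stands in for the input gate
    private final Deque<Item> stash = new ArrayDeque<>(); // data held back while a channel is blocked
    private Deque<Item> replay = null;                    // released data currently being replayed
    private final boolean[] blocked;

    BlockedChannelSketch(int channels) {
        this.blocked = new boolean[channels];
    }

    void offer(int channel, String value) {
        input.add(new Item(channel, value));
    }

    void block(int channel) {
        blocked[channel] = true;
    }

    void releaseBlocks() {
        Arrays.fill(blocked, false);
        // Items stashed during a replay came before the rest of that replay, so they go first.
        Deque<Item> merged = new ArrayDeque<>(stash);
        if (replay != null) {
            merged.addAll(replay);
        }
        stash.clear();
        replay = merged;
    }

    // Returns the next value from a non-blocked channel, or null when nothing is left.
    String getNextNonBlocked() {
        while (true) {
            Item next;
            if (replay != null) {
                next = replay.poll();
                if (next == null) {
                    replay = null; // replay finished, go back to the input
                    continue;
                }
            } else {
                next = input.poll();
                if (next == null) {
                    return null;
                }
            }
            if (blocked[next.channel]) {
                stash.add(next);
            } else {
                return next.value;
            }
        }
    }

    static List<String> demo() {
        BlockedChannelSketch reader = new BlockedChannelSketch(2);
        reader.offer(0, "a0");
        reader.offer(1, "b0");
        reader.offer(0, "a1");
        reader.block(1); // pretend a barrier has already arrived on channel 1
        List<String> seen = new ArrayList<>();
        for (String v; (v = reader.getNextNonBlocked()) != null; ) {
            seen.add(v);
        }
        reader.releaseBlocks();
        for (String v; (v = reader.getNextNonBlocked()) != null; ) {
            seen.add(v);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    In the demo, b0 arrives between a0 and a1 but is only handed out after the release, which is the reordering that alignment deliberately introduces for the blocked channel.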
    

    Next comes the most critical piece of logic: handling the barrier.

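    // An operator must receive the barrier with the same ID from every input channel before it can start its own checkpoint.
        //  If one channel runs ahead of the others, the data arriving after its barrier has to be buffered.
        //  If an input channel has been closed, it will not send any more barriers.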
        private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
            final long barrierId = receivedBarrier.getId();
    
            // fast path for single channel cases
            if (totalNumberOfInputChannels == 1) {
                if (barrierId > currentCheckpointId) {
                    // new checkpoint
                    currentCheckpointId = barrierId;
                    // trigger the checkpoint
                    notifyCheckpoint(receivedBarrier);
                }
                return;
            }
    
            // -- general code path for multiple input channels --
    
            if (numBarriersReceived > 0) {
                // this is only true if some alignment is already progress and was not canceled
    
                if (barrierId == currentCheckpointId) {
                    // regular case
                    onBarrier(channelIndex);
                }
                else if (barrierId > currentCheckpointId) {
                    // we did not complete the current checkpoint, another started before
                    LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                            "Skipping current checkpoint.",
                        inputGate.getOwningTaskName(),
                        barrierId,
                        currentCheckpointId);
    
                    // let the task know we are not completing this
                    notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
    
                    // abort the current checkpoint
                    releaseBlocksAndResetBarriers();
    
                    // begin a the new checkpoint
                    beginNewAlignment(barrierId, channelIndex);
                }
                else {
                    // ignore trailing barrier from an earlier checkpoint (obsolete now)
                    return;
                }
            }
            else if (barrierId > currentCheckpointId) {
                // first barrier of a new checkpoint
                beginNewAlignment(barrierId, channelIndex);
            }
            else {
                // either the current checkpoint was canceled (numBarriers == 0) or
                // this barrier is from an old subsumed checkpoint
                return;
            }
    
            // check if we have all barriers - since canceled checkpoints always have zero barriers
            // this can only happen on a non canceled checkpoint
            // the checkpoint is triggered only after the barriers are aligned
            if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
                // actually trigger checkpoint
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
                        inputGate.getOwningTaskName(),
                        receivedBarrier.getId(),
                        receivedBarrier.getTimestamp());
                }
    
                releaseBlocksAndResetBarriers();
                // once all barriers have been received, notifyCheckpoint() is called,
                // which in turn calls StreamTask's triggerCheckpointOnBarrier and from there follows the same path as the upstream operator
                notifyCheckpoint(receivedBarrier);
            }
        }
    

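    Finally, notifyCheckpoint in turn reaches StreamTask's performCheckpoint, which starts flatMap's own series of checkpoint barrier steps: broadcasting the barrier, then snapshotting its own state. The same cycle repeats from operator to operator until the end of the graph.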
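
    The decision logic of processBarrier can be condensed into a small state machine that only counts barriers. The following sketch keeps the same branches (single-channel fast path, regular barrier, newer barrier that subsumes an unfinished checkpoint, stale barrier) but deliberately leaves out data buffering and closed channels; BarrierAlignerSketch and its event strings are hypothetical, and a duplicate barrier on an already blocked channel is simply ignored here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; it is not part of Flink.
class BarrierAlignerSketch {
    final List<String> events = new ArrayList<>();
    private final int totalChannels;
    private final boolean[] blocked;
    private int numBarriersReceived = 0;
    private long currentCheckpointId = -1L;

    BarrierAlignerSketch(int totalChannels) {
        this.totalChannels = totalChannels;
        this.blocked = new boolean[totalChannels];
    }

    void processBarrier(long barrierId, int channel) {
        // Fast path: with a single channel there is nothing to align.
        if (totalChannels == 1) {
            if (barrierId > currentCheckpointId) {
                currentCheckpointId = barrierId;
                events.add("checkpoint " + barrierId);
            }
            return;
        }
        if (numBarriersReceived > 0) {
            if (barrierId == currentCheckpointId) {
                onBarrier(channel); // regular case
            } else if (barrierId > currentCheckpointId) {
                // A newer checkpoint started before the current one was aligned.
                events.add("abort " + currentCheckpointId);
                release();
                beginNewAlignment(barrierId, channel);
            } else {
                return; // trailing barrier of an older checkpoint
            }
        } else if (barrierId > currentCheckpointId) {
            beginNewAlignment(barrierId, channel);
        } else {
            return; // stale or cancelled checkpoint
        }
        // Checkpoint only once every channel has delivered its barrier.
        if (numBarriersReceived == totalChannels) {
            release();
            events.add("checkpoint " + barrierId);
        }
    }

    private void beginNewAlignment(long checkpointId, int channel) {
        currentCheckpointId = checkpointId;
        onBarrier(channel);
    }

    private void onBarrier(int channel) {
        if (!blocked[channel]) {
            blocked[channel] = true;
            numBarriersReceived++;
        }
    }

    private void release() {
        Arrays.fill(blocked, false);
        numBarriersReceived = 0;
    }

    static List<String> demo() {
        BarrierAlignerSketch aligner = new BarrierAlignerSketch(2);
        aligner.processBarrier(1L, 0);
        aligner.processBarrier(1L, 1); // aligned: checkpoint 1
        aligner.processBarrier(2L, 0);
        aligner.processBarrier(3L, 1); // 3 overtakes 2: abort 2
        aligner.processBarrier(3L, 0); // aligned: checkpoint 3
        aligner.processBarrier(2L, 1); // stale barrier, ignored
        return aligner.events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Running the demo yields checkpoint 1, then an abort of checkpoint 2, then checkpoint 3; the late barrier for checkpoint 2 leaves no trace.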


        Article link: https://www.haomeiwen.com/subject/nxkpxktx.html