Flink 源码之StreamTask

作者: AlienPaul | 来源:发表于2021-04-02 11:17 被阅读0次

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

前言

本篇我们一起分析下Flink中流处理作业的初始化和执行逻辑。

AbstractInvokable

AbstractInvokableTaskManager中运行的所有任务的父类。所有的读取上游数据,用户数据处理逻辑(map,filter算子以及用户自己编写的processFunction等等)和发送处理过的数据到下游相关逻辑都在该类的invoke方法中得到执行。

AbstractInvokable中与任务执行相关的2个方法为:

  • invoke方法:启动任务执行的入口方法。实现类必须重写这个方法。
  • cancel方法:任务被取消或者是用户终止任务的时候被调用

它有两个实现类:

  • BatchTask:所有批处理类型Task的基类。
  • StreamTask:所有流处理类型Task的基类。

我们以流处理为重点,下面详细介绍下StreamTask这个类。

AbstractInvokable的创建

在开始分析StreamTask之前我们需要了解下它是在何处,如何被创建出来的。

翻阅Task线程的处理逻辑,不难发现它的invoke变量初始化位于TaskdoRun方法。

invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

这一行代码使用用户代码类加载器(userCodeClassLoader),调用目标类唯一参数为Environment类型的构造方法,创建出invokable对象。

private static AbstractInvokable loadAndInstantiateInvokable(
    ClassLoader classLoader, String className, Environment environment) throws Throwable {

    final Class<? extends AbstractInvokable> invokableClass;
    try {
        // 使用指定的classloader加载className对应的class,并转换为AbstractInvokable类型
        invokableClass =
            Class.forName(className, true, classLoader).asSubclass(AbstractInvokable.class);
    } catch (Throwable t) {
        throw new Exception("Could not load the task's invokable class.", t);
    }

    Constructor<? extends AbstractInvokable> statelessCtor;

    try {
        // 获取构造函数
        statelessCtor = invokableClass.getConstructor(Environment.class);
    } catch (NoSuchMethodException ee) {
        throw new FlinkException("Task misses proper constructor", ee);
    }

    // instantiate the class
    try {
        //noinspection ConstantConditions  --> cannot happen
        // 传入environment变量,创建出新的对象
        return statelessCtor.newInstance(environment);
    } catch (InvocationTargetException e) {
        // directly forward exceptions from the eager initialization
        throw e.getTargetException();
    } catch (Exception e) {
        throw new FlinkException("Could not instantiate the task's invokable class.", e);
    }
}

StreamTask

StreamTask类是所有流处理任务的基类。Task由TaskManager部署和执行。Task是本地运行单元。每一个Task包含了一个或多个operator。这些operator在同一个OperatorChain中。
StreamTask任务执行生命周期包含:

  1. setInitialState:设置各个operator的初始状态。对应initializeState方法。
  2. 调用 invoke方法。

其中invoke方法包含的逻辑可细分为:

  • 创建出task相关配置,创建OperatorChain。
  • 执行operator的setup逻辑。
  • 执行task相关的初始化逻辑。
  • 加载并初始化operator的状态。
  • 调用各个operator的open方法。
  • 执行各个operator内的数据处理逻辑。
  • 关闭operator。
  • 销毁operator。
  • 任务清理操作。

下面我们从代码层面详细分析下invoke方法的处理流程。

invoke方法

本节我们分析StreamTask核心执行逻辑invoke方法。invoke方法如下所示:

@Override
public final void invoke() throws Exception {
    try {
        // 调用作业执行前相关准备逻辑
        beforeInvoke();

        // final check to exit early before starting to run
        // 如果任务被取消,抛出异常退出
        if (canceled) {
            throw new CancelTaskException();
        }

        // let the task do its work
        // 执行用户编写的task逻辑
        runMailboxLoop();

        // if this left the run() method cleanly despite the fact that this was canceled,
        // make sure the "clean shutdown" is not attempted
        // 再次检查如果任务被取消,抛出异常退出
        if (canceled) {
            throw new CancelTaskException();
        }

        // 执行调用后相关逻辑
        afterInvoke();
    } catch (Throwable invokeException) {
        failing = !canceled;
        try {
            cleanUpInvoke();
        }
        // TODO: investigate why Throwable instead of Exception is used here.
        catch (Throwable cleanUpException) {
            Throwable throwable =
                ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
            ExceptionUtils.rethrowException(throwable);
        }
        ExceptionUtils.rethrowException(invokeException);
    }
    
    // 执行invoke后清理操作
    cleanUpInvoke();
}

beforeInvoke方法

beforeInvoke方法主要为task的初始化操作,包含创建OperatorChain,读取上游数据和下游数据输出配置等。详细内容如下:

protected void beforeInvoke() throws Exception {
    disposedOperators = false;
    LOG.debug("Initializing {}.", getName());

    // 创建出OperatorChain
    // OperatorChain是JobGraph生成时的一箱优化措施
    // 将复合条件的多个StreamNode(对应数据变换操作)合并到一个chain中
    // 他们会被调度到同一个StreamTask中执行
    operatorChain = new OperatorChain<>(this, recordWriter);
    // 获取OperatorChain中第一个operator
    mainOperator = operatorChain.getMainOperator();

    // task specific initialization
    // 执行task专属的初始化工作
    // 这个是抽象方法
    // 具体逻辑需要在子类中实现
    init();

    // save the work of reloading state, etc, if the task is already canceled
    if (canceled) {
        throw new CancelTaskException();
    }

    // -------- Invoke --------
    LOG.debug("Invoking {}", getName());

    // we need to make sure that any triggers scheduled in open() cannot be
    // executed before all operators are opened
    // task动作必须在StreamTaskActionExecutor中执行,防止出现并发执行问题,影响checkpoint
    // 该executor实际为StreamTaskActionExecutor.IMMEDIATE,即在当前线程直接运行
    actionExecutor.runThrowing(
        () -> {
            // 创建SequentialChannelStateReader,用于读取checkpoint时保存的channel状态
            SequentialChannelStateReader reader =
                getEnvironment()
                .getTaskStateManager()
                .getSequentialChannelStateReader();
            // 获取ResultPartitionWriter状态
            reader.readOutputData(
                getEnvironment().getAllWriters(),
                !configuration.isGraphContainingLoops());

            // 初始化OperatorChain中所有的operator
            // 调用他们的initializeState(初始化状态)和open(包含初始化动作)方法
            operatorChain.initializeStateAndOpenOperators(
                createStreamTaskStateInitializer());

            channelIOExecutor.execute(
                () -> {
                    try {
                        // 获取InputGate状态
                        reader.readInputData(getEnvironment().getAllInputGates());
                    } catch (Exception e) {
                        asyncExceptionHandler.handleAsyncException(
                            "Unable to read channel state", e);
                    }
                });

            for (InputGate inputGate : getEnvironment().getAllInputGates()) {
                // 在inputGate状态被读取之后执行
                inputGate
                    .getStateConsumedFuture()
                    .thenRun(
                    () ->
                    // 在task线程中执行
                    mainMailboxExecutor.execute(
                        // 执行请求partition方法
                        inputGate::requestPartitions,
                        "Input gate request partitions"));
            }
        });
    // 水池状态为正在执行
    isRunning = true;
}

runMailboxLoop方法

runMailboxLoop方法启动task的数据输入和处理逻辑:

public void runMailboxLoop() throws Exception {
    mailboxProcessor.runMailboxLoop();
}

MailBoxProcessorStreamTask的构造函数中创建出来:

this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);

mailboxProcessor.runMailboxLoop()方法可以理解为在actionExecutor线程池执行processInput方法。

processInput方法从上游(StreamTaskNetworkInputInputGate)读取数据。这部分逻辑参见Flink 源码之节点间通信

afterInvoke

afterInvoke方法内容如下,概括起来为task执行完毕后的清理工作,关闭operator等。

protected void afterInvoke() throws Exception {
    LOG.debug("Finished task {}", getName());
    getCompletionFuture().exceptionally(unused -> null).join();

    final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();

    // close all operators in a chain effect way
    // 关闭OperatorChain中所有的operator
    // 从前向后依次调用各个operator的close方法
    operatorChain.closeOperators(actionExecutor);

    // make sure no further checkpoint and notification actions happen.
    // at the same time, this makes sure that during any "regular" exit where still
    actionExecutor.runThrowing(
        () -> {

            // make sure no new timers can come
            // 停止timer服务
            FutureUtils.forward(timerService.quiesce(), timersFinishedFuture);

            // let mailbox execution reject all new letters from this point
            // 准备关闭mailboxProcessor,不再接受新的事件
            mailboxProcessor.prepareClose();

            // only set the StreamTask to not running after all operators have been closed!
            // See FLINK-7430
            // 设置task状态为停止
            isRunning = false;
        });
    // processes the remaining mails; no new mails can be enqueued
    // 处理积压的事件
    mailboxProcessor.drain();

    // make sure all timers finish
    // 等待所有的time都停止
    timersFinishedFuture.get();

    LOG.debug("Closed operators for task {}", getName());

    // make sure all buffered data is flushed
    // 处理掉buffer中的所有数据
    operatorChain.flushOutputs();

    // make an attempt to dispose the operators such that failures in the dispose call
    // still let the computation fail
    // 依次废弃掉OperatorChain中的所有operator(顺序为从头到尾)
    disposeAllOperators();
}

StreamTask的子类

StreamTask是所有流处理计算任务的父类,它本身是一个抽象类。为了处理不同类型的StreamOperatorStreamTask有多种不同的实现。几个典型的实现如下:

  • OneInputStreamTask:处理OneInputStreamOperator,即只有一个输入流的StreamOperator
  • TwoInputStreamTask:处理TwoInputStreamOperator,具有2个输入流。
  • MultipleInputStreamTask:处理MultipleInputStreamOperator,具有多个输入流。
  • SourceStreamTask:处理StreamSource,即数据源。

接下来我们重点关注这些类实现的抽象方法。

OneInputStreamTask的init方法

它的init方法主要流程为创建网络输入与输出,创建inputProcessor用于从网络输入读取数据,反序列化之后传递给网络输出。最后初始化数据流监控。代码和分析如下:

@Override
public void init() throws Exception {
    // 获取流作业配置
    StreamConfig configuration = getConfiguration();
    // 获取网络输入流数量
    int numberOfInputs = configuration.getNumberOfNetworkInputs();

    if (numberOfInputs > 0) {
        // 创建一个CheckpointedInputGate
        // 该类型InputGate拥有一个CheckpointBarrierHandler,用来处理接收到的CheckpointBarrier
        CheckpointedInputGate inputGate = createCheckpointedInputGate();
        // 监控相关,设置流入数据条数计数器
        Counter numRecordsIn = setupNumRecordsInCounter(mainOperator);
        // 创建StreamTaskNetworkOutput
        // 发送反序列化后的数据给task处理流程
        DataOutput<IN> output = createDataOutput(numRecordsIn);
        // 创建StreamTaskNetworkInput
        // 包装了CheckpointedInputGate,从中读取网络接收到的原始数据并发给反序列化器
        StreamTaskInput<IN> input = createTaskInput(inputGate);

        // 读取输入流配置
        StreamConfig.InputConfig[] inputConfigs =
            configuration.getInputs(getUserCodeClassLoader());
        StreamConfig.InputConfig inputConfig = inputConfigs[0];
        // 如果要求对数据排序
        // 含义为数据按照key字段分组
        // 在一段时间内只会给task提供同一分组的数据
        // 不同组的数据不会频繁交替出现
        if (requiresSorting(inputConfig)) {
            checkState(
                !configuration.isCheckpointingEnabled(),
                "Checkpointing is not allowed with sorted inputs.");
            input = wrapWithSorted(input);
        }

        // 注册流入数据条数计数器监控
        getEnvironment()
            .getMetricGroup()
            .getIOMetricGroup()
            .reuseRecordsInputCounter(numRecordsIn);

        // 创建inputProcessor
        // 从网络读取数据,反序列化后给output,然后把反序列化后的数据交给OperatorChain
        inputProcessor = new StreamOneInputProcessor<>(input, output, operatorChain);
    }
    // 创建watermark监控
    mainOperator
        .getMetricGroup()
        .gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge);
    // wrap watermark gauge since registered metrics must be unique
    getEnvironment()
        .getMetricGroup()
        .gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge::getValue);
}

其中创建CheckpointedInputGate的过程在 Flink 源码之分布式快照 有介绍,请大家查阅。

TwoInputStreamTask的init方法

它的初始化方法和OneInputStreamTask的类似,只不过需要创建两个InputGateTwoInputStreamTask对应CoOperator,即有两个输入流的operator(比如CoFlatmap)。

@Override
public void init() throws Exception {
    StreamConfig configuration = getConfiguration();
    ClassLoader userClassLoader = getUserCodeClassLoader();

    int numberOfInputs = configuration.getNumberOfNetworkInputs();

    ArrayList<IndexedInputGate> inputList1 = new ArrayList<>();
    ArrayList<IndexedInputGate> inputList2 = new ArrayList<>();

    List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);

    for (int i = 0; i < numberOfInputs; i++) {
        int inputType = inEdges.get(i).getTypeNumber();
        IndexedInputGate reader = getEnvironment().getInputGate(i);
        switch (inputType) {
            case 1:
                // 如果是输入流1,加入到inputList1中
                inputList1.add(reader);
                break;
            case 2:
                // 如果是输入流2,加入到inputList2中
                inputList2.add(reader);
                break;
            default:
                throw new RuntimeException("Invalid input type number: " + inputType);
        }
    }

    // 创建CheckpointedInputGate,包装了UnionInputGate
    // 包装了多个InputGate,ID相同的channel会被合并
    // 这里创建出两个UnionInputGate,每个UnionInputGate合并了多个inputType相同的InputGate
    // 最后根据这个InputGate,创建出StreamTwoInputProcessor
    createInputProcessor(
        inputList1, inputList2, gateIndex -> inEdges.get(gateIndex).getPartitioner());

    // 监控相关部分,这里省略
    // ...
}

MultipleInputStreamTask和上面的逻辑类似,不再赘述。

SourceStreamTask的init方法

@Override
protected void init() {
    // we check if the source is actually inducing the checkpoints, rather
    // than the trigger
    // 获取数据源数据产生逻辑SourceFunction
    SourceFunction<?> source = mainOperator.getUserFunction();
    // 如果source实现了这个接口,说明接收到CheckpointCoordinator发来的触发checkpoint消息之时source不触发checkpoint
    // checkpoint的触发由输入数据控制
    if (source instanceof ExternallyInducedSource) {
        externallyInducedCheckpoints = true;

        // 创建checkpoint触发钩子
        ExternallyInducedSource.CheckpointTrigger triggerHook =
            new ExternallyInducedSource.CheckpointTrigger() {

            @Override
            public void triggerCheckpoint(long checkpointId) throws FlinkException {
                // TODO - we need to see how to derive those. We should probably not
                // encode this in the
                // TODO -   source's trigger message, but do a handshake in this task
                // between the trigger
                // TODO -   message from the master, and the source's trigger
                // notification
                final CheckpointOptions checkpointOptions =
                    CheckpointOptions.forConfig(
                    CheckpointType.CHECKPOINT,
                    CheckpointStorageLocationReference.getDefault(),
                    configuration.isExactlyOnceCheckpointMode(),
                    configuration.isUnalignedCheckpointsEnabled(),
                    configuration.getAlignmentTimeout());
                final long timestamp = System.currentTimeMillis();

                final CheckpointMetaData checkpointMetaData =
                    new CheckpointMetaData(checkpointId, timestamp);

                try {
                    // 调用StreamTask的异步触发checkpoint方法
                    SourceStreamTask.super
                        .triggerCheckpointAsync(
                        checkpointMetaData, checkpointOptions)
                        .get();
                } catch (RuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    throw new FlinkException(e.getMessage(), e);
                }
            }
        };

        ((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
    }
    // 配置checkpoint启动延迟时间监控
    getEnvironment()
        .getMetricGroup()
        .getIOMetricGroup()
        .gauge(
        MetricNames.CHECKPOINT_START_DELAY_TIME,
        this::getAsyncCheckpointStartDelayNanos);
}

StreamTask从上游获取数据

StreamTask从上游获取数据的调用链为:

  • StreamTask.processInput
  • inputProcessor.processInput
  • StreamTaskNetworkInput.emitNext
  • inputGate.pollNext
  • inputChannel.getNextBuffer

StreamTask通过InputGate从上游其他Task获取到数据。每个InputGate包含一个或多个InputChannel,根据数据是否走网络通信,这些InputChannel分为RemoteInputChannelLocalInputChannel。其中RemoteInputChannel使用Netty通过网络从上游task的ResultSubPartition获取数据,适用与本task和上游task运行在不同集群节点的情况。和它相反的是LocalInputChannel,适用于本task和上游task运行在同一节点的情况,从上游task获取数据不需要走网络通信。

这部分逻辑的详细分析,参见 Flink 源码之节点间通信

数据传递给OperatorChain

这一段逻辑我们从StreamTaskNetworkInputprocessElement方法开始分析。

StreamTaskprocessInput方法为处理数据逻辑的入口。这个方法调用了StreamOneInputProcessor的同名方法,命令StreamTaskNetworkInput一直循环不停的从InputGate中获取数据。对于获取到的数据,需要先交给反序列化器,将二进制数据反序列化为StreamRecord对象。接着交给processElement方法处理。

上面逻辑的分析请参见 Flink 源码之节点间通信 读取数据章节。

下面是processElement方法。该方法位于AbstractStreamTaskNetworkInput。参数中的output实际上就是StreamTaskNetworkOutput`对象。

private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
    // 首先判断元素的类型,可能是数据,watermark,延迟标记或者是流状态
    if (recordOrMark.isRecord()) {
        output.emitRecord(recordOrMark.asRecord());
    } else if (recordOrMark.isWatermark()) {
        statusWatermarkValve.inputWatermark(
            recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
    } else if (recordOrMark.isLatencyMarker()) {
        output.emitLatencyMarker(recordOrMark.asLatencyMarker());
    } else if (recordOrMark.isStreamStatus()) {
        statusWatermarkValve.inputStreamStatus(
            recordOrMark.asStreamStatus(),
            flattenedChannelIndices.get(lastChannel),
            output);
    } else {
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }
}

StreamTaskNetworkOutput接收反序列化处理过的数据,发送给OperatorChain的第一个operator。

private static class StreamTaskNetworkOutput<IN> extends AbstractDataOutput<IN> {

    // 创建的时候传入的是OperatorChain的mainOperator,即第一个operator
    private final OneInputStreamOperator<IN, ?> operator;

    private final WatermarkGauge watermarkGauge;
    private final Counter numRecordsIn;

    private StreamTaskNetworkOutput(
        OneInputStreamOperator<IN, ?> operator,
        StreamStatusMaintainer streamStatusMaintainer,
        WatermarkGauge watermarkGauge,
        Counter numRecordsIn) {
        super(streamStatusMaintainer);

        this.operator = checkNotNull(operator);
        this.watermarkGauge = checkNotNull(watermarkGauge);
        this.numRecordsIn = checkNotNull(numRecordsIn);
    }

    // 发送数据
    @Override
    public void emitRecord(StreamRecord<IN> record) throws Exception {
        numRecordsIn.inc();
        operator.setKeyContextElement1(record);
        // 调用processElement方法,处理数据
        operator.processElement(record);
    }

    // 发送watermark
    @Override
    public void emitWatermark(Watermark watermark) throws Exception {
        watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
        // 调用processWatermark方法,处理watermark
        operator.processWatermark(watermark);
    }

    // 发送延迟标记,被用于统计数据在整个Flink处理流程中的耗时
    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        operator.processLatencyMarker(latencyMarker);
    }
}

OperatorChain的逻辑在后续博客中单独分析。

本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

相关文章

网友评论

    本文标题:Flink 源码之StreamTask

    本文链接:https://www.haomeiwen.com/subject/qrbfkltx.html