Flink Source Code: OperatorChain

Author: AlienPaul | Published 2021-05-18 18:12

    Flink source code analysis series: index

    For the other articles, see the Flink source code analysis series index.

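    Preface

    OperatorChain is an important optimization in Flink. It runs as many eligible operators as possible back to back inside a single slot, which minimizes thread context switches and network communication and improves the performance of the streaming job.

    The logic that decides which operators can be placed in the same chain is part of JobGraph generation. For details, see: Flink Source Code: JobGraph Generation.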

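    Terminology

    • StreamEdge: an element of the StreamGraph topology. A StreamGraph is a DAG made up of StreamNodes and StreamEdges. For details, see Flink Source Code: StreamGraph Generation.
    • RecordWriterOutput: an operator output (Output) type that writes records to a ResultPartition through a RecordWriter.
    • ChainingOutput: like RecordWriterOutput, an operator output type, but one that is used only inside an OperatorChain. It acts as a bridge that hands the records processed by the upstream operator to the downstream operator. A later section analyzes it in detail.
    • TypeSerializer: deserializes bytes read from a DataInputView into an object of type T, or serializes an object of type T into a DataOutputView. Both DataInputView and DataOutputView operate directly on byte arrays, and the actual storage of those byte arrays is backed by MemorySegment.
    • StreamOperatorWrapper: wraps a StreamOperator and is used only by OperatorChain. It holds two references, one to the wrapper of the previous operator and one to the wrapper of the next operator, which forms a doubly linked list. This is where the notion of a chain comes from.

    Next, we start the analysis from the OperatorChain constructor.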

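    Constructor

    OperatorChain is constructed in the beforeInvoke method of StreamTask (see Flink Source Code: StreamTask). The task then obtains the operator at the head of the chain (the mainOperator of the OperatorChain; how the chained operators are created is analyzed below). When records arrive, they are handed to mainOperator for processing.

    The OperatorChain constructor, with annotations, is shown below: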

    public OperatorChain(
        StreamTask<OUT, OP> containingTask,
        RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {
    
        // Create the dispatcher that sends and receives OperatorEvents
        this.operatorEventDispatcher =
            new OperatorEventDispatcherImpl(
            containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(),
            containingTask.getEnvironment().getOperatorCoordinatorEventGateway());
    
        // Get the user code class loader
        final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
        // Get the task configuration
        final StreamConfig configuration = containingTask.getConfiguration();
    
        // Get the StreamOperatorFactory of the StreamTask
        StreamOperatorFactory<OUT> operatorFactory =
            configuration.getStreamOperatorFactory(userCodeClassloader);
    
        // we read the chained configs, and the order of record writer registrations by output name
        // Get the StreamConfig of every StreamOperator in the OperatorChain; the map key is the vertex ID
        Map<Integer, StreamConfig> chainedConfigs =
            configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);
    
        // create the final output stream writers
        // we iterate through all the out edges from this job vertex and create a stream output
        // Get the output StreamEdges of the task, in data flow order
        List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
            new HashMap<>(outEdgesInOrder.size());
        this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
    
        // from here on, we need to make sure that the output writers are shut down again on failure
        boolean success = false;
        try {
            // Create the chain outputs
            // This call populates streamOutputMap,
            // which maps each output StreamEdge to its output
            createChainOutputs(
                outEdgesInOrder,
                recordWriterDelegate,
                chainedConfigs,
                containingTask,
                streamOutputMap);
    
            // we create the chain of operators and grab the collector that leads into the chain
            // Create the list that will hold all operator wrappers
            List<StreamOperatorWrapper<?, ?>> allOpWrappers =
                new ArrayList<>(chainedConfigs.size());
            // Create the output of mainOperator
            // mainOperator is the entry operator of the OperatorChain
            // Through ChainingOutputs, its output links all operators of the OperatorChain in data flow order
            this.mainOperatorOutput =
                createOutputCollector(
                containingTask,
                configuration,
                chainedConfigs,
                userCodeClassloader,
                streamOutputMap,
                allOpWrappers,
                containingTask.getMailboxExecutorFactory());
    
            if (operatorFactory != null) {
                // Create mainOperator and its processing time service
                Tuple2<OP, Optional<ProcessingTimeService>> mainOperatorAndTimeService =
                    StreamOperatorFactoryUtil.createOperator(
                    operatorFactory,
                    containingTask,
                    configuration,
                    mainOperatorOutput,
                    operatorEventDispatcher);
    
                OP mainOperator = mainOperatorAndTimeService.f0;
                // Register the output watermark gauge
                mainOperator
                    .getMetricGroup()
                    .gauge(
                    MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
                    mainOperatorOutput.getWatermarkGauge());
                // Create mainOperatorWrapper
                this.mainOperatorWrapper =
                    createOperatorWrapper(
                    mainOperator,
                    containingTask,
                    configuration,
                    mainOperatorAndTimeService.f1,
                    true);
    
                // add the main operator wrapper to the end of the wrapper list
                allOpWrappers.add(mainOperatorWrapper);
    
                // createOutputCollector wraps each operator in an operator wrapper
                // and adds it to allOpWrappers in reverse data flow order,
                // so the tail operator wrapper is the element at index 0
                this.tailOperatorWrapper = allOpWrappers.get(0);
            } else {
                // The operator factory is null, so there are no wrappers
                checkState(allOpWrappers.size() == 0);
                this.mainOperatorWrapper = null;
                this.tailOperatorWrapper = null;
            }
    
            // Create the chained sources
            this.chainedSources =
                createChainedSources(
                containingTask,
                configuration.getInputs(userCodeClassloader),
                chainedConfigs,
                userCodeClassloader,
                allOpWrappers);
    
            this.numOperators = allOpWrappers.size();
    
            // Link all StreamOperatorWrappers into a doubly linked list, ordered from upstream to downstream
            firstOperatorWrapper = linkOperatorWrappers(allOpWrappers);
    
            success = true;
        } finally {
            // make sure we clean up after ourselves in case of a failure after acquiring
            // the first resources
            if (!success) {
                for (RecordWriterOutput<?> output : this.streamOutputs) {
                    if (output != null) {
                        output.close();
                    }
                }
            }
        }
    }
    

    createChainOutputs

    The createChainOutputs method creates a streamOutput for each StreamEdge and records the mapping between the two. The code is shown below:

    private void createChainOutputs(
        List<StreamEdge> outEdgesInOrder,
        RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
        Map<Integer, StreamConfig> chainedConfigs,
        StreamTask<OUT, OP> containingTask,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
        // Iterate over the ordered StreamEdges
        for (int i = 0; i < outEdgesInOrder.size(); i++) {
            StreamEdge outEdge = outEdgesInOrder.get(i);
    
            // Create the streamOutput
            RecordWriterOutput<?> streamOutput =
                createStreamOutput(
                recordWriterDelegate.getRecordWriter(i),
                outEdge,
                chainedConfigs.get(outEdge.getSourceId()),
                containingTask.getEnvironment());
    
            // Store it in the streamOutputs array
            this.streamOutputs[i] = streamOutput;
            // Record the mapping from StreamEdge to streamOutput
            streamOutputMap.put(outEdge, streamOutput);
        }
    }
    

    Next, we look at the createStreamOutput method:

    private RecordWriterOutput<OUT> createStreamOutput(
        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
        StreamEdge edge,
        StreamConfig upStreamConfig,
        Environment taskEnvironment) {
        // Get the OutputTag; there is none if the edge is not a side output
        OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
    
        TypeSerializer outSerializer = null;
    
        // Pick the type serializer depending on whether this is a side output
        if (edge.getOutputTag() != null) {
            // side output
            outSerializer =
                upStreamConfig.getTypeSerializerSideOut(
                edge.getOutputTag(),
                taskEnvironment.getUserCodeClassLoader().asClassLoader());
        } else {
            // main output
            outSerializer =
                upStreamConfig.getTypeSerializerOut(
                taskEnvironment.getUserCodeClassLoader().asClassLoader());
        }
    
        // Create and return the RecordWriterOutput
        return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
    }
    

    createOutputCollector

    This method holds the core chaining logic, so we look at it closely. createOutputCollector is annotated below:

    private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
        MailboxExecutorFactory mailboxExecutorFactory) {
        List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs =
            new ArrayList<>(4);
    
        // create collectors for the network outputs
        // Iterate over the non-chained StreamEdges; their output has to go over the network,
        // so the Output type used for them is RecordWriterOutput
        for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
            @SuppressWarnings("unchecked")
            // Look up the output of this StreamEdge in streamOutputs, which createChainOutputs populated earlier
            RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
            // Add it to allOutputs
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }
    
        // Create collectors for the chained outputs
        // Get all chained StreamEdges of this operator
        // If the operator has several chained downstream operators, there are several edges here
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
            int outputId = outputEdge.getTargetId();
            // Get the StreamConfig of the operator this outputEdge points to
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
    
            // Create the output for this StreamEdge, of type WatermarkGaugeExposingOutput
            // WatermarkGaugeExposingOutput is an Output that also exposes a watermark gauge
            // If there is an operator to chain, the call below recurses and links downstream to upstream
            WatermarkGaugeExposingOutput<StreamRecord<T>> output =
                createOperatorChain(
                containingTask,
                chainedOpConfig,
                chainedConfigs,
                userCodeClassloader,
                streamOutputs,
                allOperatorWrappers,
                outputEdge.getOutputTag(),
                mailboxExecutorFactory);
            // Add it to allOutputs
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }
    
        // If there is exactly one output, return it directly
        if (allOutputs.size() == 1) {
            return allOutputs.get(0).f0;
        } else {
            // send to N outputs. Note that this includes the special case
            // of sending to zero outputs
            // Otherwise (zero or several outputs), convert allOutputs into an Output array
            @SuppressWarnings({"unchecked"})
            Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
            for (int i = 0; i < allOutputs.size(); i++) {
                asArray[i] = allOutputs.get(i).f0;
            }
    
            // This is the inverse of creating the normal ChainingOutput.
            // If the chaining output does not copy we need to copy in the broadcast output,
            // otherwise multi-chaining would not work correctly.
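            // Pick the collector depending on whether object reuse is enabled
            if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
                // With object reuse the chaining outputs do not copy, so the collector hands
                // a shallow copy of the StreamRecord to the downstream outputs. No deep copy is made,
                // which is cheaper, but the value object itself is shared. Downstream operators
                // must therefore not modify it, or the change becomes visible to the other operators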
                return new CopyingBroadcastingOutputCollector<>(asArray, this);
            } else {
                return new BroadcastingOutputCollector<>(asArray, this);
            }
        }
    }
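
    The selection logic at the end of createOutputCollector can be illustrated outside of Flink. The following is a simplified sketch, not Flink's implementation: BroadcastSketch, Rec and combine are hypothetical names, Rec stands in for StreamRecord, and plain Consumers stand in for Outputs. It assumes that the copying collector hands a shallow copy to every output except the last one, which receives the original record.

```java
import java.util.List;
import java.util.function.Consumer;

class BroadcastSketch {
    /** Hypothetical stand-in for StreamRecord: a wrapper around a value. */
    static final class Rec<T> {
        final T value;

        Rec(T value) {
            this.value = value;
        }

        /** New wrapper around the same value object (a shallow copy). */
        Rec<T> shallowCopy() {
            return new Rec<>(value);
        }
    }

    /** One output is returned as is; zero or several outputs get a fan-out. */
    static <T> Consumer<Rec<T>> combine(List<Consumer<Rec<T>>> outputs, boolean objectReuse) {
        if (outputs.size() == 1) {
            return outputs.get(0);
        }
        if (objectReuse) {
            // Assumed behaviour: shallow copies for all outputs but the last.
            return rec -> {
                for (int i = 0; i < outputs.size() - 1; i++) {
                    outputs.get(i).accept(rec.shallowCopy());
                }
                if (!outputs.isEmpty()) {
                    outputs.get(outputs.size() - 1).accept(rec);
                }
            };
        }
        // Without object reuse the same wrapper goes to every output,
        // because each chained output makes its own copy.
        return rec -> outputs.forEach(out -> out.accept(rec));
    }
}
```

    In this sketch the wrapper is copied but the value is not, which is why operators that share a record must treat the value as read-only when object reuse is enabled.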
    

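    Next comes the createOperatorChain method. It wraps every operator of the OperatorChain in a StreamOperatorWrapper and stores the wrappers in allOperatorWrappers in reverse data flow order. Following the order of the operators, it creates a ChainingOutput for each one, which connects the data flow of the operators. The method is shown below: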

    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorChain(
        StreamTask<OUT, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
        OutputTag<IN> outputTag,
        MailboxExecutorFactory mailboxExecutorFactory) {
        // create the output that the operator writes to first. this may recursively create more
        // operators
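        // operatorConfig here is the chainedOpConfig of the current loop iteration in the previous method
        // This is a recursive call: createOutputCollector is invoked again with the StreamConfig of the downstream edge
        // The net effect is that the output of each upstream operator points to its downstream operator, which is the chain
        // The output of the most downstream operator is returned first
        // The operator outputs are wrapped as WatermarkGaugeExposingOutput in order from downstream to upstream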
        WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput =
            createOutputCollector(
            containingTask,
            operatorConfig,
            chainedConfigs,
            userCodeClassloader,
            streamOutputs,
            allOperatorWrappers,
            mailboxExecutorFactory);
    
        // Create the chained operator
        // It is given the operator output created in the previous step
        OneInputStreamOperator<IN, OUT> chainedOperator =
            createOperator(
            containingTask,
            operatorConfig,
            userCodeClassloader,
            chainedOperatorOutput,
            allOperatorWrappers,
            false);
    
        // Wrap the operator in an output and return it; analyzed below
        return wrapOperatorIntoOutput(
            chainedOperator, containingTask, operatorConfig, userCodeClassloader, outputTag);
    }
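
    The mutual recursion between createOutputCollector and createOperatorChain, and the reverse ordering it produces, can be traced with a much smaller model. This is only a sketch under simplifying assumptions: operators are plain integer IDs, the chained edges are given as a map from an operator ID to its chained downstream IDs, and RecursiveChainSketch and build are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class RecursiveChainSketch {
    /** Mirrors createOutputCollector: visit every chained downstream operator of nodeId. */
    static void createOutputCollector(
            int nodeId, Map<Integer, List<Integer>> chainedOutputs, List<Integer> wrappers) {
        for (int target : chainedOutputs.getOrDefault(nodeId, List.of())) {
            createOperatorChain(target, chainedOutputs, wrappers);
        }
    }

    /** Mirrors createOperatorChain: build the downstream part first, then register this operator. */
    static void createOperatorChain(
            int nodeId, Map<Integer, List<Integer>> chainedOutputs, List<Integer> wrappers) {
        createOutputCollector(nodeId, chainedOutputs, wrappers);
        wrappers.add(nodeId);
    }

    /** Mirrors the constructor: chained operators first, the main operator appended last. */
    static List<Integer> build(int mainId, Map<Integer, List<Integer>> chainedOutputs) {
        List<Integer> wrappers = new ArrayList<>();
        createOutputCollector(mainId, chainedOutputs, wrappers);
        wrappers.add(mainId);
        return wrappers;
    }

    public static void main(String[] args) {
        // Chain 1 -> 2 -> 3, with operator 1 as the main operator.
        Map<Integer, List<Integer>> edges = Map.of(1, List.of(2), 2, List.of(3));
        System.out.println(build(1, edges)); // prints [3, 2, 1]
    }
}
```

    The most downstream operator is registered first because each call has to finish building its own output before it can create its operator.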
    

    The createOperator method creates a StreamOperator from operatorConfig and then wraps it in a StreamOperatorWrapper:

    private <OUT, OP extends StreamOperator<OUT>> OP createOperator(
        StreamTask<OUT, ?> containingTask,
        StreamConfig operatorConfig,
        ClassLoader userCodeClassloader,
        WatermarkGaugeExposingOutput<StreamRecord<OUT>> output,
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
        boolean isHead) {
    
        // now create the operator and give it the output collector to write its output to
        // Use the StreamOperatorFactory to create a StreamOperator that writes to the given output
        // The factory method is fairly involved and is not covered here
        Tuple2<OP, Optional<ProcessingTimeService>> chainedOperatorAndTimeService =
            StreamOperatorFactoryUtil.createOperator(
            operatorConfig.getStreamOperatorFactory(userCodeClassloader),
            containingTask,
            operatorConfig,
            output,
            operatorEventDispatcher);
    
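        // Get the created operator
        OP chainedOperator = chainedOperatorAndTimeService.f0;
        // Wrap the newly created operator in a StreamOperatorWrapper
        // StreamOperatorWrapper is the wrapper type used for operators in a chain; analyzed below
        // Because of the recursion, the most downstream operator reaches this point first,
        // so allOperatorWrappers holds the operators in reverse data flow order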
        allOperatorWrappers.add(
            createOperatorWrapper(
                chainedOperator,
                containingTask,
                operatorConfig,
                chainedOperatorAndTimeService.f1,
                isHead));
    
        // Register a watermark gauge
        chainedOperator
            .getMetricGroup()
            .gauge(
            MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
            output.getWatermarkGauge()::getValue);
        return chainedOperator;
    }
    

    Here we focus on createOperatorWrapper. This method wraps a StreamOperator in a StreamOperatorWrapper. You may wonder why StreamOperatorWrapper is needed here. Let's look at some of its fields and methods.

    private StreamOperatorWrapper<?, ?> previous;
    
    private StreamOperatorWrapper<?, ?> next;
    
    // (code omitted)
    
    public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint)
        throws Exception {
        if (!isHead && !isStoppingBySyncSavepoint) {
            // NOTE: This only do for the case where the operator is one-input operator. At present,
            // any non-head operator on the operator chain is one-input operator.
            actionExecutor.runThrowing(() -> endOperatorInput(1));
        }
    
        quiesceTimeServiceAndCloseOperator(actionExecutor);
    
        // propagate the close operation to the next wrapper
        if (next != null) {
            next.close(actionExecutor, isStoppingBySyncSavepoint);
        }
    }
    

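    It is easy to see that StreamOperatorWrapper has two references, previous and next, which form a doubly linked list. All operators of the OperatorChain are kept in this doubly linked list, which gives the chain its meaning: the operators of an OperatorChain are handled one after another, in order.

    The createOperatorWrapper method only wraps the StreamOperator in a StreamOperatorWrapper; it does not build the doubly linked list. The list is built by linkOperatorWrappers, which we analyze later.

    In addition, StreamOperatorWrapper has its own close logic. As the close method shows, besides closing the current operator it recursively closes every operator after it in the list.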
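
    The way close travels along the next references can be shown with a minimal model. This is a sketch only: WrapperCloseSketch and Wrapper are hypothetical names, and the real close method also ends the operator input and quiesces the time service, which is left out here.

```java
import java.util.ArrayList;
import java.util.List;

class WrapperCloseSketch {
    /** Hypothetical, stripped-down wrapper: a name plus the two list references. */
    static final class Wrapper {
        final String name;
        Wrapper previous;
        Wrapper next;

        Wrapper(String name) {
            this.name = name;
        }

        /** Close this wrapper, then propagate the close to the next one. */
        void close(List<String> closeLog) {
            closeLog.add(name);
            if (next != null) {
                next.close(closeLog);
            }
        }
    }

    /** Builds a list in the given upstream-to-downstream order and closes it from the head. */
    static List<String> closeFromHead(String... names) {
        Wrapper head = null;
        Wrapper tail = null;
        for (String name : names) {
            Wrapper wrapper = new Wrapper(name);
            if (head == null) {
                head = wrapper;
            } else {
                tail.next = wrapper;
                wrapper.previous = tail;
            }
            tail = wrapper;
        }
        List<String> closeLog = new ArrayList<>();
        if (head != null) {
            head.close(closeLog);
        }
        return closeLog;
    }
}
```

    Calling close on the head is enough to close the whole chain from upstream to downstream, so an upstream operator can still emit its final records into a downstream operator that is not yet closed.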

    Finally, we look at the wrapOperatorIntoOutput method. It wraps an operator in an Output. The output is a ChainingOutput or its subclass CopyingChainingOutput.

    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> wrapOperatorIntoOutput(
        OneInputStreamOperator<IN, OUT> operator,
        StreamTask<OUT, ?> containingTask,
        StreamConfig operatorConfig,
        ClassLoader userCodeClassloader,
        OutputTag<IN> outputTag) {
    
        WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
        // If object reuse is enabled, create a ChainingOutput
        // ChainingOutput itself is analyzed in the next section
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            currentOperatorOutput = new ChainingOutput<>(operator, this, outputTag);
        } else {
            // Otherwise create a CopyingChainingOutput,
            // which makes a deep copy of each StreamRecord it passes on
            TypeSerializer<IN> inSerializer =
                operatorConfig.getTypeSerializerIn1(userCodeClassloader);
            currentOperatorOutput =
                new CopyingChainingOutput<>(operator, inSerializer, outputTag, this);
        }
    
        // wrap watermark gauges since registered metrics must be unique
        // Register an input watermark gauge
        operator.getMetricGroup()
            .gauge(
            MetricNames.IO_CURRENT_INPUT_WATERMARK,
            currentOperatorOutput.getWatermarkGauge()::getValue);
    
        return currentOperatorOutput;
    }
    

    ChainingOutput

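    ChainingOutput turns the output of the upstream operator into the input of the next operator. A ChainingOutput is created with the downstream operator, which it stores in its input field. Its constructors are shown below: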

    protected final Input<T> input;
    
    public ChainingOutput(
        OneInputStreamOperator<T, ?> operator,
        StreamStatusProvider streamStatusProvider,
        @Nullable OutputTag<T> outputTag) {
        this(
            operator,
            (OperatorMetricGroup) operator.getMetricGroup(),
            streamStatusProvider,
            outputTag,
            operator::close);
    }
    
    public ChainingOutput(
        Input<T> input,
        OperatorMetricGroup operatorMetricGroup,
        StreamStatusProvider streamStatusProvider,
        @Nullable OutputTag<T> outputTag,
        @Nullable AutoCloseable closeable) {
        this.input = input;
        this.closeable = closeable;
    
        {
            Counter tmpNumRecordsIn;
            try {
                OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup();
                tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                tmpNumRecordsIn = new SimpleCounter();
            }
            numRecordsIn = tmpNumRecordsIn;
        }
    
        this.streamStatusProvider = streamStatusProvider;
        this.outputTag = outputTag;
    }
    

    To confirm that ChainingOutput really passes records to the downstream operator, we look at the collect method:

    @Override
    public void collect(StreamRecord<T> record) {
        if (this.outputTag != null) {
            // we are not responsible for emitting to the main output.
            return;
        }
    
        pushToOperator(record);
    }
    
    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator expects.
            @SuppressWarnings("unchecked")
            StreamRecord<T> castRecord = (StreamRecord<T>) record;
    
            numRecordsIn.inc();
            input.setKeyContextElement(castRecord);
            input.processElement(castRecord);
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
    

    The collect method calls pushToOperator, which executes input.processElement(castRecord) and thereby passes the record to the next operator.
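
    The effect is that a chained hand-over is nothing more than a method call on the same thread. The following sketch illustrates the idea with hypothetical, heavily simplified types; Output, Operator, MapOperator and this ChainingOutput are stand-ins written for the example and not the Flink classes of the same name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class ChainingOutputSketch {
    interface Output<T> {
        void collect(T record);
    }

    interface Operator<IN> {
        void processElement(IN record);
    }

    /** Operator that applies a function and emits the result to its output. */
    static final class MapOperator<IN, OUT> implements Operator<IN> {
        private final Function<IN, OUT> fn;
        private final Output<OUT> output;

        MapOperator(Function<IN, OUT> fn, Output<OUT> output) {
            this.fn = fn;
            this.output = output;
        }

        @Override
        public void processElement(IN record) {
            output.collect(fn.apply(record));
        }
    }

    /** Output whose collect() is a direct call into the downstream operator. */
    static final class ChainingOutput<T> implements Output<T> {
        private final Operator<T> input;

        ChainingOutput(Operator<T> input) {
            this.input = input;
        }

        @Override
        public void collect(T record) {
            input.processElement(record);
        }
    }

    static List<String> run(List<Integer> inputs) {
        List<String> sink = new ArrayList<>();
        // Built from downstream to upstream, as createOperatorChain does.
        Operator<Integer> toText = new MapOperator<Integer, String>(i -> "v" + i, sink::add);
        Operator<Integer> doubler =
                new MapOperator<Integer, Integer>(i -> i * 2, new ChainingOutput<>(toText));
        inputs.forEach(doubler::processElement);
        return sink;
    }
}
```

    No serialization, queue, or thread hand-off sits between the two operators in this model, which is the saving that chaining is meant to provide.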

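    ChainingOutput has a subclass called CopyingChainingOutput. It overrides pushToOperator so that a deep copy of the record is made before the record is sent to the downstream operator. If object reuse is enabled (containingTask.getExecutionConfig().isObjectReuseEnabled() returns true), ChainingOutput is used; otherwise CopyingChainingOutput is used. Its pushToOperator method is shown below: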

    @Override
    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator (and Serializer) expects.
            @SuppressWarnings("unchecked")
            StreamRecord<T> castRecord = (StreamRecord<T>) record;
    
            numRecordsIn.inc();
            // Make a deep copy here, then send the copy downstream
            StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
            input.setKeyContextElement(copy);
            input.processElement(copy);
        } catch (ClassCastException e) {
            // ...
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
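
    The practical difference between the two outputs shows up when a downstream operator modifies the record it receives. The sketch below uses hypothetical names (CopyingOutputSketch, Event, reusing, copying), and a UnaryOperator stands in for the copy method of a TypeSerializer.

```java
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

class CopyingOutputSketch {
    /** Hypothetical mutable event type. */
    static final class Event {
        int count;

        Event(int count) {
            this.count = count;
        }
    }

    /** Object reuse enabled: the same instance is passed downstream. */
    static Consumer<Event> reusing(Consumer<Event> downstream) {
        return downstream;
    }

    /** Object reuse disabled: the record is copied first; copier stands in for a serializer copy. */
    static Consumer<Event> copying(Consumer<Event> downstream, UnaryOperator<Event> copier) {
        return event -> downstream.accept(copier.apply(event));
    }

    /** Sends an event with count 1 into a mutating operator and returns what upstream sees afterwards. */
    static int upstreamValueAfter(boolean objectReuse) {
        Consumer<Event> mutatingOperator = event -> event.count += 100;
        Consumer<Event> output = objectReuse
                ? reusing(mutatingOperator)
                : copying(mutatingOperator, event -> new Event(event.count));
        Event original = new Event(1);
        output.accept(original);
        return original.count;
    }
}
```

    Here upstreamValueAfter(true) returns 101 because both sides share one object, while upstreamValueAfter(false) returns 1 because the downstream operator only ever sees a copy. The copy costs time, which is the trade-off behind the object reuse setting.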
    

    createChainedSources

    This method creates the chained sources.

    @SuppressWarnings("rawtypes")
    private Map<SourceInputConfig, ChainedSource> createChainedSources(
        StreamTask<OUT, OP> containingTask,
        InputConfig[] configuredInputs,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        List<StreamOperatorWrapper<?, ?>> allOpWrappers) {
        // If none of the configuredInputs is a SourceInputConfig, return an empty map
        if (Arrays.stream(configuredInputs)
            .noneMatch(input -> input instanceof SourceInputConfig)) {
            return Collections.emptyMap();
        }
        // Chained sources are only supported for StreamOperators with multiple inputs
        checkState(
            mainOperatorWrapper.getStreamOperator() instanceof MultipleInputStreamOperator,
            "Creating chained input is only supported with MultipleInputStreamOperator and MultipleInputStreamTask");
        Map<SourceInputConfig, ChainedSource> chainedSourceInputs = new HashMap<>();
        MultipleInputStreamOperator<?> multipleInputOperator =
            (MultipleInputStreamOperator<?>) mainOperatorWrapper.getStreamOperator();
        // Get all of its Inputs
        List<Input> operatorInputs = multipleInputOperator.getInputs();
    
        // Compute the first free InputGate index: the largest index of all existing InputGates, plus 1
        int sourceInputGateIndex =
            Arrays.stream(containingTask.getEnvironment().getAllInputGates())
            .mapToInt(IndexedInputGate::getInputGateIndex)
            .max()
            .orElse(-1)
            + 1;
    
        // Iterate over every input
        for (int inputId = 0; inputId < configuredInputs.length; inputId++) {
            // Skip every input that is not a SourceInputConfig
            if (!(configuredInputs[inputId] instanceof SourceInputConfig)) {
                continue;
            }
            SourceInputConfig sourceInput = (SourceInputConfig) configuredInputs[inputId];
            int sourceEdgeId = sourceInput.getInputEdge().getSourceId();
            // Look up the StreamConfig of the source by the source ID of the input edge
            StreamConfig sourceInputConfig = chainedConfigs.get(sourceEdgeId);
            OutputTag outputTag = sourceInput.getInputEdge().getOutputTag();
    
            // Create the chained source output
            // Currently this is only supported with object reuse enabled
            // The actual type returned is ChainingOutput
            WatermarkGaugeExposingOutput chainedSourceOutput =
                createChainedSourceOutput(
                containingTask,
                operatorInputs.get(inputId),
                (OperatorMetricGroup) multipleInputOperator.getMetricGroup(),
                outputTag);
    
            // Create the source operator
            // createOperator was analyzed above
            SourceOperator<?, ?> sourceOperator =
                (SourceOperator<?, ?>)
                createOperator(
                containingTask,
                sourceInputConfig,
                userCodeClassloader,
                (WatermarkGaugeExposingOutput<StreamRecord<OUT>>)
                chainedSourceOutput,
                allOpWrappers,
                true);
            // Put it into chainedSourceInputs
            chainedSourceInputs.put(
                sourceInput,
                new ChainedSource(
                    chainedSourceOutput,
                    new StreamTaskSourceInput<>(
                        sourceOperator, sourceInputGateIndex++, inputId)));
        }
        return chainedSourceInputs;
    }
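
    The index computation for the chained sources is compact enough to miss on a first read. A standalone sketch of the same expression, with the hypothetical names GateIndexSketch and firstFreeIndex and plain int indexes in place of IndexedInputGate objects:

```java
import java.util.Arrays;

class GateIndexSketch {
    /** Largest existing gate index plus one, or 0 when there are no gates at all. */
    static int firstFreeIndex(int[] existingGateIndexes) {
        return Arrays.stream(existingGateIndexes).max().orElse(-1) + 1;
    }

    public static void main(String[] args) {
        System.out.println(firstFreeIndex(new int[] {0, 1, 3})); // prints 4
        System.out.println(firstFreeIndex(new int[] {}));        // prints 0
    }
}
```

    Using -1 as the fallback makes the empty case fall out of the same expression: a task without network inputs starts numbering its chained sources at 0. Each chained source then takes the current value and increments it, as sourceInputGateIndex++ does in the loop above.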
    

    linkOperatorWrappers

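    The linkOperatorWrappers method links the operators of the chain one by one. Note that allOperatorWrappers, as built by createOutputCollector in the earlier step, holds the operators ordered from downstream to upstream, so linkOperatorWrappers has to reverse that order when it links them.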

    private StreamOperatorWrapper<?, ?> linkOperatorWrappers(
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers) {
        // Holds the wrapper handled in the previous iteration
        StreamOperatorWrapper<?, ?> previous = null;
        // Iterate over all operator wrappers
        for (StreamOperatorWrapper<?, ?> current : allOperatorWrappers) {
            if (previous != null) {
                // The current wrapper becomes the predecessor of the wrapper handled before it
                previous.setPrevious(current);
            }
            // The wrapper handled before it becomes the successor of the current wrapper
            current.setNext(previous);
            // Remember the current wrapper for the next iteration
            previous = current;
        }
        return previous;
    }
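
    The loop is short, but the naming is easy to misread because the local variable previous holds the element that comes later in data flow order. The sketch below repeats the same loop on a hypothetical Node type (LinkWrappersSketch, Node, link and namesFromHead are names made up for this example) so that the result can be inspected.

```java
import java.util.ArrayList;
import java.util.List;

class LinkWrappersSketch {
    /** Hypothetical stand-in for StreamOperatorWrapper. */
    static final class Node {
        final String name;
        Node previous;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    /** Same loop shape as linkOperatorWrappers; the input is ordered downstream to upstream. */
    static Node link(List<Node> downstreamToUpstream) {
        Node previous = null;
        for (Node current : downstreamToUpstream) {
            if (previous != null) {
                previous.previous = current;
            }
            current.next = previous;
            previous = current;
        }
        // The last list element, which is the most upstream node and the head of the chain.
        return previous;
    }

    /** Walks the list from the head along the next references. */
    static List<String> namesFromHead(Node head) {
        List<String> names = new ArrayList<>();
        for (Node node = head; node != null; node = node.next) {
            names.add(node.name);
        }
        return names;
    }
}
```

    Linking the list [sink, map, source] returns the source node as head, and walking the next references from there yields source, map, sink, which is the data flow order.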
    

    initializeStateAndOpenOperators

    Before a StreamTask starts receiving data, it has to initialize the state of every operator and open every operator (call each operator's open method). initializeStateAndOpenOperators does exactly this.

    protected void initializeStateAndOpenOperators(
        StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
            StreamOperator<?> operator = operatorWrapper.getStreamOperator();
            operator.initializeState(streamTaskStateInitializer);
            operator.open();
        }
    }
    

    This blog post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.

