美文网首页
一文搞懂 Flink OperatorChain 对象重用

一文搞懂 Flink OperatorChain 对象重用

作者: shengjk1 | 来源:发表于2022-06-21 19:18 被阅读0次

    OperatorChain 的对象重用,可以提高效率,但什么情况下可以重用,什么情况下不可以重用,我们一起来看你一下代码:

    首先,在OperatorChain 类的 createChainedOperator 方法

    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
                StreamTask<?, ?> containingTask,
                StreamConfig operatorConfig,
                Map<Integer, StreamConfig> chainedConfigs,
                ClassLoader userCodeClassloader,
                Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
                List<StreamOperator<?>> allOperators,
                OutputTag<IN> outputTag) {
            ...
            //chainingoutput
            if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
                currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
            }
            else {
                // deep copy
                TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
                currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
            }
    ...
        }
    

    如果启用了对象重用,即 isObjectReuseEnabled==true,创建的 outPut 为 ChainingOutput,如果没有启用对象重用,则 outPut 为 CopyingChainingOutput。

    需要明确的是 currentOperatorOutput 是为给下游算子输入数据的。而 ChainingOutput 和 CopyingChainingOutput 的区别是 ChainingOutput 是值传递,而 CopyingChainingOutput 是深拷贝。看到这里我们应该就已经明确了什么情况下可以启动对象重用什么情况下不可以启用对象重用。

    我们需要明确的一个点对应 java bean 来说,在启动对象重用情况下,如果下游的算子更改了某个属性值,会直接影响上游,以及其下游,这点还是要特别注意的

    相关文章

      网友评论

          本文标题:一文搞懂 Flink OperatorChain 对象重用

          本文链接:https://www.haomeiwen.com/subject/gomevrtx.html