StreamGraph Generation


Author: 专职掏大粪 | Published 2020-05-26 10:41

    StreamExecutionEnvironment

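    StreamExecutionEnvironment is the context in which a Flink job runs in streaming mode, and it is also the entry point for writing a Flink program. Depending on the execution environment, StreamExecutionEnvironment has different concrete implementations, such as LocalStreamEnvironment and RemoteStreamEnvironment. StreamExecutionEnvironment also provides methods for configuring the default parallelism, checkpointing, and other mechanisms; these settings are mostly kept in ExecutionConfig and CheckpointConfig.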
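    Internally, StreamExecutionEnvironment keeps a List<StreamTransformation<?>> transformations that holds every transformation used to produce a DataStream. (In newer Flink versions this class has been renamed Transformation, which is the name used in the source excerpts below.)
    A StreamTransformation represents the operation that produces a DataStream. As operators are applied to a DataStream one after another, the StreamTransformations form a graph. When the job is executed, this underlying graph is converted into a StreamGraph.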

    A StreamTransformation does not necessarily correspond to a physical operation at runtime; some operations exist only at the logical level, such as split/select/partitioning.

    Every StreamTransformation has an associated id, which is incremented globally. Besides the id, a transformation also carries information such as uid, slotSharingGroup, and parallelism.
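    To make the id scheme concrete, here is a toy model in plain Java. ToyTransformation is a hypothetical class, not Flink's: it reduces a transformation to the attributes mentioned above and hands out ids from one static counter, so ids increase across all instances. The method name getNewNodeId mirrors the Transformation.getNewNodeId() call that appears in the Flink source later in this article; making it synchronized is an assumption of the toy.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: ToyTransformation is a hypothetical stand-in for StreamTransformation.
final class ToyTransformation {
    // One counter shared by all instances, so ids increase globally.
    private static int idCounter = 0;

    final int id;
    final String name;
    String uid;              // optional user-assigned stable id
    String slotSharingGroup; // null means "not specified"
    int parallelism;

    ToyTransformation(String name, int parallelism) {
        this.id = getNewNodeId();
        this.name = name;
        this.parallelism = parallelism;
    }

    // Synchronized in this toy so concurrent construction cannot reuse an id.
    static synchronized int getNewNodeId() {
        idCounter++;
        return idCounter;
    }

    public static void main(String[] args) {
        List<ToyTransformation> transformations = new ArrayList<>();
        transformations.add(new ToyTransformation("source", 1));
        transformations.add(new ToyTransformation("map", 4));
        transformations.add(new ToyTransformation("sink", 4));
        for (ToyTransformation t : transformations) {
            System.out.println(t.id + " " + t.name + " parallelism=" + t.parallelism);
        }
    }
}
```

    Because the counter is static, the id says nothing about which job or branch a transformation belongs to; it only guarantees uniqueness and creation order within one JVM.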

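    StreamTransformation has many concrete subclasses, such as SourceTransformation, OneInputTransformation, TwoInputTransformation, SideOutputTransformation, and SinkTransformation, each corresponding to a different transformation on DataStream. The constructors of OneInputTransformation and TwoInputTransformation look like this: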

        public OneInputTransformation(
                Transformation<IN> input,
                String name,
                StreamOperatorFactory<OUT> operatorFactory,
                TypeInformation<OUT> outputType,
                int parallelism) {
            super(name, outputType, parallelism);
            this.input = input;
            this.operatorFactory = operatorFactory;
        }
    
        public TwoInputTransformation(
                Transformation<IN1> input1,
                Transformation<IN2> input2,
                String name,
                StreamOperatorFactory<OUT> operatorFactory,
                TypeInformation<OUT> outputType,
                int parallelism) {
            super(name, outputType, parallelism);
            this.input1 = input1;
            this.input2 = input2;
            this.operatorFactory = operatorFactory;
        }
    

    DataStream

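    A DataStream represents a stream of elements of the same type. Applying operations such as map/filter to a DataStream turns it into another DataStream. Under the hood, each operation creates the corresponding StreamTransformation and adds it to the transformations list of StreamExecutionEnvironment.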

        @PublicEvolving
        public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
            // read the input's output type up front so that missing type information fails early
            transformation.getOutputType();

            // build the StreamTransformation for this operation
            OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    operator,
                    outTypeInfo,
                    environment.getParallelism());

            @SuppressWarnings({ "unchecked", "rawtypes" })
            SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

            // add it to the transformations list of the StreamExecutionEnvironment
            getExecutionEnvironment().addOperator(resultTransform);
            return returnStream;
        }
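    The pattern above (use the current transformation as the input of a new one, register the new one with the environment, return a new stream handle) can be sketched without Flink. ToyEnv, ToyStream and Node below are hypothetical stand-ins for StreamExecutionEnvironment, DataStream and StreamTransformation, with operators reduced to names. In this toy the source is not put in the list; it is reachable only through the input pointer of its consumer.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: ToyEnv, ToyStream and Node are hypothetical, not Flink classes.
final class ToyDataStreamDemo {

    // A transformation: a name plus a pointer to its single input (null for a source).
    static final class Node {
        final String name;
        final Node input;

        Node(String name, Node input) {
            this.name = name;
            this.input = input;
        }
    }

    // Stands in for StreamExecutionEnvironment: it only records transformations.
    static final class ToyEnv {
        final List<Node> transformations = new ArrayList<>();

        ToyStream fromSource(String name) {
            // The source node is not added to the list.
            return new ToyStream(this, new Node(name, null));
        }
    }

    // Stands in for DataStream: a thin handle around one transformation.
    static final class ToyStream {
        final ToyEnv env;
        final Node transformation;

        ToyStream(ToyEnv env, Node transformation) {
            this.env = env;
            this.transformation = transformation;
        }

        // Same shape as DataStream#transform: a new node whose input is ours,
        // registered with the environment and wrapped in a new stream handle.
        ToyStream transform(String operatorName) {
            Node result = new Node(operatorName, transformation);
            env.transformations.add(result);
            return new ToyStream(env, result);
        }
    }

    public static void main(String[] args) {
        ToyEnv env = new ToyEnv();
        env.fromSource("source").transform("map").transform("filter");
        for (Node n : env.transformations) {
            System.out.println(n.name + " <- " + n.input.name);
        }
    }
}
```

    Calling transform only records nodes; nothing runs until the recorded graph is turned into a StreamGraph, which matches the lazy construction described earlier.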
    

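    Subclasses of DataStream include SingleOutputStreamOperator, DataStreamSource, KeyedStream, IterativeStream, and SplitStream (deprecated). A small complaint here: the name SingleOutputStreamOperator is far too easy to confuse with StreamOperator. StreamOperator is introduced in the next subsection.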

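    Besides DataStream and its subclasses, other classes that represent data streams include ConnectedStreams (two streams connected together), WindowedStream, and AllWindowedStream. For the conversions between these streams, see the official Flink documentation.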

    StreamOperator

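    When we operate on a DataStream, for example with DataStream#map, we have to supply a custom processing function. How is this information stored in the StreamTransformation? This is where a new interface, StreamOperator, comes in.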

    StreamOperator manages the lifecycle of a concrete operator, including:

        // lifecycle
        void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);
        void open() throws Exception;
        void close() throws Exception;
        @Override
        void dispose() throws Exception;

        // state management
        OperatorSnapshotFutures snapshotState(
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory storageLocation) throws Exception;
        void initializeState() throws Exception;

        // other methods omitted for now
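    The listing does not say in which order these methods are called. The sketch below assumes the order in which a StreamTask drives its operators: setup, then initializeState, then open, then element processing, then close, with dispose as final cleanup even when something fails. ToyOperator, RecordingOperator and runTask are hypothetical names, and the real parameters (StreamTask, StreamConfig, Output, checkpoint arguments) are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of an operator lifecycle; all names here are hypothetical.
final class LifecycleDemo {

    interface ToyOperator<IN> {
        void setup();
        void initializeState();
        void open();
        void processElement(IN element);
        void close();
        void dispose();
    }

    // Records every lifecycle call so the order can be inspected.
    static final class RecordingOperator implements ToyOperator<String> {
        final List<String> events = new ArrayList<>();
        public void setup() { events.add("setup"); }
        public void initializeState() { events.add("initializeState"); }
        public void open() { events.add("open"); }
        public void processElement(String element) { events.add("process:" + element); }
        public void close() { events.add("close"); }
        public void dispose() { events.add("dispose"); }
    }

    // Assumed driver order: close only on the normal path, dispose always.
    static <IN> void runTask(ToyOperator<IN> op, List<IN> input) {
        op.setup();
        try {
            op.initializeState(); // restore state before the operator starts
            op.open();
            for (IN element : input) {
                op.processElement(element);
            }
            op.close();           // flush and finish normally
        } finally {
            op.dispose();         // release resources in every case
        }
    }

    public static void main(String[] args) {
        RecordingOperator op = new RecordingOperator();
        runTask(op, Arrays.asList("a", "b"));
        System.out.println(op.events);
        // [setup, initializeState, open, process:a, process:b, close, dispose]
    }
}
```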
    

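    The two sub-interfaces of StreamOperator, OneInputStreamOperator and TwoInputStreamOperator, provide the methods that process the individual elements of a stream. The abstract class AbstractStreamOperator supplies default implementations of common methods, and its abstract subclass AbstractUdfStreamOperator provides the basic implementation of operators that wrap a user-defined function: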

        // OneInputStreamOperator
        void processElement(StreamRecord<IN> element) throws Exception;
        void processWatermark(Watermark mark) throws Exception;
        void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;

        // TwoInputStreamOperator
        void processElement1(StreamRecord<IN1> element) throws Exception;
        void processElement2(StreamRecord<IN2> element) throws Exception;
        void processWatermark1(Watermark mark) throws Exception;
        void processWatermark2(Watermark mark) throws Exception;

        // AbstractStreamOperator
        // provides basic implementations of methods such as initializeState and snapshotState
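    The listing shows processWatermark1 and processWatermark2 but not how they interact. A common rule, and to my understanding the default behavior in Flink's AbstractStreamOperator, is that a two-input operator forwards the minimum of the two input watermarks, and only when that minimum advances. The sketch models that rule with a hypothetical ToyTwoInputOperator that uses plain long timestamps instead of Watermark objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy operator; plain long timestamps stand in for Watermark objects.
final class ToyTwoInputOperator {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    // Watermarks forwarded downstream, in order.
    final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        maybeAdvance();
    }

    void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        maybeAdvance();
    }

    // Event time can only advance as far as the slower input allows.
    private void maybeAdvance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(combinedWatermark);
        }
    }

    public static void main(String[] args) {
        ToyTwoInputOperator op = new ToyTwoInputOperator();
        op.processWatermark1(10); // input 2 has not reported yet: nothing emitted
        op.processWatermark2(5);  // min(10, 5) = 5
        op.processWatermark2(20); // min(10, 20) = 10
        op.processWatermark1(15); // min(15, 20) = 15
        System.out.println(op.emitted); // [5, 10, 15]
    }
}
```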
    

    The StreamOperators behind concrete operations such as map/filter are almost all built on top of AbstractUdfStreamOperator. Take StreamMap as an example:

    @Internal
    public class StreamMap<IN, OUT>
            extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
            implements OneInputStreamOperator<IN, OUT> {
        private static final long serialVersionUID = 1L;
        public StreamMap(MapFunction<IN, OUT> mapper) {
            super(mapper);
            chainingStrategy = ChainingStrategy.ALWAYS;
        }
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            output.collect(element.replace(userFunction.map(element.getValue())));
        }
    }
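    The split between "operator that holds a user function" and "operator that applies it per element" can be reproduced in a few lines. ToyUdfOperator and ToyStreamMap are hypothetical toy versions of AbstractUdfStreamOperator and StreamMap; java.util.function.Function replaces MapFunction, and a list replaces the Output collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy versions of AbstractUdfStreamOperator and StreamMap; all names are hypothetical.
final class ToyStreamMapDemo {

    // Holds the user function, the way AbstractUdfStreamOperator keeps userFunction.
    abstract static class ToyUdfOperator<OUT, F> {
        protected final F userFunction;
        protected final List<OUT> output = new ArrayList<>(); // stands in for Output<StreamRecord<OUT>>

        ToyUdfOperator(F userFunction) {
            this.userFunction = userFunction;
        }
    }

    // Like StreamMap: apply the user's function to each element and emit the result.
    static final class ToyStreamMap<IN, OUT> extends ToyUdfOperator<OUT, Function<IN, OUT>> {
        ToyStreamMap(Function<IN, OUT> mapper) {
            super(mapper);
        }

        void processElement(IN element) {
            output.add(userFunction.apply(element));
        }
    }

    public static void main(String[] args) {
        ToyStreamMap<Integer, Integer> doubler = new ToyStreamMap<>(x -> x * 2);
        for (int i = 1; i <= 3; i++) {
            doubler.processElement(i);
        }
        System.out.println(doubler.output); // [2, 4, 6]
    }
}
```

    The real StreamMap calls element.replace(...), which reuses the incoming StreamRecord (and keeps its timestamp) instead of allocating a new record; the toy skips records altogether.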
    

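    Through the dependency chain DataStream -> StreamTransformation -> StreamOperator, a DataStream can be transformed while the relationship between data streams and the operators applied to them is preserved.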

    StreamGraph

    StreamGraphGenerator builds the StreamGraph from the transformations list of StreamExecutionEnvironment.

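    While iterating over the List<StreamTransformation> to build the StreamGraph, StreamGraphGenerator#transform is called recursively. For each StreamTransformation, it first makes sure that the transformation's upstream has already been converted. StreamTransformations that perform a physical operation are converted into StreamNodes in the StreamGraph, and StreamEdges are added between upstream and downstream nodes.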

    Collection<Integer> transformedIds;
    if (transform instanceof OneInputTransformation<?, ?>) {
        transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
    } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
        transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    } else if (transform instanceof SourceTransformation<?>) {
        transformedIds = transformSource((SourceTransformation<?>) transform);
    } else if (transform instanceof SinkTransformation<?>) {
        transformedIds = transformSink((SinkTransformation<?>) transform);
    } else if (transform instanceof UnionTransformation<?>) {
        transformedIds = transformUnion((UnionTransformation<?>) transform);
    } else if (transform instanceof SplitTransformation<?>) {
        transformedIds = transformSplit((SplitTransformation<?>) transform);
    } else if (transform instanceof SelectTransformation<?>) {
        transformedIds = transformSelect((SelectTransformation<?>) transform);
    } else if (transform instanceof FeedbackTransformation<?>) {
        transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
    } else if (transform instanceof CoFeedbackTransformation<?>) {
        transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
    } else if (transform instanceof PartitionTransformation<?>) {
        transformedIds = transformPartition((PartitionTransformation<?>) transform);
    } else if (transform instanceof SideOutputTransformation<?>) {
        transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
    } else {
        throw new IllegalStateException("Unknown transformation: " + transform);
    }
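    The recursion and the alreadyTransformed cache can be tried out on a toy graph. In the sketch below, Transform and ToyGenerator are hypothetical: each transformation lists its inputs, the generator converts the inputs before the transformation itself, and it caches the ids it produced, so a source shared by two branches is converted only once. visitOrder records the order in which nodes are created.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy generator; Transform and ToyGenerator are hypothetical, not Flink classes.
final class ToyGenerator {

    // A transformation with an id and any number of inputs.
    static final class Transform {
        final int id;
        final List<Transform> inputs;

        Transform(int id, Transform... inputs) {
            this.id = id;
            this.inputs = Arrays.asList(inputs);
        }
    }

    // Plays the role of alreadyTransformed: transformation -> ids it produced.
    final Map<Transform, Collection<Integer>> alreadyTransformed = new HashMap<>();
    // Order in which nodes were created, for inspection.
    final List<Integer> visitOrder = new ArrayList<>();

    Collection<Integer> transform(Transform t) {
        Collection<Integer> cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached; // converted before, e.g. a shared upstream
        }
        for (Transform input : t.inputs) {
            transform(input); // upstream first
        }
        visitOrder.add(t.id);
        Collection<Integer> ids = Collections.singleton(t.id);
        alreadyTransformed.put(t, ids);
        return ids;
    }

    void generate(List<Transform> transformations) {
        for (Transform t : transformations) {
            transform(t);
        }
    }

    public static void main(String[] args) {
        Transform source = new Transform(1);
        Transform mapA = new Transform(2, source);
        Transform mapB = new Transform(3, source);
        Transform sink = new Transform(4, mapA, mapB);
        ToyGenerator g = new ToyGenerator();
        // The source is not listed; it is reached only through its consumers' inputs.
        g.generate(Arrays.asList(mapA, mapB, sink));
        System.out.println(g.visitOrder); // [1, 2, 3, 4]
    }
}
```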
    

    For each type of StreamTransformation, the corresponding conversion method is called. Take the most typical one, transformOneInputTransform, as an example:

    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
        // first make sure the upstream node has been transformed
        Collection<Integer> inputIds = transform(transform.getInput());

        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getCoLocationGroupKey(),
                transform.getOperatorFactory(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());

        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
            transform.getParallelism() : executionConfig.getParallelism();
        streamGraph.setParallelism(transform.getId(), parallelism);
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }
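    Two decisions in this method can be isolated into small functions: the parallelism falls back to the ExecutionConfig value when the transformation kept the default, and the slot sharing group comes from determineSlotSharingGroup. The body of determineSlotSharingGroup is not shown in this article; the sketch re-implements it as I understand Flink's logic (an explicit group wins, otherwise the inputs' common group is inherited, otherwise "default"), so treat it as an assumption. ToyDecisions is hypothetical, and the value -1 is assumed to match ExecutionConfig.PARALLELISM_DEFAULT.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helpers; the constants are assumed to match Flink's values.
final class ToyDecisions {
    // Assumed equal to ExecutionConfig.PARALLELISM_DEFAULT.
    static final int PARALLELISM_DEFAULT = -1;
    // Assumed name of the default slot sharing group.
    static final String DEFAULT_SLOT_SHARING_GROUP = "default";

    // Keep the transformation's parallelism unless it was left at the default.
    static int resolveParallelism(int transformParallelism, int configParallelism) {
        return transformParallelism != PARALLELISM_DEFAULT ? transformParallelism : configParallelism;
    }

    // Explicit group wins; otherwise inherit the inputs' group if they all agree,
    // and fall back to the default group if they differ or there are no inputs.
    static String determineSlotSharingGroup(
            String specifiedGroup, Collection<Integer> inputIds, Map<Integer, String> groupOfNode) {
        if (specifiedGroup != null) {
            return specifiedGroup;
        }
        String inputGroup = null;
        for (Integer id : inputIds) {
            String candidate = groupOfNode.get(id);
            if (inputGroup == null) {
                inputGroup = candidate;
            } else if (!inputGroup.equals(candidate)) {
                return DEFAULT_SLOT_SHARING_GROUP;
            }
        }
        return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup;
    }

    public static void main(String[] args) {
        Map<Integer, String> groups = new HashMap<>();
        groups.put(1, "a");
        groups.put(2, "a");
        groups.put(3, "b");
        System.out.println(resolveParallelism(PARALLELISM_DEFAULT, 8));                    // 8
        System.out.println(resolveParallelism(2, 8));                                      // 2
        System.out.println(determineSlotSharingGroup(null, Arrays.asList(1, 2), groups));   // a
        System.out.println(determineSlotSharingGroup(null, Arrays.asList(1, 3), groups));   // default
        System.out.println(determineSlotSharingGroup("x", Collections.emptyList(), groups)); // x
    }
}
```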
    


    The corresponding method in StreamGraph that adds a node:

        protected StreamNode addNode(Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            Class<? extends AbstractInvokable> vertexClass,
            StreamOperatorFactory<?> operatorFactory,
            String operatorName) {
    
            if (streamNodes.containsKey(vertexID)) {
                throw new RuntimeException("Duplicate vertexID " + vertexID);
            }
    
            StreamNode vertex = new StreamNode(
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                operatorFactory,
                operatorName,
                new ArrayList<OutputSelector<?>>(),
                vertexClass);
            // the StreamNode created above keeps the operator factory and the vertexClass
            streamNodes.put(vertexID, vertex);
    
            return vertex;
        }
    

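    A StreamNode holds the corresponding StreamOperator (obtained from the StreamTransformation; in this version it is carried by a StreamOperatorFactory) and also introduces the field jobVertexClass, which records the actual task type the node runs as in the TaskManager: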

    Class<? extends AbstractInvokable> jobVertexClass;
    

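    AbstractInvokable is the abstract base class of every task that can run in a TaskManager, covering both streaming and batch tasks. StreamTask is the base class of all streaming tasks; its concrete subclasses include SourceStreamTask, OneInputStreamTask, TwoInputStreamTask, and others.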

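    For StreamTransformations that involve no physical operation, such as partitioning and split/select, no StreamNode is created; instead, a virtual node carrying specific attributes is created. (Union does not even get a virtual node: transformUnion simply returns the ids of its inputs.) When an edge is added from a virtual node to a downstream node, the physical node upstream of the virtual node is looked up, the edge is added between the two physical nodes, and the attributes of the virtual transformation are attached to that edge.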

    Take PartitionTransformation as an example; it is, for instance, the transformation created for a KeyedStream:

    // StreamGraphGenerator#transformPartition
    private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
        Transformation<T> input = partition.getInput();
        List<Integer> resultIds = new ArrayList<>();

        // recursively transform the upstream (input) nodes first
        Collection<Integer> transformedIds = transform(input);
        for (Integer transformedId: transformedIds) {
            int virtualId = Transformation.getNewNodeId();
            // add a virtual partition node on top of each upstream node
            streamGraph.addVirtualPartitionNode(
                    transformedId, virtualId, partition.getPartitioner(), partition.getShuffleMode());
            resultIds.add(virtualId);
        }

        return resultIds;
    }

    // StreamGraph#addVirtualPartitionNode
    public void addVirtualPartitionNode(
            Integer originalId,
            Integer virtualId,
            StreamPartitioner<?> partitioner,
            ShuffleMode shuffleMode) {

        if (virtualPartitionNodes.containsKey(virtualId)) {
            throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
        }
        // only a virtual node is registered; when edges are added later it is resolved to the real physical node
        virtualPartitionNodes.put(virtualId, new Tuple3<>(originalId, partitioner, shuffleMode));
    }
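    The effect of a virtual partition node only shows up when an edge is added. The toy graph below registers virtual nodes as (originalId, partitioner) pairs and, when an edge starts at a virtual id, walks back to the physical node while keeping the partitioner closest to the downstream side, following the virtualPartitionNodes branch of addEdgeInternal shown below. ToyVirtualNodes is hypothetical, partitioners are plain strings, and ShuffleMode is omitted.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy graph; ToyVirtualNodes is hypothetical and partitioners are plain strings.
final class ToyVirtualNodes {
    // virtualId -> (originalId, partitioner), like virtualPartitionNodes without the ShuffleMode.
    final Map<Integer, Map.Entry<Integer, String>> virtualPartitionNodes = new HashMap<>();
    // Physical edges recorded as "up->down [partitioner]".
    final List<String> edges = new ArrayList<>();

    void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
        if (virtualPartitionNodes.containsKey(virtualId)) {
            throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
        }
        virtualPartitionNodes.put(virtualId, new AbstractMap.SimpleImmutableEntry<>(originalId, partitioner));
    }

    void addEdge(int upId, int downId, String partitioner) {
        Map.Entry<Integer, String> virtual = virtualPartitionNodes.get(upId);
        if (virtual != null) {
            // Resolve one level and recurse; a partitioner already set closer to downstream wins.
            String effective = partitioner == null ? virtual.getValue() : partitioner;
            addEdge(virtual.getKey(), downId, effective);
            return;
        }
        // upId is physical: record the edge with the partitioner carried along.
        edges.add(upId + "->" + downId + " [" + partitioner + "]");
    }

    public static void main(String[] args) {
        ToyVirtualNodes g = new ToyVirtualNodes();
        g.addVirtualPartitionNode(1, 10, "hash");       // e.g. keyBy on node 1
        g.addVirtualPartitionNode(10, 11, "rebalance"); // a second partitioning on top
        g.addEdge(10, 2, null);
        g.addEdge(11, 3, null);
        System.out.println(g.edges); // [1->2 [hash], 1->3 [rebalance]]
    }
}
```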
    

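    As mentioned earlier, when each physical node is converted, StreamGraph#addEdge is called to connect the input node and the current node with an edge; the actual work is done in addEdgeInternal: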

    private void addEdgeInternal(Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            List<String> outputNames,
            OutputTag outputTag,
            ShuffleMode shuffleMode) {
        // First check whether the edge starts at a virtual node. If it does, find the physical node
        // upstream of the virtual node, add the edge between the two physical nodes, and carry the
        // extra information (StreamPartitioner, OutputTag, etc.) over into the StreamEdge.
        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
        } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
            if (outputNames.isEmpty()) {
                // selections that happen downstream override earlier selections
                outputNames = virtualSelectNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
        } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            shuffleMode = virtualPartitionNodes.get(virtualId).f2;
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
        } else {
            // both ends are physical nodes
            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = getStreamNode(downStreamVertexID);

            // If no partitioner was specified and the parallelism of upstream and downstream
            // operator matches use forward partitioning, use rebalance otherwise.
            if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) {
                partitioner = new RebalancePartitioner<Object>();
            }

            if (partitioner instanceof ForwardPartitioner) {
                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                    throw new UnsupportedOperationException("Forward partitioning does not allow " +
                            "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                            ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                }
            }

            if (shuffleMode == null) {
                shuffleMode = ShuffleMode.UNDEFINED;
            }
            // create the StreamEdge, which keeps the StreamPartitioner and the other edge attributes
            StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);

            // register the StreamEdge with both the upstream and the downstream node
            getStreamNode(edge.getSourceId()).addOutEdge(edge);
            getStreamNode(edge.getTargetId()).addInEdge(edge);
        }
    }
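    The partitioner defaulting in the physical-node branch is easy to check in isolation. The hypothetical helper below uses strings instead of StreamPartitioner instances and reproduces the same rules as the code above: no partitioner and equal parallelism gives forward, no partitioner and different parallelism gives rebalance, and an explicit forward partitioner across differing parallelism is rejected.

```java
// Hypothetical helper; strings stand in for StreamPartitioner instances.
final class ToyPartitionerChoice {

    static String choosePartitioner(String partitioner, int upParallelism, int downParallelism) {
        // No partitioner given: forward if parallelism matches, rebalance otherwise.
        if (partitioner == null) {
            partitioner = upParallelism == downParallelism ? "forward" : "rebalance";
        }
        // Forward sends each upstream subtask to exactly one downstream subtask,
        // so both sides must have the same parallelism.
        if (partitioner.equals("forward") && upParallelism != downParallelism) {
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism: "
                            + upParallelism + " -> " + downParallelism);
        }
        return partitioner;
    }

    public static void main(String[] args) {
        System.out.println(choosePartitioner(null, 4, 4));   // forward
        System.out.println(choosePartitioner(null, 4, 2));   // rebalance
        System.out.println(choosePartitioner("hash", 4, 2)); // hash
    }
}
```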
    

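    In this way, the StreamNodes and StreamEdges give us all the nodes and edges of the DAG together with how they are connected, and the topology of the job is established.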

    References
    https://ververica.cn/developers/advanced-tutorial-2-flink-job-execution-depth-analysis/
    https://blog.jrwang.me/tags/flink/
    https://ververica.cn/developers/advanced-tutorial-1-analysis-of-the-core-mechanism-of-runtime/
