Flink Source Code Analysis: StreamGraph

Author: 小C菜鸟 | Published: 2018-06-18 17:00

    What is a StreamGraph?

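    As the name suggests, a StreamGraph is a stream graph: it describes the topology of a data flow and contains all the information needed to create a JobGraph. A StreamGraph consists of StreamNodes and StreamEdges. A StreamNode describes one operator in the streaming program together with its properties, and a StreamEdge is an edge connecting two StreamNodes, representing a data stream.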
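
    To make the node-and-edge structure concrete, here is a minimal sketch in plain Java. The classes MiniStreamNode, MiniStreamEdge and MiniStreamGraph are hypothetical stand-ins invented for illustration; they are not Flink's classes, which carry far more information (operators, serializers, parallelism, partitioners and so on).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a StreamNode: one operator plus its edges.
final class MiniStreamNode {
    final int id;
    final String operatorName;
    final List<MiniStreamEdge> inEdges = new ArrayList<>();
    final List<MiniStreamEdge> outEdges = new ArrayList<>();

    MiniStreamNode(int id, String operatorName) {
        this.id = id;
        this.operatorName = operatorName;
    }
}

// Hypothetical stand-in for a StreamEdge: a directed connection between two nodes.
final class MiniStreamEdge {
    final MiniStreamNode source;
    final MiniStreamNode target;

    MiniStreamEdge(MiniStreamNode source, MiniStreamNode target) {
        this.source = source;
        this.target = target;
    }
}

// Hypothetical stand-in for a StreamGraph: nodes indexed by ID, edges stored on the nodes.
final class MiniStreamGraph {
    private final Map<Integer, MiniStreamNode> nodes = new LinkedHashMap<>();

    MiniStreamNode addNode(int id, String operatorName) {
        if (nodes.containsKey(id)) {
            throw new IllegalStateException("Duplicate node id: " + id);
        }
        MiniStreamNode node = new MiniStreamNode(id, operatorName);
        nodes.put(id, node);
        return node;
    }

    MiniStreamEdge addEdge(int sourceId, int targetId) {
        MiniStreamNode source = getNode(sourceId);
        MiniStreamNode target = getNode(targetId);
        MiniStreamEdge edge = new MiniStreamEdge(source, target);
        // Register the edge on both endpoints so the graph can be walked in either direction.
        source.outEdges.add(edge);
        target.inEdges.add(edge);
        return edge;
    }

    MiniStreamNode getNode(int id) {
        MiniStreamNode node = nodes.get(id);
        if (node == null) {
            throw new IllegalArgumentException("Unknown node id: " + id);
        }
        return node;
    }

    int nodeCount() {
        return nodes.size();
    }
}

final class MiniStreamGraphDemo {
    public static void main(String[] args) {
        MiniStreamGraph graph = new MiniStreamGraph();
        graph.addNode(1, "Source");
        graph.addNode(2, "Map");
        graph.addEdge(1, 2);
        // Prints "Source": the upstream operator of node 2.
        System.out.println(graph.getNode(2).inEdges.get(0).source.operatorName);
    }
}
```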


    Why is a StreamGraph needed?

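    Flink is a stream processing engine that supports two execution modes: stream processing and batch processing. At runtime, however, the two modes are unified and not treated differently; both are described by a JobGraph. On the API side, each mode has its own data structure, StreamGraph for streaming programs and Plan for batch programs, and the corresponding JobGraph is generated from whichever of the two applies.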

    How is a StreamGraph created?

    Before describing how a StreamGraph is created, we first introduce some core concepts and data structures related to it.

    StreamExecutionEnvironment

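    The execution environment of a streaming program. It holds the ExecutionConfig, the CheckpointConfig, the list of StreamTransformations, the StateBackend, and so on. It is the entry point for generating a StreamGraph: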

        public StreamGraph getStreamGraph() {
            if (transformations.size() <= 0) {
                throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
            }
            return StreamGraphGenerator.generate(this, transformations);
        }
    

    StreamGraphGenerator

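    As the name suggests, this is the StreamGraph generator, the class that actually builds the StreamGraph. It takes the StreamTransformations produced while the stream topology was being constructed, translates them into the corresponding StreamNodes and StreamEdges, and assembles these into a StreamGraph.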

        private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
            for (StreamTransformation<?> transformation: transformations) {
                transform(transformation);
            }
            return streamGraph;
        }
    

    StreamTransformation

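    A StreamTransformation represents an operation that creates a DataStream, and every DataStream has an underlying StreamTransformation. However, not every StreamTransformation corresponds to a physical operation at runtime.

    In the original figure (not reproduced here), the left side shows the graph of transformations built while constructing the stream topology, and the right side shows the graph of operations at runtime.
    StreamTransformation is an abstract type with many concrete implementations: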

    • SourceTransformation
    • OneInputTransformation
    • SideOutputTransformation
    • SelectTransformation
    • PartitionTransformation
    • CoFeedbackTransformation
    • FeedbackTransformation
    • UnionTransformation
    • SplitTransformation
    • TwoInputTransformation
    • SinkTransformation
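
    The remark that a transformation need not become a runtime operation can be illustrated with a toy model of a partitioning step. In this sketch the partition is recorded only as a "virtual" node; when a downstream edge is later added against the virtual ID, it is resolved to the real upstream node and the partitioner is attached to the edge. The class VirtualNodeSketch, its string-based partitioner names and the "forward" default are assumptions made for illustration; the method name addVirtualPartitionNode follows what StreamGraph exposes in the Flink source of this period as far as I can tell, but the body here is not Flink's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy graph that distinguishes physical nodes from virtual ones.
final class VirtualNodeSketch {

    // What a virtual partition node remembers: its real upstream node and the partitioner.
    private static final class VirtualPartition {
        final int upstreamId;
        final String partitioner;

        VirtualPartition(int upstreamId, String partitioner) {
            this.upstreamId = upstreamId;
            this.partitioner = partitioner;
        }
    }

    final List<Integer> physicalNodes = new ArrayList<>();
    final List<String> edges = new ArrayList<>();
    private final Map<Integer, VirtualPartition> virtualNodes = new HashMap<>();

    // A transformation backed by an operator becomes a physical node.
    void addOperator(int id) {
        physicalNodes.add(id);
    }

    // A partitioning step adds no physical node, only a bookkeeping entry.
    void addVirtualPartitionNode(int upstreamId, int virtualId, String partitioner) {
        virtualNodes.put(virtualId, new VirtualPartition(upstreamId, partitioner));
    }

    void addEdge(int upstreamId, int downstreamId) {
        // "forward" as the default partitioner is an assumption of this sketch.
        addEdgeInternal(upstreamId, downstreamId, "forward");
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
        VirtualPartition virtual = virtualNodes.get(upstreamId);
        if (virtual != null) {
            // Resolve the virtual node: connect to its real upstream, carrying its partitioner.
            addEdgeInternal(virtual.upstreamId, downstreamId, virtual.partitioner);
        } else {
            edges.add(upstreamId + "->" + downstreamId + " [" + partitioner + "]");
        }
    }
}

final class VirtualNodeSketchDemo {
    public static void main(String[] args) {
        VirtualNodeSketch graph = new VirtualNodeSketch();
        graph.addOperator(1);                              // source
        graph.addVirtualPartitionNode(1, 2, "rebalance");  // partition step, virtual ID 2
        graph.addOperator(3);                              // map
        graph.addEdge(2, 3);                               // the map's input is the virtual node
        System.out.println(graph.physicalNodes); // [1, 3]
        System.out.println(graph.edges);         // [1->3 [rebalance]]
    }
}
```

    Three transformations go in, but only two physical nodes and one edge come out, which matches the left-versus-right contrast described above.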

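    Each type of StreamTransformation corresponds to a particular kind of operator, and StreamNodes and StreamEdges are created by translating the concrete StreamTransformation. As noted above, not every type of StreamTransformation creates a StreamNode and StreamEdge. The translation process looks like this: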

        private Collection<Integer> transform(StreamTransformation<?> transform) {
    
            if (alreadyTransformed.containsKey(transform)) {
                return alreadyTransformed.get(transform);
            }
    
            LOG.debug("Transforming " + transform);
    
            if (transform.getMaxParallelism() <= 0) {
    
                // if the max parallelism hasn't been set, then first use the job wide max parallelism
                // from the ExecutionConfig.
                int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
                if (globalMaxParallelismFromConfig > 0) {
                    transform.setMaxParallelism(globalMaxParallelismFromConfig);
                }
            }
    
            // call at least once to trigger exceptions about MissingTypeInfo
            transform.getOutputType();
    
            Collection<Integer> transformedIds;
            if (transform instanceof OneInputTransformation<?, ?>) {
                transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
            } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
                transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
            } else if (transform instanceof SourceTransformation<?>) {
                transformedIds = transformSource((SourceTransformation<?>) transform);
            } else if (transform instanceof SinkTransformation<?>) {
                transformedIds = transformSink((SinkTransformation<?>) transform);
            } else if (transform instanceof UnionTransformation<?>) {
                transformedIds = transformUnion((UnionTransformation<?>) transform);
            } else if (transform instanceof SplitTransformation<?>) {
                transformedIds = transformSplit((SplitTransformation<?>) transform);
            } else if (transform instanceof SelectTransformation<?>) {
                transformedIds = transformSelect((SelectTransformation<?>) transform);
            } else if (transform instanceof FeedbackTransformation<?>) {
                transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
            } else if (transform instanceof CoFeedbackTransformation<?>) {
                transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
            } else if (transform instanceof PartitionTransformation<?>) {
                transformedIds = transformPartition((PartitionTransformation<?>) transform);
            } else if (transform instanceof SideOutputTransformation<?>) {
                transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
            } else {
                throw new IllegalStateException("Unknown transformation: " + transform);
            }
    
            // need this check because the iterate transformation adds itself before
            // transforming the feedback edges
            if (!alreadyTransformed.containsKey(transform)) {
                alreadyTransformed.put(transform, transformedIds);
            }
    
            if (transform.getBufferTimeout() > 0) {
                streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
            }
            if (transform.getUid() != null) {
                streamGraph.setTransformationUID(transform.getId(), transform.getUid());
            }
            if (transform.getUserProvidedNodeHash() != null) {
                streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
            }
    
            if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
                streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
            }
    
            return transformedIds;
        }
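
    The role of alreadyTransformed in the method above can be sketched in isolation: it memoizes the result of each translation, so a transformation that feeds several downstream operators is translated only once even though it is reached repeatedly through recursion. ToyTransformation and ToyGraphGenerator are hypothetical names for this sketch, and the single-input model is a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical transformation with at most one input (null marks a source).
final class ToyTransformation {
    final int id;
    final String name;
    final ToyTransformation input;

    ToyTransformation(int id, String name, ToyTransformation input) {
        this.id = id;
        this.name = name;
        this.input = input;
    }
}

final class ToyGraphGenerator {
    // Plays the role of alreadyTransformed: transformation -> node IDs it produced.
    private final Map<ToyTransformation, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final List<String> nodes = new ArrayList<>(); // node names in creation order
    final List<String> edges = new ArrayList<>(); // "sourceId->targetId"

    static ToyGraphGenerator generate(List<ToyTransformation> transformations) {
        ToyGraphGenerator generator = new ToyGraphGenerator();
        for (ToyTransformation transformation : transformations) {
            generator.transform(transformation);
        }
        return generator;
    }

    private Collection<Integer> transform(ToyTransformation t) {
        Collection<Integer> cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached; // already translated, reuse the recorded node IDs
        }
        // Translate the upstream transformation first (recursion towards the source).
        Collection<Integer> inputIds = t.input == null
                ? Collections.<Integer>emptyList()
                : transform(t.input);
        nodes.add(t.name);
        for (Integer inputId : inputIds) {
            edges.add(inputId + "->" + t.id);
        }
        Collection<Integer> ids = Collections.singleton(t.id);
        alreadyTransformed.put(t, ids);
        return ids;
    }
}

final class ToyGraphGeneratorDemo {
    public static void main(String[] args) {
        ToyTransformation source = new ToyTransformation(1, "source", null);
        ToyTransformation mapA = new ToyTransformation(2, "mapA", source);
        ToyTransformation mapB = new ToyTransformation(3, "mapB", source);
        // Only the two maps are passed in; the shared source is reached by recursion.
        ToyGraphGenerator g = ToyGraphGenerator.generate(Arrays.asList(mapA, mapB));
        System.out.println(g.nodes); // [source, mapA, mapB]
        System.out.println(g.edges); // [1->2, 1->3]
    }
}
```

    Without the cache, the shared source would be added to the graph twice, once for each downstream map.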
    

    Taking OneInputTransformation as an example, let us see how a StreamNode and its StreamEdges are created.

        private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
    
            Collection<Integer> inputIds = transform(transform.getInput());
    
            // the recursive call might have already transformed this
            if (alreadyTransformed.containsKey(transform)) {
                return alreadyTransformed.get(transform);
            }
    
            String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
    
            streamGraph.addOperator(transform.getId(),
                    slotSharingGroup,
                    transform.getOperator(),
                    transform.getInputType(),
                    transform.getOutputType(),
                    transform.getName());
    
            if (transform.getStateKeySelector() != null) {
                TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
                streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
            }
    
            streamGraph.setParallelism(transform.getId(), transform.getParallelism());
            streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
    
            for (Integer inputId: inputIds) {
                streamGraph.addEdge(inputId, transform.getId(), 0);
            }
    
            return Collections.singleton(transform.getId());
        }
    

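    Step 1: translate the input of the OneInputTransformation. The input is itself a StreamTransformation, so this is a recursive process that walks from the last StreamTransformation back to the first. The result is the collection of node IDs of the input.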

            Collection<Integer> inputIds = transform(transform.getInput());
    

    Step 2: create a StreamNode and add it to the StreamGraph.

        streamGraph.addOperator(transform.getId(),
                    slotSharingGroup,
                    transform.getOperator(),
                    transform.getInputType(),
                    transform.getOutputType(),
                    transform.getName());
    

    Step 3: add edges to the StreamGraph connecting the input nodes to the new node.

        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }
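
    The loop in step 3 exists because translating the input can yield more than one node ID. A union is the typical case: in the following sketch it creates no node of its own and simply forwards the IDs of all its inputs, so the downstream operator receives one edge per unioned stream. The Sketch* classes are hypothetical, and memoization is left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical, minimal transformation hierarchy for this sketch.
abstract class SketchTransformation {
    final int id;

    SketchTransformation(int id) {
        this.id = id;
    }
}

final class SketchSource extends SketchTransformation {
    SketchSource(int id) {
        super(id);
    }
}

final class SketchUnion extends SketchTransformation {
    final List<SketchTransformation> inputs;

    SketchUnion(int id, SketchTransformation... inputs) {
        super(id);
        this.inputs = Arrays.asList(inputs);
    }
}

final class SketchOneInput extends SketchTransformation {
    final SketchTransformation input;

    SketchOneInput(int id, SketchTransformation input) {
        super(id);
        this.input = input;
    }
}

final class SketchTranslator {
    final List<Integer> nodeIds = new ArrayList<>();
    final List<String> edges = new ArrayList<>(); // "sourceId->targetId"

    Collection<Integer> transform(SketchTransformation t) {
        if (t instanceof SketchSource) {
            nodeIds.add(t.id);
            return Collections.singleton(t.id);
        } else if (t instanceof SketchUnion) {
            // No node is created: the union just forwards the IDs of all its inputs.
            List<Integer> resultIds = new ArrayList<>();
            for (SketchTransformation input : ((SketchUnion) t).inputs) {
                resultIds.addAll(transform(input));
            }
            return resultIds;
        } else if (t instanceof SketchOneInput) {
            // Step 1: translate the input. Step 2: add the node. Step 3: one edge per input ID.
            Collection<Integer> inputIds = transform(((SketchOneInput) t).input);
            nodeIds.add(t.id);
            for (Integer inputId : inputIds) {
                edges.add(inputId + "->" + t.id);
            }
            return Collections.singleton(t.id);
        }
        throw new IllegalStateException("Unknown transformation: " + t);
    }
}

final class SketchTranslatorDemo {
    public static void main(String[] args) {
        SketchSource first = new SketchSource(1);
        SketchSource second = new SketchSource(2);
        SketchUnion union = new SketchUnion(3, first, second);
        SketchOneInput map = new SketchOneInput(4, union);

        SketchTranslator translator = new SketchTranslator();
        translator.transform(map);
        System.out.println(translator.nodeIds); // [1, 2, 4]
        System.out.println(translator.edges);   // [1->4, 2->4]
    }
}
```

    The union's own ID (3) never appears in the result, while the map ends up with two incoming edges.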
    

    Once all StreamTransformations have been traversed, the corresponding StreamGraph has been generated.

