Flink Source Code Analysis: StreamGraph

Author: 小C菜鸟 | Published: 2018-06-18 17:00

What is a StreamGraph?

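As the name suggests, a StreamGraph is a stream graph. It describes the topology of a dataflow and contains all the information needed to create a JobGraph. A StreamGraph is made up of StreamNodes and StreamEdges: a StreamNode describes one operator in the streaming program together with its properties, and a StreamEdge connects two StreamNodes and represents a data stream between them.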
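To make this node/edge structure concrete, here is a minimal sketch in plain Java. The classes `SimpleStreamGraph`, `Node` and `Edge` are hypothetical simplifications for illustration only. Flink's real StreamNode and StreamEdge carry much more information (the operator itself, serializers, parallelism, partitioner, and so on).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of a stream graph: operators as nodes, data streams as edges. */
public class SimpleStreamGraph {

    /** One operator of the program (loosely corresponds to a StreamNode). */
    static final class Node {
        final int id;
        final String operatorName;

        Node(int id, String operatorName) {
            this.id = id;
            this.operatorName = operatorName;
        }
    }

    /** A data stream from one operator to another (loosely corresponds to a StreamEdge). */
    static final class Edge {
        final int sourceId;
        final int targetId;

        Edge(int sourceId, int targetId) {
            this.sourceId = sourceId;
            this.targetId = targetId;
        }
    }

    final Map<Integer, Node> nodes = new LinkedHashMap<>();
    final List<Edge> edges = new ArrayList<>();

    void addNode(int id, String operatorName) {
        nodes.put(id, new Node(id, operatorName));
    }

    void addEdge(int sourceId, int targetId) {
        // In this sketch an edge may only connect nodes that already exist.
        if (!nodes.containsKey(sourceId) || !nodes.containsKey(targetId)) {
            throw new IllegalArgumentException("Unknown node in edge " + sourceId + " -> " + targetId);
        }
        edges.add(new Edge(sourceId, targetId));
    }

    /** Builds the graph for a Source -> Map -> Sink program. */
    static SimpleStreamGraph sourceMapSink() {
        SimpleStreamGraph g = new SimpleStreamGraph();
        g.addNode(1, "Source");
        g.addNode(2, "Map");
        g.addNode(3, "Sink");
        g.addEdge(1, 2);
        g.addEdge(2, 3);
        return g;
    }

    public static void main(String[] args) {
        SimpleStreamGraph g = sourceMapSink();
        for (Edge e : g.edges) {
            System.out.println(g.nodes.get(e.sourceId).operatorName + " -> " + g.nodes.get(e.targetId).operatorName);
        }
    }
}
```

Running `main` prints the two edges `Source -> Map` and `Map -> Sink`.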


Why do we need a StreamGraph?

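Flink is a stream processing engine that supports two execution modes: stream processing and batch processing. At runtime, however, the two modes are unified rather than treated differently, and both are described by a JobGraph. To describe the two modes, Flink defines two different data structures, StreamGraph for streaming and Plan for batch, and generates the corresponding JobGraph from each.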

How is a StreamGraph created?

Before explaining how a StreamGraph is created, let us first introduce some core concepts and data structures related to it.

StreamExecutionEnvironment

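The execution environment for stream programs. It holds the program's ExecutionConfig, CheckpointConfig, list of StreamTransformations, StateBackend, and so on. It is the entry point for generating a StreamGraph: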

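    public StreamGraph getStreamGraph() {
        // An empty topology cannot be turned into a StreamGraph
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        // The actual graph construction is delegated to StreamGraphGenerator
        return StreamGraphGenerator.generate(this, transformations);
    }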
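The environment only records transformations as the program calls DataStream operations. Nothing is built until getStreamGraph() is called. The toy sketch below illustrates that idea. `ToyEnvironment`, `source` and `apply` are hypothetical names, not Flink API. The rule that sources are not put into the list is an assumption of the sketch: later transformations reach them through their inputs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical toy environment: API calls only record transformations; nothing is built yet. */
public class ToyEnvironment {

    /** Stand-in for a StreamTransformation: a name plus its input (null for a source). */
    static final class Transformation {
        final String name;
        final Transformation input;

        Transformation(String name, Transformation input) {
            this.name = name;
            this.input = input;
        }
    }

    private final List<Transformation> transformations = new ArrayList<>();

    /** Sketch assumption: sources are not registered; later transformations reach them via input. */
    Transformation source(String name) {
        return new Transformation(name, null);
    }

    /** Records an operation on {@code input} and returns its transformation; nothing runs here. */
    Transformation apply(String name, Transformation input) {
        Transformation t = new Transformation(name, Objects.requireNonNull(input, "input"));
        transformations.add(t);
        return t;
    }

    List<Transformation> getTransformations() {
        return Collections.unmodifiableList(transformations);
    }

    /** Same guard as getStreamGraph(); here the "graph" is just a summary string of the recorded edges. */
    String getStreamGraph() {
        if (transformations.isEmpty()) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        StringBuilder sb = new StringBuilder();
        for (Transformation t : transformations) {
            if (sb.length() > 0) {
                sb.append(", ");
            }
            sb.append(t.input.name).append(" -> ").append(t.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment();
        Transformation map = env.apply("Map", env.source("Source"));
        env.apply("Sink", map);
        System.out.println(env.getStreamGraph()); // Source -> Map, Map -> Sink
    }
}
```

Recording first and building later lets the whole topology be known before any graph is constructed. This is also why an empty list can be detected and rejected up front.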

StreamGraphGenerator

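As the name suggests, this is the StreamGraph generator, the class that actually builds the StreamGraph. It takes the StreamTransformations produced while the stream topology was being defined, creates the corresponding StreamNodes and StreamEdges from them, and assembles these into the StreamGraph.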

    private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
        // Transform every registered transformation; their inputs are transformed recursively
        for (StreamTransformation<?> transformation: transformations) {
            transform(transformation);
        }
        return streamGraph;
    }
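generateInternal() simply calls transform() on every registered transformation. transform() recurses into inputs and caches its results in the alreadyTransformed map, so an upstream operator shared by several downstream transformations becomes exactly one node. The sketch below uses simplified, hypothetical classes (`MemoizedGenerator`, `Transformation`) and records nodes and edges as strings instead of building real StreamNodes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a generator loop with memoized recursion (compare alreadyTransformed). */
public class MemoizedGenerator {

    static final class Transformation {
        final int id;
        final String name;
        final List<Transformation> inputs;

        Transformation(int id, String name, Transformation... inputs) {
            this.id = id;
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    final Map<Transformation, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final List<String> nodes = new ArrayList<>();
    final List<String> edges = new ArrayList<>();

    /** Like generateInternal: transform every registered transformation, in order. */
    static MemoizedGenerator generate(List<Transformation> transformations) {
        MemoizedGenerator gen = new MemoizedGenerator();
        for (Transformation t : transformations) {
            gen.transform(t);
        }
        return gen;
    }

    Collection<Integer> transform(Transformation t) {
        Collection<Integer> cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached; // a shared upstream operator is turned into a node only once
        }
        List<Integer> inputIds = new ArrayList<>();
        for (Transformation input : t.inputs) {
            inputIds.addAll(transform(input)); // recurse towards the sources first
        }
        nodes.add(t.id + ":" + t.name);
        for (int inputId : inputIds) {
            edges.add(inputId + "->" + t.id);
        }
        Collection<Integer> ids = Collections.singleton(t.id);
        alreadyTransformed.put(t, ids);
        return ids;
    }

    public static void main(String[] args) {
        Transformation source = new Transformation(1, "Source");
        Transformation map = new Transformation(2, "Map", source);
        Transformation sinkA = new Transformation(3, "SinkA", map);
        Transformation sinkB = new Transformation(4, "SinkB", map);
        // Sketch assumption: the list holds map and both sinks; the source is reached via inputs.
        MemoizedGenerator gen = generate(Arrays.asList(map, sinkA, sinkB));
        System.out.println(gen.nodes); // [1:Source, 2:Map, 3:SinkA, 4:SinkB]
        System.out.println(gen.edges); // [1->2, 2->3, 2->4]
    }
}
```

Map is reached three times: once directly from the list and once as the input of each sink. Thanks to the cache, it still produces a single node with two outgoing edges.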

StreamTransformation

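A StreamTransformation represents an operation that creates a DataStream, and every DataStream has an underlying StreamTransformation. However, not every StreamTransformation corresponds to a physical operation at runtime. For example: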



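In the figure above, the left side shows the graph built while the stream topology is being defined, and the right side shows the operator graph at runtime.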
StreamTransformation is an abstract type with many concrete implementations:

  • SourceTransformation
  • OneInputTransformation
  • SideOutputTransformation
  • SelectTransformation
  • PartitionTransformation
  • CoFeedbackTransformation
  • FeedbackTransformation
  • UnionTransformation
  • SplitTransformation
  • TwoInputTransformation
  • SinkTransformation
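The following sketch shows how some transformations exist only while the topology is being built. It uses a simplified model in which the `VirtualTransformations` class, the `Kind` enum and the string-based edges are all hypothetical. A UNION creates no node and just passes on the IDs of its inputs. A PARTITION is represented by a virtual ID, which is resolved into the real upstream node plus its partitioner when an edge is added. To our understanding, this mirrors in greatly simplified form how Flink's transformUnion and virtual partition nodes behave. The default `forward` label on all other edges is a simplification of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: UNION and PARTITION create no physical node of their own. */
public class VirtualTransformations {

    enum Kind { SOURCE, OPERATOR, UNION, PARTITION }

    static final class Transformation {
        final int id;
        final Kind kind;
        final String label; // operator name, or partitioner name for PARTITION
        final List<Transformation> inputs;

        Transformation(int id, Kind kind, String label, Transformation... inputs) {
            this.id = id;
            this.kind = kind;
            this.label = label;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** A virtual id that stands for a real upstream node plus a partitioner. */
    static final class VirtualPartition {
        final int upstreamId;
        final String partitioner;

        VirtualPartition(int upstreamId, String partitioner) {
            this.upstreamId = upstreamId;
            this.partitioner = partitioner;
        }
    }

    final Map<Integer, VirtualPartition> virtualNodes = new HashMap<>();
    final List<String> nodes = new ArrayList<>();
    final List<String> edges = new ArrayList<>();
    private int nextVirtualId = 1000; // hypothetical id range reserved for virtual nodes

    Collection<Integer> transform(Transformation t) {
        List<Integer> inputIds = new ArrayList<>();
        for (Transformation input : t.inputs) {
            inputIds.addAll(transform(input));
        }
        if (t.kind == Kind.UNION) {
            // No node: downstream operators connect directly to every union input.
            return inputIds;
        }
        if (t.kind == Kind.PARTITION) {
            // No node either: each input id is wrapped in a virtual id that carries the partitioner.
            List<Integer> virtualIds = new ArrayList<>();
            for (int inputId : inputIds) {
                int virtualId = nextVirtualId++;
                virtualNodes.put(virtualId, new VirtualPartition(inputId, t.label));
                virtualIds.add(virtualId);
            }
            return virtualIds;
        }
        // SOURCE and OPERATOR are physical: add a node and connect its inputs.
        nodes.add(t.id + ":" + t.label);
        for (int inputId : inputIds) {
            addEdge(inputId, t.id, "forward"); // simplification: default partitioner is "forward"
        }
        return Collections.singleton(t.id);
    }

    private void addEdge(int upstreamId, int downstreamId, String partitioner) {
        VirtualPartition virtual = virtualNodes.get(upstreamId);
        if (virtual != null) {
            // Resolve the virtual id to the real node and use the partitioner it carries.
            addEdge(virtual.upstreamId, downstreamId, virtual.partitioner);
        } else {
            edges.add(upstreamId + "->" + downstreamId + " (" + partitioner + ")");
        }
    }

    public static void main(String[] args) {
        Transformation src1 = new Transformation(1, Kind.SOURCE, "Source1");
        Transformation src2 = new Transformation(2, Kind.SOURCE, "Source2");
        Transformation rebalance = new Transformation(3, Kind.PARTITION, "rebalance", src2);
        Transformation union = new Transformation(4, Kind.UNION, "union", src1, rebalance);
        Transformation map = new Transformation(5, Kind.OPERATOR, "Map", union);
        VirtualTransformations gen = new VirtualTransformations();
        gen.transform(new Transformation(6, Kind.OPERATOR, "Sink", map));
        System.out.println(gen.nodes); // [1:Source1, 2:Source2, 5:Map, 6:Sink]
        System.out.println(gen.edges); // [1->5 (forward), 2->5 (rebalance), 5->6 (forward)]
    }
}
```

Six transformations yield only four nodes. The union disappears entirely, and the rebalance survives only as a property of the edge from Source2 to Map. The cache from the real transform() is omitted here to keep the sketch short.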

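Each type of StreamTransformation corresponds to a particular kind of operator, and the generator creates StreamNodes and StreamEdges by processing each concrete StreamTransformation. As noted above, however, not every type of StreamTransformation produces StreamNodes and StreamEdges. The generation process works as follows: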

    private Collection<Integer> transform(StreamTransformation<?> transform) {

        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        if (transform.getBufferTimeout() > 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }
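Two details of transform() are easy to overlook. First, the max parallelism falls back to the job-wide value from the ExecutionConfig only when the transformation has none of its own and the job-wide value is positive. Second, any transformation type that the instanceof chain does not match is rejected with an IllegalStateException. The sketch below reproduces just these two rules with hypothetical stand-in classes. The field `globalMaxParallelism` stands in for env.getConfig().getMaxParallelism(), and the recorded method names stand in for the real dispatch targets.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of two details of transform(): max-parallelism fallback and type dispatch. */
public class TransformDispatch {

    static abstract class Transformation {
        int maxParallelism = -1; // a value <= 0 means "not set"
    }

    static final class OneInputTransformation extends Transformation { }

    static final class UnionTransformation extends Transformation { }

    static final class CustomTransformation extends Transformation { } // deliberately not handled

    final int globalMaxParallelism; // stands in for env.getConfig().getMaxParallelism()
    final List<String> calls = new ArrayList<>();

    TransformDispatch(int globalMaxParallelism) {
        this.globalMaxParallelism = globalMaxParallelism;
    }

    void transform(Transformation t) {
        // Fall back to the job-wide value only if the transformation has none and the job-wide value is set.
        if (t.maxParallelism <= 0 && globalMaxParallelism > 0) {
            t.maxParallelism = globalMaxParallelism;
        }
        // Dispatch on the concrete type; anything unrecognized is rejected.
        if (t instanceof OneInputTransformation) {
            calls.add("transformOneInputTransform");
        } else if (t instanceof UnionTransformation) {
            calls.add("transformUnion");
        } else {
            throw new IllegalStateException("Unknown transformation: " + t.getClass().getSimpleName());
        }
    }

    public static void main(String[] args) {
        TransformDispatch d = new TransformDispatch(128);
        OneInputTransformation unset = new OneInputTransformation();
        OneInputTransformation explicit = new OneInputTransformation();
        explicit.maxParallelism = 16;
        d.transform(unset);
        d.transform(explicit);
        System.out.println(unset.maxParallelism + " " + explicit.maxParallelism); // 128 16
        System.out.println(d.calls); // [transformOneInputTransform, transformOneInputTransform]
    }
}
```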

Taking OneInputTransformation as an example, let's see how StreamNodes and StreamEdges are created.

    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

        Collection<Integer> inputIds = transform(transform.getInput());

        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getOperator(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());

        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        streamGraph.setParallelism(transform.getId(), transform.getParallelism());
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }

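Step 1: Transform the input of the OneInputTransformation to obtain the node IDs of that input. The input is itself a StreamTransformation, so this is a recursive process that works from the last StreamTransformation all the way back to the first.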

    Collection<Integer> inputIds = transform(transform.getInput());

Step 2: Create a StreamNode and add it to the StreamGraph.

    streamGraph.addOperator(transform.getId(),
            slotSharingGroup,
            transform.getOperator(),
            transform.getInputType(),
            transform.getOutputType(),
            transform.getName());

Step 3: Add edges to the StreamGraph that connect each input node to this node.

    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }

Once all StreamTransformations have been traversed, the corresponding StreamGraph is complete.
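The sketch below traces the three steps for a linear chain Source, Map, Filter, starting from the last transformation, to show the order in which they happen across the recursion. `OneInputSteps` and its trace strings are hypothetical. In Flink, sources go through transformSource, which the sketch collapses into a single branch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch tracing the three steps of transformOneInputTransform for a linear chain. */
public class OneInputSteps {

    static final class Transformation {
        final int id;
        final String name;
        final Transformation input; // null for a source

        Transformation(int id, String name, Transformation input) {
            this.id = id;
            this.name = name;
            this.input = input;
        }
    }

    final List<String> trace = new ArrayList<>();

    Collection<Integer> transform(Transformation t) {
        if (t.input == null) {
            trace.add("add node " + t.id + " (" + t.name + ")"); // a source has no input to resolve
            return Collections.singleton(t.id);
        }
        // Step 1: resolve the input first (recursion walks from the last transformation back to the first).
        trace.add("resolve input of " + t.name);
        Collection<Integer> inputIds = transform(t.input);
        // Step 2: create the node for this transformation.
        trace.add("add node " + t.id + " (" + t.name + ")");
        // Step 3: connect every input node to this node.
        for (int inputId : inputIds) {
            trace.add("add edge " + inputId + "->" + t.id);
        }
        return Collections.singleton(t.id);
    }

    public static void main(String[] args) {
        Transformation source = new Transformation(1, "Source", null);
        Transformation map = new Transformation(2, "Map", source);
        Transformation filter = new Transformation(3, "Filter", map);
        OneInputSteps steps = new OneInputSteps();
        steps.transform(filter);
        steps.trace.forEach(System.out::println);
    }
}
```

The trace starts by resolving inputs from Filter back towards Source. Nodes and edges, however, are added in the opposite direction: Source first, then Map with edge 1->2, then Filter with edge 2->3. So an edge is only ever added once both of its endpoints exist.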

Article link: https://www.haomeiwen.com/subject/uudneftx.html