美文网首页玩转大数据
Flink 源码之StreamGraph生成

Flink 源码之StreamGraph生成

作者: AlienPaul | 来源:发表于2019-12-05 16:14 被阅读0次

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

什么是StreamGraph

StreamGraph是Flink任务执行流程拓扑图的封装。在Flink的client端,Environment执行execute()方法的时候,用户编写的数据处理流程会转变为StreamGraph。

各个算子最终会变成什么

Flink流处理的各个算子会被当做一系列transformation储存起来。具体请参见Flink 源码之基本算子
下面以DataStream的Map方法为例说明。
DataStream的map方法:

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    // 获取mapper函数的返回类型
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
            Utils.getCallLocationName(), true);
    // 对map操作封装为transformation,并返回SingleOutputStreamOperator
    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

transform方法的代码:

public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {
    // 这里将operator封装入operator factory
    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

doTransform方法:

private <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    // 如果transformation的输出类型为MissingTypeInfo的话,程序会抛异常
    transformation.getOutputType();

    // 构造新的transformation
    // map类型的transformation只有一个输入,因此它输入OneInputTransformation
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operatorFactory,
            outTypeInfo,
            environment.getParallelism());

    // 构造返回的stream,供后续的算子链式调用
    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
    // 将transformation写入ExecutionEnvironment中
    // ExecutionEnvironment维护了一个叫做transformations的ArrayList对象,用于储存所有的transformation
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

执行到此,map算子已被封装为transformation,存储到了ExecutionEnvironment中。

几种transformation的类型

OneInputTransformation

顾名思义OneInputTransformation只有一个输入。代表的算子(单数据流)为:map,flatMap,fliter,process,assignTimestamps等。

TwoInputTransformation

TwoInputTransformation具有两个输入。ConnectedStream的算子为双流运算,它的算子会被转换为TwoInputTransformation。

SourceTransformation

在env中配置数据源的时候会创建出一个DataStreamSource。该对象为dataStream的源头。DataStreamSource的构造函数中会创建一个SourceTransformation。

SinkTransformation

和SourceTransformation类似,在dataStream调用addSink方法的时候会生成一个DataStreamSink对象。该对象在创建的时候会同时构造一个SinkTransformation。

UnionTransformation

该transformation为合并多个input到一个流中。代表算子为union。

SplitTransformation

DataStream调用split的时候会创建SplitStream。SplitStream初始化时会构建一个SplitTransformation。

SelectTransformation

SplitStream在调用select算子的时候会创建SelectTransformation。

FeedbackTransformation

创建IterativeStream的时候会使用到该transformation。

CoFeedbackTransformation

和FeedbackTransformation类似,创建ConnectedIterativeStream的时候会使用到。

PartitionTransformation

涉及到控制数据流向的算子都属于PartitionTransformation,例如shuffle,forward,rebalance,broadcast,rescale,global,partitionCustom和keyBy等。

SideOutputTransformation

调用getSideOutput(获取旁路输出)的时候,SideOutputTransformation会发生作用。

根据Transformation构建Stream Graph

我们从StreamExecutionEnvironment的execute方法开始分析stream graph的生成过程。

StreamExecutionEnvironment的execute方法源码:

public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

    return execute(getStreamGraph(jobName));
}

该方法中调用了getStreamGraph方法。找到这个方法,如下所示:

public StreamGraph getStreamGraph(String jobName) {
    // 创建一个StreamGraphGenerator对象,设置参数,并调用generate方法生成stream graph
    return getStreamGraphGenerator().setJobName(jobName).generate();
}

private StreamGraphGenerator getStreamGraphGenerator() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    // 此处向StreamGraphGenerator传入transformations,以及其他的配置
    return new StreamGraphGenerator(transformations, config, checkpointCfg)
        .setStateBackend(defaultStateBackend)
        .setChaining(isChainingEnabled)
        .setUserArtifacts(cacheFile)
        .setTimeCharacteristic(timeCharacteristic)
        .setDefaultBufferTimeout(bufferTimeout);
}

接着跟踪到StreamGraphGeneratorgenerate方法,如下所示:

public StreamGraph generate() {
    // 生成StreamGraph对象,传入执行配置和检查点配置
    streamGraph = new StreamGraph(executionConfig, checkpointConfig);
    // 设置状态后端
    streamGraph.setStateBackend(stateBackend);
    // 设置级联配置,为一项优化配置
    streamGraph.setChaining(chaining);
    // 设置调度方式,决定task延迟调度还是立刻调度
    streamGraph.setScheduleMode(scheduleMode);
    // StreamExecutionEnvironment的cacheFile会传入该变量
    // cacheFile为需要分发到各个task manager的用户文件
    streamGraph.setUserArtifacts(userArtifacts);
    // 设置时间特征,是event time,processing time还是ingestion time
    streamGraph.setTimeCharacteristic(timeCharacteristic);
    // 设置作业名称
    streamGraph.setJobName(jobName);
    // 设置各个级联之间是否采用blocking 连接
    streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);

    // 储存已经被处理的transformation
    alreadyTransformed = new HashMap<>();

    // 逐个处理transformation
    for (Transformation<?> transformation: transformations) {
        transform(transformation);
    }

    // 获取已生成的streamGraph
    final StreamGraph builtStreamGraph = streamGraph;

    // 清空中间变量
    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}

这样看来,重点就在transform(transformation);这一行代码了。
transform方法:

    private Collection<Integer> transform(Transformation<?> transform) {

                // 检查该transformation是否已被处理,如果已处理直接返回
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

                // 如果transformation的最大并行度没有设置,全局的最大并行度已设置,将全局最大并行度设置给transformation
        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
                // 检查transformation的输出类型,如果是MissingTypeInfo则程序抛出异常
        transform.getOutputType();

        Collection<Integer> transformedIds;
                // 依照transformation的具体类型,提供不同的处理方法
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        // 如果该transformation没有被处理,则加入已处理列表
        // 处理每个transformation的时候会先处理它的input(可能没有input,也可能有一个或多个),transform方法会递归调用。
        // 在transform方法执行前后双重检查transformation是否已被处理可以确保在递归调用的情况下不会被重复处理
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        // 设置network的buffer超时时间
        if (transform.getBufferTimeout() >= 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        } else {
            streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
        }

         // 设置uid
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        // 设置UserProvidedNodeHash
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        // 如果自动设置uid功能被关闭,同时又没有指定UserProvidedNodeHash和uid,程序抛出异常
        if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
            if (transform.getUserProvidedNodeHash() == null && transform.getUid() == null) {
                throw new IllegalStateException("Auto generated UIDs have been disabled " +
                    "but no UID or hash has been assigned to operator " + transform.getName());
            }
        }

        // 设置transformation的最小和最佳资源要求
        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }

接下来从transformSource方法开始解析下StreamGraph的生成。

private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
    // 返回slotSharingGroup。此处返回“default”
    String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());

    // StreamGraph增加数据源
    streamGraph.addSource(source.getId(),
            slotSharingGroup,
            source.getCoLocationGroupKey(),
            source.getOperatorFactory(),
            null,
            source.getOutputType(),
            "Source: " + source.getName());
    // 设置输入数据类型
    if (source.getOperatorFactory() instanceof InputFormatOperatorFactory) {
        streamGraph.setInputFormat(source.getId(),
                ((InputFormatOperatorFactory<T>) source.getOperatorFactory()).getInputFormat());
    }
    // 设置并行度
    int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
        source.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(source.getId(), parallelism);
    streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());
    return Collections.singleton(source.getId());
}

StreamGraph的addSource方法

public <IN, OUT> void addSource(Integer vertexID,
    @Nullable String slotSharingGroup,
    @Nullable String coLocationGroup,
    StreamOperatorFactory<OUT> operatorFactory,
    TypeInformation<IN> inTypeInfo,
    TypeInformation<OUT> outTypeInfo,
    String operatorName) {
    addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName);
    sources.add(vertexID);
}

这个方法逻辑不多,先增加operator,再把SourceTransformation设置为StreamGraph的source。

addOperator方法:

public <IN, OUT> void addOperator(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {

    // 判断是否为StreamSource,如果是数据源的sourceFunction,它会被封装入StreamSource对象,此处返回true
    if (operatorFactory.isStreamSource()) {
        addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName);
    } else {
        addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName);
    }

    // 构建输入输出类型的序列化器
    TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;

    TypeSerializer<OUT> outSerializer = outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null;

    // 设置序列化器
    setSerializers(vertexID, inSerializer, null, outSerializer);

    // 设置operatorFactory的输入和输出类型
    if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
        // sets the output type which must be know at StreamGraph creation time
        operatorFactory.setOutputType(outTypeInfo, executionConfig);
    }

    if (operatorFactory.isInputTypeConfigurable()) {
        operatorFactory.setInputType(inTypeInfo, executionConfig);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Vertex: {}", vertexID);
    }
}

addNode方法:

protected StreamNode addNode(Integer vertexID,
    @Nullable String slotSharingGroup,
    @Nullable String coLocationGroup,
    Class<? extends AbstractInvokable> vertexClass,
    StreamOperatorFactory<?> operatorFactory,
    String operatorName) {

    if (streamNodes.containsKey(vertexID)) {
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    // 此处创建一个StreamNode,加入到streamNodes集合中
    StreamNode vertex = new StreamNode(
        vertexID,
        slotSharingGroup,
        coLocationGroup,
        operatorFactory,
        operatorName,
        new ArrayList<OutputSelector<?>>(),
        vertexClass);

    streamNodes.put(vertexID, vertex);

    return vertex;
}

transformSource方法的逻辑相对简单。在StreamGraph中增加了一个节点,还有指定了stream的sources。下面我们再研究下用的比较多的transformOneInputTransform方法。
OneInputTransformation具有一个Input,指向它前一个transformation。如此可以形成一种链表结构,如下所示:

OneInputTransformation结构

transformOneInputTransform代码如下所示:

private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

    // 先处理此Transformation的input transformation
    Collection<Integer> inputIds = transform(transform.getInput());

    // the recursive call might have already transformed this
    // 前一个递归调用中可能已经将方法入口的transformation处理过了,这里加以判断,防止重复处理
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    // 下面步骤和transformSource类似,直到增加edge
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

    streamGraph.addOperator(transform.getId(),
            slotSharingGroup,
            transform.getCoLocationGroupKey(),
            transform.getOperatorFactory(),
            transform.getInputType(),
            transform.getOutputType(),
            transform.getName());

    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }

    int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
        transform.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(transform.getId(), parallelism);
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

    // 这一步是关键,添加一个edge对象,将此oneInputTransformation转换成的vertex和input transformation转换为的vertex连接起来
    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }

    return Collections.singleton(transform.getId());
}

继续跟踪streamGraph.addEdge方法。addEdge方法又调用了addEdgeInternal方法,如下所示:

private void addEdgeInternal(Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames,
        OutputTag outputTag,
        ShuffleMode shuffleMode) {

    // 稍后分析这些
    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
    } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
        if (outputNames.isEmpty()) {
            // selections that happen downstream override earlier selections
            outputNames = virtualSelectNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        shuffleMode = virtualPartitionNodes.get(virtualId).f2;
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else {
        // 获取上游和下游的节点
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        // 分区器设置,后面说明
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        if (partitioner instanceof ForwardPartitioner) {
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                        "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                        ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                        " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        if (shuffleMode == null) {
            shuffleMode = ShuffleMode.UNDEFINED;
        }

        // 创建一个新的StreamEdge
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);

        // 连接刚刚创建的edge到上下游节点
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}

在方法开始我们看到了virtualSideOutputNodesvirtualSelectNodesvirtualPartitionNodes的处理逻辑。这几类transformation会被处理为虚拟节点。什么是虚拟节点呢?我们发现sideOutput,select和分区操作不需要用户传入自定义的处理逻辑,即userFunction。这些类型的变换会被处理成虚拟节点。虚拟节点严格来说不是StreamNode类型,不包含物理转换逻辑。
虚拟节点的不会出现在StreamGraph的处理流中,在添加edge的时候如果上有节点为虚拟节点,会通过递归的方式寻找上游节点,直至找到一个非虚拟节点,再执行添加edge逻辑。虚拟节点通过内部的originalId属性,附着于非虚拟节点上。

还有Partitioner需要说明。如果没有指定partitioner,并且上下游的并行度相同,则使用ForwardPartitioner,直接推数据到本地下游的operator。如果上游和下游的并行度设置不相同,使用RebalancePartitioner。该Partitioner通过轮询的方式发送数据到下游通道。
不能在上下游并行度不同的时候使用ForwardPartitioner。否则程序会抛异常。

一张图总结

以如下程序为例:

val stream = env.fromElements("hello", "world")
stream.map((_, 0)).keyBy(0).countWindow(1).process(new ProcessWindowFunction[(String, Int), String, Tuple, GlobalWindow] {
    override def process(key: Tuple, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {
        for (elem <- elements) {
            out.collect(elem._1)
        }
    }
}).print()

该程序的转换流程如图所示:


image.png

相关文章

网友评论

    本文标题:Flink 源码之StreamGraph生成

    本文链接:https://www.haomeiwen.com/subject/ezbhgctx.html