StreamGraph Generation


Author: 飞_侠 | Published 2022-03-26 19:05

    Example

    Using the SocketWindowWordCount example, we analyze how the graph is built.

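    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class SocketWindowWordCount {

        public static void main(String[] args) throws Exception {
            // socket to read from; hard-coded here for brevity
            // (the original Flink example reads host and port from the command line)
            final String hostname = "localhost";
            final int port = 9000;

            // get the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // get input data by connecting to the socket
            DataStream<String> text = env.socketTextStream(hostname, port, "\n");

            // parse the data, group it, window it, and aggregate the counts
            DataStream<WordWithCount> windowCounts =
                    text.flatMap(
                                    new FlatMapFunction<String, WordWithCount>() {
                                        @Override
                                        public void flatMap(
                                                String value, Collector<WordWithCount> out) {
                                            for (String word : value.split("\\s")) {
                                                out.collect(new WordWithCount(word, 1L));
                                            }
                                        }
                                    })
                            .keyBy(value -> value.word)
                            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                            /**
                             * han_pf
                             * keyBy and window do not add a transformation to the
                             * transformations list; only reduce does.
                             */
                            .reduce(
                                    new ReduceFunction<WordWithCount>() {
                                        @Override
                                        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                                            return new WordWithCount(a.word, a.count + b.count);
                                        }
                                    });

            // print the results with a single thread, rather than in parallel
            windowCounts.print().setParallelism(1);

            env.execute("Socket Window WordCount");
        }

        /** Word with its count; a Flink POJO (public fields, no-arg constructor). */
        public static class WordWithCount {
            public String word;
            public long count;

            public WordWithCount() {}

            public WordWithCount(String word, long count) {
                this.word = word;
                this.count = count;
            }

            @Override
            public String toString() {
                return word + " : " + count;
            }
        }
    }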
    

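    For this code, the execution flow of the whole operator chain is shown in the figure below.
    While the operators are being chained, each new transformation is created with its upstream operator's transformation as its input. Transformations of operators that merely change the stream type, without doing any business processing, are not added to the transformations collection; keyBy, for example, only turns a DataStream into a KeyedStream. When StreamNodes are generated, such partitioning operators produce virtual partition nodes (virtualPartitionNodes) instead. As the figure shows, only the transformations for flatMap, reduce, and sink are added to the transformations collection.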

    [Figure: execution flow of the SocketWindowWordCount operator chain]
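    To make the input-link idea concrete, here is a minimal Java sketch. ToyTransformation and ToyPipeline are hypothetical classes, not Flink's API: each node only stores references to its inputs, and the registered list holds just the flatMap, reduce, and sink nodes, as described above. Walking the input links from that list still reaches the source and the keyBy partition node, which is why they do not need to be registered themselves.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model (not Flink's API): a node only knows its upstream inputs.
final class ToyTransformation {
    final String name;
    final List<ToyTransformation> inputs;

    ToyTransformation(String name, ToyTransformation... inputs) {
        this.name = name;
        this.inputs = List.of(inputs);
    }
}

final class ToyPipeline {
    // Mirrors the registration pattern described for SocketWindowWordCount:
    // source and keyBy are reachable only through the input links.
    static List<ToyTransformation> buildRegistered() {
        ToyTransformation source = new ToyTransformation("source");
        ToyTransformation flatMap = new ToyTransformation("flatMap", source);
        ToyTransformation keyBy = new ToyTransformation("keyBy-partition", flatMap);
        // window() creates no node of its own; reduce takes the keyBy node as input.
        ToyTransformation reduce = new ToyTransformation("reduce", keyBy);
        ToyTransformation sink = new ToyTransformation("sink", reduce);
        return List.of(flatMap, reduce, sink);
    }

    // Collects every node reachable from the registered list by following inputs.
    static Set<String> reachable(List<ToyTransformation> registered) {
        Set<String> seen = new LinkedHashSet<>();
        List<ToyTransformation> stack = new ArrayList<>(registered);
        while (!stack.isEmpty()) {
            ToyTransformation t = stack.remove(stack.size() - 1);
            if (seen.add(t.name)) {
                stack.addAll(t.inputs);
            }
        }
        return seen;
    }
}
```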

    Generating the StreamGraph

    Calling execute() as above invokes StreamExecutionEnvironment.getStreamGraph(), which in turn calls StreamGraphGenerator.generate() to build the graph.

        public JobExecutionResult execute(String jobName) throws Exception {
            Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
            /**
            * han_pf
            * Key step: the StreamGraph is built here
            */
            final StreamGraph streamGraph = getStreamGraph();
            streamGraph.setJobName(jobName);
            return execute(streamGraph);
        }
        public StreamGraph getStreamGraph(boolean clearTransformations) {
            final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
            if (clearTransformations) {
                transformations.clear();
            }
            return streamGraph;
        }
    

    StreamGraphGenerator

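    First, the generator iterates over the transformations previously added to the transformations collection and converts each one: it looks up the translator for the transformation's class in the pre-initialized translatorMap and uses it to translate the operator.
    The overall execution flow: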


    [Figure: overall execution flow of StreamGraphGenerator]
    translatorMap
    static {
            @SuppressWarnings("rawtypes")
            Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
                    tmp = new HashMap<>();
            tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
            tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
            tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
            tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
            tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
            tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
            tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
            tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
            tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
            tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
            tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
            tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
            tmp.put(
                    TimestampsAndWatermarksTransformation.class,
                    new TimestampsAndWatermarksTransformationTranslator<>());
            tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
            tmp.put(
                    KeyedBroadcastStateTransformation.class,
                    new KeyedBroadcastStateTransformationTranslator<>());
            translatorMap = Collections.unmodifiableMap(tmp);
        }
    
    public StreamGraph generate() {
    
            streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    
            streamGraph.setEnableCheckpointsAfterTasksFinish(
                    configuration.get(
                            ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
    
            shouldExecuteInBatchMode = shouldExecuteInBatchMode();
    
            configureStreamGraph(streamGraph);
    
            alreadyTransformed = new HashMap<>();
            /**
            * han_pf
            * iterate over the transformations registered earlier
            */
            for (Transformation<?> transformation : transformations) {
                transform(transformation);
            }
    
            streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);
    
            setFineGrainedGlobalStreamExchangeMode(streamGraph);
    
            for (StreamNode node : streamGraph.getStreamNodes()) {
                if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
                    for (StreamEdge edge : node.getInEdges()) {
                        edge.setSupportsUnalignedCheckpoints(false);
                    }
                }
            }
    
            final StreamGraph builtStreamGraph = streamGraph;
    
            alreadyTransformed.clear();
            alreadyTransformed = null;
            streamGraph = null;
    
            return builtStreamGraph;
        }
    
    private Collection<Integer> transform(Transformation<?> transform) {
            
            // (part of the code omitted)
            // call at least once to trigger exceptions about MissingTypeInfo
            transform.getOutputType();
    
            @SuppressWarnings("unchecked")
            final TransformationTranslator<?, Transformation<?>> translator =
                    (TransformationTranslator<?, Transformation<?>>)
                            translatorMap.get(transform.getClass());
    
            Collection<Integer> transformedIds;
            if (translator != null) {
                /**
                * han_pf
                * call the translator that matches the transformation's type
                */
                transformedIds = translate(translator, transform);
            } else {
                transformedIds = legacyTransform(transform);
            }
    
            // need this check because the iterate transformation adds itself before
            // transforming the feedback edges
            if (!alreadyTransformed.containsKey(transform)) {
                alreadyTransformed.put(transform, transformedIds);
            }
    
            return transformedIds;
        }
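    The class-keyed lookup in transform() can be sketched in isolation. In the minimal model below, Tx, OneInputTx, PartitionTx, IterateTx, Translator, and Dispatcher are hypothetical stand-ins, not Flink types, and translators return a label instead of node ids. It illustrates two properties of the lookup: the key is the exact runtime class, so a subclass would not match its parent's entry, and a class with no entry falls through to the legacy path.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for Flink's Transformation subclasses.
abstract class Tx { }
final class OneInputTx extends Tx { }
final class PartitionTx extends Tx { }
final class IterateTx extends Tx { } // deliberately has no translator registered

// Hypothetical translator: returns a label instead of StreamGraph node ids.
interface Translator<T extends Tx> {
    String translate(T transformation);
}

final class Dispatcher {
    // Built once in a static initializer and frozen, like translatorMap.
    private static final Map<Class<? extends Tx>, Translator<?>> TRANSLATORS;

    static {
        Map<Class<? extends Tx>, Translator<?>> tmp = new HashMap<>();
        tmp.put(OneInputTx.class, (Translator<OneInputTx>) t -> "stream-node");
        tmp.put(PartitionTx.class, (Translator<PartitionTx>) t -> "virtual-partition-node");
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    @SuppressWarnings("unchecked")
    static String transform(Tx transformation) {
        // Lookup by exact runtime class, as in translatorMap.get(transform.getClass()).
        Translator<Tx> translator = (Translator<Tx>) TRANSLATORS.get(transformation.getClass());
        return translator != null ? translator.translate(transformation) : legacyTransform(transformation);
    }

    // Fallback for types without a registered translator.
    private static String legacyTransform(Tx transformation) {
        return "legacy:" + transformation.getClass().getSimpleName();
    }
}
```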
    
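    The core method: translate(translator, transform)

    While iterating over the transformations collection, the generator first resolves the current transformation's inputs via getParentInputIds(transform.getInputs()). This calls transform() recursively, so every upstream transformation is translated before the current one.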
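    The recursion described in this paragraph can be modeled in a few lines. Node and RecursiveGenerator are hypothetical (not Flink classes) and ids are plain counters; the sketch only shows the shape of the algorithm: inputs are translated first (post-order), and an alreadyTransformed map ensures that a transformation shared by several consumers is translated only once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical node type: only the links to its inputs matter here.
final class Node {
    final String name;
    final List<Node> inputs;

    Node(String name, Node... inputs) {
        this.name = name;
        this.inputs = List.of(inputs);
    }
}

// Hypothetical generator modeling transform() together with getParentInputIds().
final class RecursiveGenerator {
    final Map<Node, Integer> alreadyTransformed = new HashMap<>();
    final List<String> order = new ArrayList<>(); // "name[inputIds]" in translation order
    private int nextId = 1;

    int transform(Node node) {
        Integer done = alreadyTransformed.get(node);
        if (done != null) {
            return done; // shared upstream node: reuse the earlier result
        }
        // Like getParentInputIds(): translate every input before this node.
        List<Integer> inputIds = new ArrayList<>();
        for (Node input : node.inputs) {
            inputIds.add(transform(input));
        }
        int id = nextId++; // stands in for the translator call
        order.add(node.name + inputIds);
        alreadyTransformed.put(node, id);
        return id;
    }
}
```

    For a diamond (src feeding a and b, both feeding join), registering only a and join still translates src, a, b, and join in that order, and src is translated exactly once.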

     private Collection<Integer> translate(
                final TransformationTranslator<?, Transformation<?>> translator,
                final Transformation<?> transform) {
            checkNotNull(translator);
            checkNotNull(transform);
            /**
            * han_pf
            * recursively translates this transformation's input transformations first
            */
            final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());
    
            // the recursive call might have already transformed this
            if (alreadyTransformed.containsKey(transform)) {
                return alreadyTransformed.get(transform);
            }
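            /**
            * han_pf
            * Determine the slot sharing group: use the one set on this transformation,
            * if any (the default group is named "default"); this decides which slots
            * can be shared. Subtasks of different tasks may share a slot.
            */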
            final String slotSharingGroup =
                    determineSlotSharingGroup(
                            transform.getSlotSharingGroup().isPresent()
                                    ? transform.getSlotSharingGroup().get().getName()
                                    : null,
                            allInputIds.stream()
                                    .flatMap(Collection::stream)
                                    .collect(Collectors.toList()));
    
            final TransformationTranslator.Context context =
                    new ContextImpl(this, streamGraph, slotSharingGroup, configuration);
    
            return shouldExecuteInBatchMode
                    ? translator.translateForBatch(transform, context)
                    : translator.translateForStreaming(transform, context);
        }
    
       private List<Collection<Integer>> getParentInputIds(
                @Nullable final Collection<Transformation<?>> parentTransformations) {
            final List<Collection<Integer>> allInputIds = new ArrayList<>();
            if (parentTransformations == null) {
                return allInputIds;
            }
    
            for (Transformation<?> transformation : parentTransformations) {
                allInputIds.add(transform(transformation));
            }
            return allInputIds;
        }
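    The slot sharing group decision made before each translation can be sketched as follows. This is our reading of the rule behind determineSlotSharingGroup, not a copy of Flink's code; the class SlotSharing and the map-based group lookup are hypothetical. An explicitly set group wins; otherwise the node inherits its inputs' group if they all agree, and falls back to "default" when they disagree or there are no inputs.

```java
import java.util.List;
import java.util.Map;

final class SlotSharing {
    static final String DEFAULT_SLOT_SHARING_GROUP = "default";

    /**
     * Sketch of the slot sharing group rule (hypothetical helper).
     *
     * @param specifiedGroup group set on the transformation, or null
     * @param inputIds       ids of the already translated upstream nodes
     * @param groupOfNode    hypothetical lookup from node id to its group
     */
    static String determine(String specifiedGroup, List<Integer> inputIds, Map<Integer, String> groupOfNode) {
        if (specifiedGroup != null) {
            return specifiedGroup; // user choice always wins
        }
        String inputGroup = null;
        for (int id : inputIds) {
            String candidate = groupOfNode.get(id);
            if (inputGroup == null) {
                inputGroup = candidate;
            } else if (!inputGroup.equals(candidate)) {
                return DEFAULT_SLOT_SHARING_GROUP; // inputs disagree
            }
        }
        return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup;
    }
}
```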
    

    SimpleTransformationTranslator

       @Override
        public final Collection<Integer> translateForStreaming(
                final T transformation, final Context context) {
            checkNotNull(transformation);
            checkNotNull(context);
    
            final Collection<Integer> transformedIds =
                    translateForStreamingInternal(transformation, context);
    
            configure(transformation, context);
    
            return transformedIds;
        }
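    translateForStreaming is final and only wraps the subclass hook translateForStreamingInternal with a shared configure step, i.e. the template method pattern. A stripped-down sketch follows; SimpleTranslatorSketch and UppercaseTranslator are hypothetical, and the real class additionally takes a Context and has a batch variant.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified version of the pattern: the entry point is final,
// subclasses supply only the type-specific part, and shared configuration runs last.
abstract class SimpleTranslatorSketch<T> {
    final List<String> log = new ArrayList<>();

    public final Collection<Integer> translateForStreaming(T transformation) {
        Collection<Integer> ids = translateForStreamingInternal(transformation);
        configure(transformation); // common step applied after every translation
        return ids;
    }

    protected abstract Collection<Integer> translateForStreamingInternal(T transformation);

    private void configure(T transformation) {
        log.add("configured " + transformation);
    }
}

// Hypothetical concrete translator: "translates" a string and returns its length as an id.
final class UppercaseTranslator extends SimpleTranslatorSketch<String> {
    @Override
    protected Collection<Integer> translateForStreamingInternal(String transformation) {
        log.add("translated " + transformation.toUpperCase());
        return List.of(transformation.length());
    }
}
```

    Making the entry point final guarantees that no concrete translator can skip the shared configuration step.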
    

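    translateForStreamingInternal is implemented by each concrete translator. The PartitionTransformation produced by operators such as keyBy is handled by PartitionTransformationTranslator, which turns it into a virtual partition node (registered via addVirtualPartitionNode) instead of a real StreamNode; other translators, such as OneInputTransformationTranslator, turn their transformation into a StreamNode. The code is as follows: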

    [Figure: translator code screenshot]

    PartitionTransformationTranslator

    public class PartitionTransformationTranslator<OUT>
            extends SimpleTransformationTranslator<OUT, PartitionTransformation<OUT>> {
    
        @Override
        protected Collection<Integer> translateForBatchInternal(
                final PartitionTransformation<OUT> transformation, final Context context) {
            return translateInternal(transformation, context);
        }
    
        @Override
        protected Collection<Integer> translateForStreamingInternal(
                final PartitionTransformation<OUT> transformation, final Context context) {
            return translateInternal(transformation, context);
        }
    
        private Collection<Integer> translateInternal(
                final PartitionTransformation<OUT> transformation, final Context context) {
            checkNotNull(transformation);
            checkNotNull(context);
    
            final StreamGraph streamGraph = context.getStreamGraph();
    
            final List<Transformation<?>> parentTransformations = transformation.getInputs();
            checkState(
                    parentTransformations.size() == 1,
                    "Expected exactly one input transformation but found "
                            + parentTransformations.size());
            final Transformation<?> input = parentTransformations.get(0);
    
            List<Integer> resultIds = new ArrayList<>();
    
            for (Integer inputId : context.getStreamNodeIds(input)) {
                final int virtualId = Transformation.getNewNodeId();
                streamGraph.addVirtualPartitionNode(
                        inputId,
                        virtualId,
                        transformation.getPartitioner(),
                        transformation.getExchangeMode());
                resultIds.add(virtualId);
            }
            return resultIds;
        }
    }
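    PartitionTransformationTranslator only records a mapping from a fresh virtual id to the real upstream node id, together with the partitioner and exchange mode; the virtual id is resolved later, when a downstream node adds its edge. The hypothetical miniature below (MiniGraph is not a Flink class, partitioners are plain strings, and the exchange mode is omitted) sketches that resolution, assuming edge creation works roughly like StreamGraph.addEdge: a virtual upstream id is replaced by the original id and its partitioner is carried onto the real edge.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical miniature of the virtual-node bookkeeping; not Flink's StreamGraph.
final class MiniGraph {
    // What a virtual partition node remembers: the real upstream id and the partitioner.
    static final class VirtualPartition {
        final int originalId;
        final String partitioner;

        VirtualPartition(int originalId, String partitioner) {
            this.originalId = originalId;
            this.partitioner = partitioner;
        }
    }

    static final class Edge {
        final int source;
        final int target;
        final String partitioner;

        Edge(int source, int target, String partitioner) {
            this.source = source;
            this.target = target;
            this.partitioner = partitioner;
        }

        @Override
        public String toString() {
            return source + "->" + target + " (" + partitioner + ")";
        }
    }

    final Map<Integer, VirtualPartition> virtualPartitionNodes = new HashMap<>();
    final List<Edge> edges = new ArrayList<>();

    void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
        virtualPartitionNodes.put(virtualId, new VirtualPartition(originalId, partitioner));
    }

    void addEdge(int upstreamId, int downstreamId, String partitioner) {
        VirtualPartition virtual = virtualPartitionNodes.get(upstreamId);
        if (virtual != null) {
            // Swap the virtual id for the real upstream id, keeping its partitioner unless
            // the caller passed one; recursion handles chains of virtual nodes.
            String p = partitioner != null ? partitioner : virtual.partitioner;
            addEdge(virtual.originalId, downstreamId, p);
        } else {
            // Real upstream node: create the physical edge. This sketch defaults to
            // "forward"; Flink chooses forward or rebalance depending on parallelism.
            edges.add(new Edge(upstreamId, downstreamId, partitioner != null ? partitioner : "forward"));
        }
    }
}
```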
    

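    The above covers the overall flow of StreamGraph generation. Details of the process, such as how StreamNodes and edges are created, will be covered in a follow-up article.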


          Article link: https://www.haomeiwen.com/subject/vvesjrtx.html