Flink JobGraph: Source Code Reading and Analysis

Author: ni_d58f | Published 2019-06-04 23:18

    1. Overview

    This article traces the process and logic by which a user's Java program is turned into a Flink JobGraph. Interested readers are welcome to discuss and exchange ideas.

    2. User Program

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    public class WindowWordCount {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<Tuple2<String, Integer>> dataStream = env
                    .socketTextStream("localhost", 9999)
                    .flatMap(new Splitter())
                    .keyBy(0)
                    .timeWindow(Time.seconds(5))
                    .sum(1);
    
            dataStream.print();
    
            env.execute("Window WordCount");
        }
    
        public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word: sentence.split(" ")) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }
    
    }
    

    Now let's start tracing the code.

    • The logic of StreamExecutionEnvironment.getExecutionEnvironment() is as follows:
    public static StreamExecutionEnvironment getExecutionEnvironment() {
               //in the streaming setting this is always null
           if (contextEnvironmentFactory != null) {
               return contextEnvironmentFactory.createExecutionEnvironment();
           }
    
           // because the streaming project depends on "flink-clients" (and not the other way around)
           // we currently need to intercept the data set environment and create a dependent stream env.
           // this should be fixed once we rework the project dependencies
           // execution always ends up here; as mentioned in an earlier post, ExecutionEnvironment.getExecutionEnvironment() goes through the ContextEnvironmentFactory, i.e. it yields a ContextEnvironment
           ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
           if (env instanceof ContextEnvironment) {
               return new StreamContextEnvironment((ContextEnvironment) env);
           } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
               return new StreamPlanEnvironment(env);
           } else {
               return createLocalEnvironment();
           }
       }
    

    The code above ultimately returns a StreamContextEnvironment wrapping the ContextEnvironment. Next, env creates a DataStream via socketTextStream(). Before going through the operators in the example, let's first look at the inheritance relationships among the DataStream classes, shown in the figure below:

    (Figure: DataStream class inheritance hierarchy)

    socketTextStream creates a DataStreamSource. Besides that, the other important DataStream subclass is KeyedStream, which is produced by the keyBy() API, while common operators such as map() and filter() produce a SingleOutputStreamOperator.
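    To make the types concrete, here is a minimal sketch (assuming the Flink 1.x DataStream API; Splitter is the flatMap function from the example in section 2) of what each call in the example actually returns:

        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);                  // source
        SingleOutputStreamOperator<Tuple2<String, Integer>> words = source.flatMap(new Splitter()); // map/flatMap/filter
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = words.keyBy(0);                         // keyBy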

    Back to socketTextStream(); its code is:

        public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
            return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
                    "Socket Stream");
        }
    

    which in the end calls:

        public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
    
            if (typeInfo == null) {
                if (function instanceof ResultTypeQueryable) {
                    typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
                } else {
                    try {
                        typeInfo = TypeExtractor.createTypeInfo(
                                SourceFunction.class,
                                function.getClass(), 0, null, null);
                    } catch (final InvalidTypesException e) {
                        typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
                    }
                }
            }
    
            boolean isParallel = function instanceof ParallelSourceFunction;
    
            clean(function);
    
            final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
            return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
        }
    

    The function parameter here is a SocketTextStreamFunction, which reads data from the socket; it is wrapped into a StreamSource operator. The logic of the code above is:

    1. First extract the function's type; following the code path, SocketTextStreamFunction yields TypeInformation<String>
    2. Clean the closure fields
    3. Construct the source operator. The DataStreamSource constructor is:
        public DataStreamSource(StreamExecutionEnvironment environment,
                TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
                boolean isParallel, String sourceName) {
            super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
    
            this.isParallel = isParallel;
            if (!isParallel) {
                setParallelism(1);
            }
        }
    

    In Flink, the inheritance and implementation relationships between the StreamOperator and Transformation classes are as follows:

    (Figures: StreamOperator and Transformation class hierarchies)
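    As a rough substitute for those figures, a text outline based on the Flink 1.8-era sources (details vary between versions):

        StreamOperator (interface)
          └─ AbstractStreamOperator
               └─ AbstractUdfStreamOperator (wraps the user Function)
                    ├─ StreamSource
                    └─ StreamMap / StreamFlatMap / StreamFilter / ...
          (OneInputStreamOperator / TwoInputStreamOperator are the per-arity interfaces)

        StreamTransformation
          ├─ SourceTransformation / SinkTransformation
          ├─ OneInputTransformation / TwoInputTransformation
          └─ PartitionTransformation / UnionTransformation / SplitTransformation / SelectTransformation / ...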

    The subsequent .flatMap is a method provided by DataStream; its code is:

        public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    
            TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
                    getType(), Utils.getCallLocationName(), true);
    
            return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
    
        }
    
        public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    
            // read the output type of the input Transform to coax out errors about MissingTypeInfo
            transformation.getOutputType();
    
            OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    operator,
                    outTypeInfo,
                    environment.getParallelism());
    
            @SuppressWarnings({ "unchecked", "rawtypes" })
            SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
    
            getExecutionEnvironment().addOperator(resultTransform);
    
            return returnStream;
        }
    

    The overall logic is:

    1. The user's function is used to construct a StreamOperator, whose structure is:

       (Figure: StreamOperator structure)

    2. The input transformation and the StreamOperator together construct the current Transformation:

       (Figure: Transformation structure)

    3. The current Transformation is used to construct the DataStream.

    4. The current Transformation is added to the current StreamExecutionEnvironment; this collection is used later when generating the execution plan.

    The subsequent operators such as keyBy and filter generate their DataStreams in a similar way; interested readers can look them up on their own.

    Finally, to summarize the call relationships among DataStream, StreamOperator, and StreamTransformation:

    (Figure: the relationship among DataStream, StreamOperator, and StreamTransformation)
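    In place of that figure, the relationship can be summarized informally (each arrow reads "holds a reference to"):

        DataStream --> StreamTransformation --> StreamOperator --> user Function

    Every fluent API call wraps the user function in a StreamOperator, wraps that operator in a new StreamTransformation whose input is the previous transformation, and returns a new DataStream around the new transformation.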

    3. StreamGraph

    StreamContextEnvironment kicks off the whole job via execute(); the main code is:

    public JobExecutionResult execute(String jobName) throws Exception {
                    ...
            StreamGraph streamGraph = this.getStreamGraph();
                    ...
             return ctx
                    .getClient()
                    .run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
                    .getJobExecutionResult();
    
        }
    

    Here getStreamGraph() is the main function for obtaining the StreamGraph; its core code is:

        public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
        //the StreamGraphGenerator constructor receives env; it mainly uses some of env's configuration together with the given transformations
            return new StreamGraphGenerator(env).generateInternal(transformations);
        }
    

    The transformations parameter is the collection of transformations generated for the whole job; every API operator call generates one or more transformations. Next, generateInternal() is called; its code is:

    private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
            for (StreamTransformation<?> transformation: transformations) {
            //iterate over every transformation and process it
                transform(transformation);
            }
            return streamGraph;
        }
    

    generateInternal runs transform() on each transformation; the method's core logic is:

    ...
            Collection<Integer> transformedIds;
            if (transform instanceof OneInputTransformation<?, ?>) {
                transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
            } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
                transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
            } else if (transform instanceof SourceTransformation<?>) {
                transformedIds = transformSource((SourceTransformation<?>) transform);
            } else if (transform instanceof SinkTransformation<?>) {
                transformedIds = transformSink((SinkTransformation<?>) transform);
            } else if (transform instanceof UnionTransformation<?>) {
                transformedIds = transformUnion((UnionTransformation<?>) transform);
            } else if (transform instanceof SplitTransformation<?>) {
                transformedIds = transformSplit((SplitTransformation<?>) transform);
            } else if (transform instanceof SelectTransformation<?>) {
                transformedIds = transformSelect((SelectTransformation<?>) transform);
            } else if (transform instanceof FeedbackTransformation<?>) {
                transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
            } else if (transform instanceof CoFeedbackTransformation<?>) {
                transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
            } else if (transform instanceof PartitionTransformation<?>) {
                transformedIds = transformPartition((PartitionTransformation<?>) transform);
            } else if (transform instanceof SideOutputTransformation<?>) {
                transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
            } else {
                throw new IllegalStateException("Unknown transformation: " + transform);
            }
            return transformedIds;
    ...
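
    One thing this excerpt elides: transform() first consults a memo map so that every transformation is translated exactly once. A minimal sketch of that pattern (doTransform() is a stand-in for the type dispatch shown above):

        private final Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed = new HashMap<>();

        private Collection<Integer> transform(StreamTransformation<?> transform) {
            if (alreadyTransformed.containsKey(transform)) {
                // e.g. one upstream DataStream feeding several sinks: the shared
                // input is translated once and its node ids are reused afterwards
                return alreadyTransformed.get(transform);
            }
            Collection<Integer> transformedIds = doTransform(transform); // the if/else chain above
            alreadyTransformed.put(transform, transformedIds);
            return transformedIds;
        }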
    

    The logic above dispatches on the concrete transformation type, producing a mapping Map<transformation, List<Integer>>. What is the List<Integer> for? When, for example, one DataStream feeds several Sinks, the JobGraph later uses these ids to locate the source; in that case the List has size 1. The details come up again when building the JobGraph. Now let's walk through transformOneInputTransform() as an example:

    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
         //first translate this transformation's input
            Collection<Integer> inputIds = transform(transform.getInput());
    
            ...
      //the slot group is used later when assigning Slots to Tasks, to isolate different Tasks into different Slots; ignore it for now
        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
      //add the StreamNode for this transformation to the StreamGraph
            streamGraph.addOperator(transform.getId(),
                    slotSharingGroup,
                    transform.getCoLocationGroupKey(),
                    transform.getOperatorFactory(),
                    transform.getInputType(),
                    transform.getOutputType(),
                    transform.getName());
            ...       
        // add the edges describing the input/output relationship; a regular operator just adds an edge connecting the input node to this node,
       // while special stream nodes such as split/select are slightly more involved; see the split/select discussion below
            for (Integer inputId: inputIds) {
                streamGraph.addEdge(inputId, transform.getId(), 0);
            }
    
            return Collections.singleton(transform.getId());
        }
    

    That is the processing logic of transformOneInputTransform; the other Transformations are handled similarly. Notice that the StreamGraph records the two most important ingredients: the transformations (as StreamNodes) and the edges connecting them; generating the JobGraph later relies mainly on these two data structures.

    Looking at the StreamGraph code, its main data structures are:

    • StreamNode, which mainly wraps a transformation and its metadata
    • StreamEdge, which mainly links StreamNodes together according to their dependencies
      They are organized as follows:
    (Figure: a StreamGraph as StreamNodes connected by StreamEdges)

    StreamNodes are connected by StreamEdges; a StreamNode may have several input edges and several output edges, for instance when an operator has multiple inputs or outputs. The key data structures inside StreamNode are:

    • inEdges and outEdges, the input and output edges.
    • parallelism, the node's parallelism. It can be set per node and is used later when generating the ExecutionGraph; if unset, it defaults to the parallelism of the node's input.
    • slotSharingGroup. The slot group affects how the generated Tasks are assigned to Slots; it is covered separately later.
    • jobVertexClass. This is the Task entry-point class once the Flink job has been turned into an ExecutionGraph; each Execution corresponds to one jobVertexClass, whose base class is AbstractInvokable. The hierarchy is:


      (Figure: the Task classes corresponding to StreamNode, rooted at AbstractInvokable)

    When the ExecutionGraph executes on a TaskManager, the entry points are in the classes above, e.g. for Task initialization, checkpointing, and so on.

    • id. This is actually the id of the corresponding Transformation; the id establishes a one-to-one mapping between StreamNode and Transformation.
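    Putting these together, a condensed sketch of StreamNode (field names follow the Flink source; the remaining members are omitted):

        public class StreamNode {
            private final int id;                    // same id as the originating transformation
            private Integer parallelism;             // defaults to the input's parallelism if unset
            private String slotSharingGroup;         // influences Task-to-Slot assignment
            private List<StreamEdge> inEdges = new ArrayList<>();
            private List<StreamEdge> outEdges = new ArrayList<>();
            private final Class<? extends AbstractInvokable> jobVertexClass; // Task entry point
            ...
        }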

    The StreamGraph also contains a special kind of StreamNode: virtual nodes. Virtual nodes correspond to Stream API operations such as 'DataStream.split(xxx).select(xxx)'; Flink treats these special nodes separately.

    Handling of Split and Select

    Suppose a DataStream has the two operators Split and Select, composed as streamA.split().select(). In the corresponding StreamGraph, this part of the graph looks like:


    (Figure: split & select in the StreamGraph)

    When translating the Split operator, a virtual StreamNode B is generated, the selector is put into StreamNode B's outputSelectors (a List<OutputSelector>), and its input is StreamNode A. When translating Select, its input, virtual node B, is translated first; translating virtual node B adds to the StreamGraph's virtualSelectNodes map an entry describing the virtual StreamNode's input. The key is virtual node B's id, and the value is a Tuple2 of (StreamNode A's id, selectedNames), meaning: the virtual StreamNode's input is StreamNode A, and the SplitStream is obtained from A through selectedNames.
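    As a small illustration of that bookkeeping (the ids are made up; the map's shape matches StreamGraph's virtualSelectNodes field):

        // virtual node id -> (upstream StreamNode id, selected output names)
        Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes = new HashMap<>();

        // For streamA.split(selector).select("x"), with StreamNode A having id 1
        // and the virtual select node B getting id 2, translation records:
        virtualSelectNodes.put(2, new Tuple2<>(1, Collections.singletonList("x")));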

    That raises the question: what about the edges? Those are only determined when the downstream StreamNode is translated. When translating such a downstream node whose input is the virtual StreamNode, the edge-adding logic (see transformOneInputTransform above) runs the following:

    private void addEdgeInternal(Integer upStreamVertexID,
                Integer downStreamVertexID,
                int typeNumber,
                StreamPartitioner<?> partitioner,
                List<String> outputNames,
                OutputTag outputTag) {
    
            if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
                int virtualId = upStreamVertexID;
                upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
                if (outputTag == null) {
                    outputTag = virtualSideOutputNodes.get(virtualId).f1;
                }
                addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
            } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
            //the first pass enters here, because upStreamVertexID is a virtual StreamNode
                int virtualId = upStreamVertexID;
            //get the select node's input node id, i.e. StreamNode A's id
                upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
                if (outputNames.isEmpty()) {
                    // selections that happen downstream override earlier selections
                    outputNames = virtualSelectNodes.get(virtualId).f1;
                }
             //addEdgeInternal is called again, now with StreamNode A as the upstream
                addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
            } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
                int virtualId = upStreamVertexID;
                upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
                if (partitioner == null) {
                    partitioner = virtualPartitionNodes.get(virtualId).f1;
                }
                addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
            } else {
              //the second pass ends up here
                StreamNode upstreamNode = getStreamNode(upStreamVertexID);
                StreamNode downstreamNode = getStreamNode(downStreamVertexID);
    
                 ...
             // construct the StreamEdge; in the split/select model the edge stores the outputNames, while the upstream node keeps the selector function
                StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);
    
                getStreamNode(edge.getSourceId()).addOutEdge(edge);
                getStreamNode(edge.getTargetId()).addInEdge(edge);
            }
        }
    

    Feedback, Partition, SideOutput

    Their handling is essentially the same as Split/Select; refer to the discussion above.

    The end result is a StreamGraph containing all StreamNodes and StreamEdges. From these StreamNodes, StreamEdges, and each node's configuration (parallelism, slot group, and so on), a complete JobGraph is generated.
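    To make this concrete: for the WindowWordCount program from section 2, the StreamGraph would contain roughly the following (ids and names are illustrative; note that keyBy contributes only a virtual partition node, not a StreamNode):

        StreamNode 1: Source: Socket Stream
        StreamNode 2: Flat Map
        (virtual partition node for keyBy, recorded in virtualPartitionNodes)
        StreamNode 4: the window/sum operator
        StreamNode 5: Sink: Print to Std. Out

        StreamEdges: 1 -> 2 (forward), 2 -> 4 (hash, from keyBy), 4 -> 5 (forward)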

    4. JobGraph

    With the StreamGraph in hand, let's now look at how the JobGraph is generated. The call chain is ClusterClient#run() --> ClusterClient#getJobGraph() --> StreamingPlan#getJobGraph() --> StreamGraph#getJobGraph() --> StreamingJobGraphGenerator#createJobGraph() (the static method) --> StreamingJobGraphGenerator#createJobGraph() (the instance method)

    The main logic lives in createJobGraph(); the code is:

    private JobGraph createJobGraph() {
    
            // make sure that all vertices start immediately
            jobGraph.setScheduleMode(ScheduleMode.EAGER);
    
            // Generate deterministic hashes for the nodes in order to identify them across
            // submission iff they didn't change.
            Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    
            // Generate legacy version hashes for backwards compatibility
            List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
            for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
                legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
            }
    
            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
            //explained in detail below
            setChaining(hashes, legacyHashes, chainedOperatorHashes);
            //set the physical edges based on the result of setChaining
            setPhysicalEdges();
            //set the JobGraph's SlotSharingGroup and CoLocationGroup
            setSlotSharingAndCoLocation();
            /*
              set checkpoint information on each JobVertex of the jobGraph;
              for example, the source JobVertices need to trigger checkpoints,
              and all JobVertices need to commit and ack checkpoints
            */
            configureCheckpointing();
    
            JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);
    
            // set the ExecutionConfig last when it has been finalized
            try {
                jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
            }
            catch (IOException e) {
                throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                        "This indicates that non-serializable types (like custom serializers) were registered");
            }
    
            return jobGraph;
        }
    

    The logic is:

    1. Traverse the stream graph
    2. Generate the operator chains
    3. Set the physical edges
    4. Set the slot sharing groups

    4.1 Traversal

    Traversing the StreamGraph starts from the sources and computes a hash for every StreamNode; the computation guarantees that a StreamNode is only hashed after all of its input nodes have been hashed.
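    A simplified sketch of that invariant (this is not the real StreamGraphHasher, which also mixes chaining information into the hash; computeHash(), getNode(), and the source list are stand-ins):

        Map<Integer, byte[]> hashes = new HashMap<>();
        Deque<StreamNode> remaining = new ArrayDeque<>(sourceNodes);

        while (!remaining.isEmpty()) {
            StreamNode node = remaining.pollFirst();
            boolean inputsHashed = node.getInEdges().stream()
                    .allMatch(e -> hashes.containsKey(e.getSourceId()));
            if (inputsHashed && !hashes.containsKey(node.getId())) {
                hashes.put(node.getId(), computeHash(node, hashes)); // the hash folds in the input hashes
                for (StreamEdge out : node.getOutEdges()) {
                    remaining.addLast(getNode(out.getTargetId()));   // visit downstream nodes next
                }
            }
            // a node with un-hashed inputs is dropped here; it is re-enqueued
            // later, when its remaining inputs get processed
        }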

    4.2 Operator Chain

    4.2.1 Operator chains and their purpose

    From the StreamGraph we know that each operator corresponds to one StreamNode. Now consider a common scenario: in DataStream.map().filter(), the map and filter operators become different StreamNodes and eventually different Tasks. If those two Tasks do not share a Slot, or sit on different TaskManagers, the data travels over the network from map to filter and execution performance suffers. To address this, Flink introduces the operator chain: an operator chain is a group of operators that can run together in the same Slot.
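    Chaining is enabled by default, but the public API exposes a few knobs for controlling it (these methods exist on StreamExecutionEnvironment and SingleOutputStreamOperator in Flink 1.x; MyMapper/MyFilter are placeholders):

        env.disableOperatorChaining();                    // job-wide: every operator becomes its own Task
        stream.map(new MyMapper()).startNewChain();       // chain with downstream, but start a new chain here
        stream.filter(new MyFilter()).disableChaining();  // never chain this operator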

    4.2.2 When operators can be chained

    According to the source, an upstream operator A and its downstream operator can be chained together when the following conditions hold:

    • The downstream node has exactly one input, namely the upstream node
    • Both belong to the same SlotSharingGroup
    • Chaining is enabled
    • The partitioner is a ForwardPartitioner
    • The parallelisms are equal
    • Both operators' ChainingStrategy values allow chaining

    Of course a chain may contain more than two operators, as long as each consecutive pair satisfies the following condition:

            return downStreamVertex.getInEdges().size() == 1
                    && outOperator != null
                    && headOperator != null
                    && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                    && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
                    && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                        headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
                    && (edge.getPartitioner() instanceof ForwardPartitioner)
                    && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                    && streamGraph.isChainingEnabled();
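
    Applied to the WindowWordCount example from section 2 (with default settings): the socket source and flatMap can chain only if their parallelisms match (the socket source is non-parallel, i.e. parallelism 1); flatMap cannot chain into the window aggregation because the keyBy edge carries a hash partitioner rather than a ForwardPartitioner; and the window operator chains with the print sink over a forward edge.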
    
    
    4.2.3 Code walkthrough

    Let's now walk through JobGraph generation with a simple StreamGraph, shown in the figure:


    (Figure: a simple StreamGraph: StreamNode A --Edge D--> StreamNode B --Edge E--> StreamNode C, where the A->B edge is not chainable and the B->C edge is chainable)

    In createJobGraph above, setChaining ends up calling createChain three times.

    First call to createChain:

    
        private List<StreamEdge> createChain(
            Integer startNodeId,                 //Node A
            Integer currentNodeId,               //Node A
            Map<Integer, byte[]> hashes,         //already filled in by the hasher
            List<Map<Integer, byte[]>> legacyHashes, //size = 1
            int chainIndex,                          //chainIndex = 0
            Map<Integer, List<Tuple2<byte[], byte[]>>>  chainedOperatorHashes                       //empty
    ) {
    
            if (!builtVertices.contains(startNodeId)) {
    
                List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
    
                List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
                List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
                             
            //currentNodeId is Node A; outEdges has size = 1; below, chainableOutputs gets size = 0 and nonChainableOutputs gets size = 1
                for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
                    if (isChainable(outEdge, streamGraph)) {
                        chainableOutputs.add(outEdge);
                    } else {
                        nonChainableOutputs.add(outEdge);
                    }
                }
            //chainableOutputs is empty, skip
                for (StreamEdge chainable : chainableOutputs) {
                    transitiveOutEdges.addAll(
                            createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
                }
            //nonChainableOutputs is not empty
            for (StreamEdge nonChainable : nonChainableOutputs) {
                //add Edge D
                transitiveOutEdges.add(nonChainable);
                //see the second call to createChain
                    createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
                }
            /*
                add an entry for Node A's id to chainedOperatorHashes; the second and
                third calls have already inserted Node B's entry, so the current contents are
                key                           value
                 Node B id                           List<
                                                         Tuple2<Node C hash, Node C legacy hash>
                                                         Tuple2<Node B hash, Node B legacy hash>
                                                      >
             */
                List<Tuple2<byte[], byte[]>> operatorHashes =
                    chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
           //get Node A's hash
                byte[] primaryHashBytes = hashes.get(currentNodeId);
           /* legacyHashes has size 1; the loop below adds Node A's hashes under Node A's key
           in chainedOperatorHashes, so the final contents are:
                 Key                                            Value
                Node B id                               List<
                                                          Tuple2<Node C hash, Node C legacy hash>
                                                          Tuple2<Node B hash, Node B legacy hash>
                                                          >
                Node A id                             List<Tuple2<Node A hash, Node A legacy hash>>
           */
                for (Map<Integer, byte[]> legacyHash : legacyHashes) {
                    operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
                }
            /* 
                  add Stream Node A's name to chainedNames;
                  after this runs the contents are
                          <3, StreamNode C>
                          <2, StreamNode B -> StreamNode C>
                          <1, StreamNode A>
           */ 
            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
              //the next two maps are barely used later, so we move on; interested readers can explore them
                chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
                chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
              /*
                    here config takes the createJobVertex branch below, whose main job is
                    to generate the JobVertex; see the JobVertex constructor call:

                   jobVertex = new JobVertex(
                    chainedNames.get(streamNodeId),
                    jobVertexId,                //derived from Node A's hash
                    legacyJobVertexIds,          //List<Node A legacy hash>
                    chainedOperatorVertexIds,    //List<Node A hash>: this JobVertex is an operator chain containing a single StreamNode
                    userDefinedChainedOperatorVertexIds //List<Node A legacy hash>
                  );
              */
                StreamConfig config = currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                        : new StreamConfig(new Configuration());
             /*
                  set some of StreamNode A's configuration, such as parallelism and
                  checkpoint settings; the logic is simple enough to step into on your own
             */
            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
             //currentNodeId equals startNodeId, so this block executes
            if (currentNodeId.equals(startNodeId)) {
                //Node A is the start of the chain
                    config.setChainStart();
                    config.setChainIndex(0);
                    config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                config.setOutEdgesInOrder(transitiveOutEdges); //transitiveOutEdges contains Edge D
                /*
                  outEdges is Edge D
                */
         
    config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
           /*
                  transitiveOutEdges has size = 1 and contains Edge D.
                  Overall, this adds a physical edge to the JobGraph connecting JobVertex A (StreamNode A)
                  with JobVertex B (StreamNodes B and C), and attaches a Partitioner to the physical edge.
                  The partitioner decides, based on the downstream parallelism and operator type, how this
                  operator's results are handed to the downstream operator, for example
                        ForwardPartitioner (upstream and downstream parallelism are equal)
                        RescalePartitioner (upstream and downstream parallelism differ)
                        others, e.g. for aggregations
                   See the section on Flink parallelism for a more detailed discussion.
           */
                    for (StreamEdge edge : transitiveOutEdges) {
                        connect(startNodeId, edge);
                    }
    
                /*
                   chainedConfigs was populated during the second and third calls:
                   key                        value
                   2                          {3: Node C config}
                   for startNodeId = Node A, chainedConfigs.get(startNodeId) is empty
               */
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
    
                } else {       
                    chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
    
                    config.setChainIndex(chainIndex);
                    StreamNode node = streamGraph.getStreamNode(currentNodeId);
                    config.setOperatorName(node.getOperatorName());
                    chainedConfigs.get(startNodeId).put(currentNodeId, config);
                }
    
                config.setOperatorID(new OperatorID(primaryHashBytes));
                            
             // chainableOutputs.isEmpty() is true
                if (chainableOutputs.isEmpty()) {
                    config.setChainEnd();
                }
            // transitiveOutEdges contains Edge D
                return transitiveOutEdges;
    
            } else {
                return new ArrayList<>();
            }
    

    Second call to createChain:

    
        private List<StreamEdge> createChain(
            Integer startNodeId,                 //Node B
            Integer currentNodeId,               //Node B
            Map<Integer, byte[]> hashes,         //already filled in by the hasher
            List<Map<Integer, byte[]>> legacyHashes, //size = 1
            int chainIndex,                          //chainIndex = 0
            Map<Integer, List<Tuple2<byte[], byte[]>>>  chainedOperatorHashes                       //empty
    ) {
    
            if (!builtVertices.contains(startNodeId)) {
    
                List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
    
                List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
                List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
                             
            //currentNodeId is Node B; outEdges has size = 1; below, chainableOutputs gets size = 1 and nonChainableOutputs stays empty
                for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
                    if (isChainable(outEdge, streamGraph)) {
                        chainableOutputs.add(outEdge);
                    } else {
                        nonChainableOutputs.add(outEdge);
                    }
                }
                  
                for (StreamEdge chainable : chainableOutputs) {
                //see the third call to createChain; it returns an empty List
                    transitiveOutEdges.addAll(
                            createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
                }
            //nonChainableOutputs is empty, skip
                for (StreamEdge nonChainable : nonChainableOutputs) {
                    transitiveOutEdges.add(nonChainable);
                    createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
                }
            /*
                add an entry for Node B's id to chainedOperatorHashes; the third call
                already inserted it, so the current value is a List<Tuple2<>> of
                size = 1 containing Tuple2<Node C hash, Node C legacy hash>
             */
                List<Tuple2<byte[], byte[]>> operatorHashes =
                    chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
           // get Node B's hash
                byte[] primaryHashBytes = hashes.get(currentNodeId);
           /* legacyHashes has size 1; the loop below adds Node B's hashes under Node B's key
           in chainedOperatorHashes, so the final contents are:
                 Key                                            Value
                Node B id                             List<
                                                           Tuple2<Node C hash, Node C legacy hash>
                                                           Tuple2<Node B hash, Node B legacy hash>
                                                           >
           */
                for (Map<Integer, byte[]> legacyHash : legacyHashes) {
                    operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
                }
            /* 
                  add Stream Node B's chained name to chainedNames;
                  the contents are now
                          <3, StreamNode C>
                          <2, StreamNode B -> StreamNode C>
           */ 
            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
              //the next two maps are barely used later, so we move on
                chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
                chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
              /*
                    here config takes the createJobVertex branch below, whose main job is
                    to generate the JobVertex; see the JobVertex constructor call:

                   jobVertex = new JobVertex(
                    chainedNames.get(streamNodeId),
                    jobVertexId,                //derived from Node B's hash
                    legacyJobVertexIds,          //List<Node B legacy hash>
                    chainedOperatorVertexIds,    //List<Node B hash, Node C hash>: this JobVertex is an operator chain containing the two stream operators B and C
                    userDefinedChainedOperatorVertexIds //List<Node B legacy hash, Node C legacy hash>
                  );
              */
                StreamConfig config = currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                        : new StreamConfig(new Configuration());
             /*
                  set some of StreamNode B's configuration, such as parallelism and
                  checkpoint settings; the logic is simple enough to step into on your own
             */
            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
             //currentNodeId equals startNodeId, so this block executes
            if (currentNodeId.equals(startNodeId)) {
                //Node B is the start of the chain
                    config.setChainStart();
                    config.setChainIndex(0);
                    config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                config.setOutEdgesInOrder(transitiveOutEdges); //transitiveOutEdges is empty
                /*
                  outEdges is Edge E
                */
         
    config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
    
                    for (StreamEdge edge : transitiveOutEdges) {
                        connect(startNodeId, edge);
                    }
    
                /*
                   chainedConfigs.get(startNodeId) was computed during the third call:
                   key                        value
                   2                          {3: Node C config}
               */
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
    
                } else {       
                    chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
    
                    config.setChainIndex(chainIndex);
                    StreamNode node = streamGraph.getStreamNode(currentNodeId);
                    config.setOperatorName(node.getOperatorName());
                    chainedConfigs.get(startNodeId).put(currentNodeId, config);
                }
    
                config.setOperatorID(new OperatorID(primaryHashBytes));
                            
             // chainableOutputs.isEmpty() is false
                if (chainableOutputs.isEmpty()) {
                    config.setChainEnd();
                }
            // transitiveOutEdges is empty
                return transitiveOutEdges;
    
            } else {
                return new ArrayList<>();
            }
    

    Third call to createChain:

    
        private List<StreamEdge> createChain(
            Integer startNodeId,                 //Node B
            Integer currentNodeId,               //Node C
            Map<Integer, byte[]> hashes,         //already filled in by the hasher
            List<Map<Integer, byte[]>> legacyHashes, //size = 1
            int chainIndex,                          //chainIndex = 1
            Map<Integer, List<Tuple2<byte[], byte[]>>>  chainedOperatorHashes                       //empty
    ) {
    
            if (!builtVertices.contains(startNodeId)) {
    
                List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
    
                List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
                List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
                             
            //currentNodeId is Node C; outEdges is empty, so chainableOutputs and nonChainableOutputs both stay empty
                for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
                    if (isChainable(outEdge, streamGraph)) {
                        chainableOutputs.add(outEdge);
                    } else {
                        nonChainableOutputs.add(outEdge);
                    }
                }
    
                for (StreamEdge chainable : chainableOutputs) {
                    transitiveOutEdges.addAll(
                            createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
                }
    
                for (StreamEdge nonChainable : nonChainableOutputs) {
                    transitiveOutEdges.add(nonChainable);
                    createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
                }
            //insert an entry for Node B into chainedOperatorHashes
                List<Tuple2<byte[], byte[]>> operatorHashes =
                    chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
           // get Node C's hash
                byte[] primaryHashBytes = hashes.get(currentNodeId);
           /* legacyHashes has size 1; the loop below adds Node C's hashes under Node B's key
           in chainedOperatorHashes, organized as:
                 Key                                            Value
                Node B id                             List<Tuple2<Node C hash, Node C legacy hash>>
           */
                for (Map<Integer, byte[]> legacyHash : legacyHashes) {
                    operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
                }
              /* 
                  add Stream Node C's name to chainedNames;
                  the contents are now
                          <3, StreamNode C>
              */ 
            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
              //the next two maps are barely used later, so we move on
                chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
                chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
              //this time config takes the new StreamConfig branch below
                StreamConfig config = currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                        : new StreamConfig(new Configuration());
             /*
                  set some of StreamNode C's configuration, such as parallelism and
                  checkpoint settings; the logic is simple enough to step into on your own
             */
            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
             // currentNodeId does not equal startNodeId, so the else block below executes
                if (currentNodeId.equals(startNodeId)) {
    
                    config.setChainStart();
                    config.setChainIndex(0);
                    config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                    config.setOutEdgesInOrder(transitiveOutEdges);
                    config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
    
                    for (StreamEdge edge : transitiveOutEdges) {
                        connect(startNodeId, edge);
                    }
    
                    config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
    
                } else {
               /**
                  chainedConfigs stores the configuration related to operator chains.
                  chainIndex = 1
                  node = StreamNode C
                  so the final contents of chainedConfigs are
                  key                        value
                  2                          {3: Node C config}
               */
                    chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
    
                    config.setChainIndex(chainIndex);
                    StreamNode node = streamGraph.getStreamNode(currentNodeId);
                    config.setOperatorName(node.getOperatorName());
                    chainedConfigs.get(startNodeId).put(currentNodeId, config);
                }
    
                config.setOperatorID(new OperatorID(primaryHashBytes));
                            
             // chainableOutputs.isEmpty() is true
                if (chainableOutputs.isEmpty()) {
                    config.setChainEnd();
                }
            // transitiveOutEdges is empty
                return transitiveOutEdges;
    
            } else {
                return new ArrayList<>();
            }
    

    The core logic: starting from the current StreamNode, keep walking until a node can no longer be chained with it (by the code's logic, a StreamNode can always be chained with itself), recording the nodes that can be chained together. After recursively translating the current node's outputs, the recorded chainable StreamNodes are turned into one JobVertex, and finally that JobVertex's outputs are wired to the already-translated downstream JobVertices.

    Notice that the main difference between the JobGraph and the StreamGraph is that some StreamNodes are merged into a single JobVertex, with JobVertices connected by JobEdges (physical edges); this is the biggest optimization the JobGraph applies to the StreamGraph.

    The overall structure of the resulting JobGraph is:


    (Figure: the resulting JobGraph)
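    For the WindowWordCount job from section 2, assuming parallelism 1 throughout and default chaining, the resulting JobGraph would consist of roughly:

        JobVertex 1: Source: Socket Stream -> Flat Map              (source chained with flatMap)
        JobVertex 2: window/sum -> Sink: Print to Std. Out          (window operator chained with the sink)

        connected by a single JobEdge that carries the hash partitioner introduced by keyBy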

    5. JobGraph Submission

    Finally, let's look at the job submission flow. It ultimately calls ClusterClient#submitJob(), which is an abstract method; taking MiniClusterClient as an example, MiniClusterClient#requestJobResult is called. The path from the Client to the JobManager is skipped here and will be covered in detail separately. On the JobManager side, JobManagerRunner#grantLeadership() is eventually called, followed by the chain verifyJobSchedulingStatusAndStartJobManager --> startJobMaster() --> JobMasterService#start() --> JobMaster#startJobExecution() --> JobMaster#resetAndStartScheduler() --> JobMaster#startScheduling().
    At this point the job finally starts being scheduled and executed; the details of that part will be explained in a dedicated post.
