美文网首页flinkflink
Flink源码阅读(二)--- JobGraph 的生成

Flink源码阅读(二)--- JobGraph 的生成

作者: sj_91d7 | 来源:发表于2021-02-07 18:39 被阅读0次

    本文内容是基于Flink 1.9来讲解。在执行Flink任务的时候,会涉及到三个Graph,分别是StreamGraph,JobGraph,ExecutionGraph。其中StreamGraph和JobGraph是在client端生成的,ExecutionGraph是在JobMaster中执行的。

    • StreamGraph是根据用户代码生成的最原始执行图,也就是直接翻译用户逻辑得到的图
    • JobGraph是对StreamGraph进行优化,比如设置哪些算子可以chain,减少网络开销
    • ExecutionGraph是用于作业调度的执行图,对JobGraph加了并行度的概念

    本篇文章在Flink源码阅读(一)--- StreamGraph 的生成 基础上,介绍下JobGraph的生成

    1. JobVertex

    在StreamGraph中,每个operator对应一个StreamNode。在JobGraph中,JobVertex对应的是可chain起来的operator list,把一些operator chain起来,可以较少网络以及序列化和反序列化的开销,大部分情况下可以提高作业性能。

    2. JobEdge

    在StreamGraph中,StreamNode之间的连接关系使用StreamEdge表示。在JobGraph中,JobVertex之间的连接关系使用JobEdge表示。

    • JobEdge中存储了target JobVertex信息,没有source JobVertex信息
    • JobEdge中存储了source IntermediateDataSet以及source IntermediateDataSetID信息,IntermediateDataSet是edge的输入数据集。

    3. JobGraph生成入口StreamingJobGraphGenerator#createJobGraph()方法

        private JobGraph createJobGraph() {
    
            // make sure that all vertices start immediately
            jobGraph.setScheduleMode(streamGraph.getScheduleMode());
    
            // Generate deterministic hashes for the nodes in order to identify them across
            // submission iff they didn't change.
            Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    
            // Generate legacy version hashes for backwards compatibility
            List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
            for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
                legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
            }
    
            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
    
            setChaining(hashes, legacyHashes, chainedOperatorHashes);
    
            setPhysicalEdges();
    
            setSlotSharingAndCoLocation();
    
            configureCheckpointing();
    
            JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
    
            // set the ExecutionConfig last when it has been finalized
            try {
                jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
            }
            catch (IOException e) {
                throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                        "This indicates that non-serializable types (like custom serializers) were registered");
            }
    
            return jobGraph;
        }
    

    主要做的工作如下
    1.1 为streamGraph的StreamNode生成hash值(如果用户为operator指定了uid,就使用用户自定义的,否则自动生成。这个uid不管是用户指定还是自动生成,必须保证Job全局唯一性),这个值以后作为JobVertexID来唯一标识节点
    1.2 设置task chain关系,并且把可chain的operator创建一个新的JobVertex添加到JobGraph中
        1.2.1 是否可chain,代码逻辑在StreamingJobGraphGenerator.isChainable方法中
            - 下游算子的输入只有一个
            - 下游算子不为空
            - 上游算子不为空
            - 上游和下游算子的slotSharingGroup相同
            - 下游算子的ChainingStrategy为ALWAYS
            - 上游算子的ChainingStrategy为HEAD或者ALWAYS
            - 上下游算子中间的edge的Partitioner是ForwardPartitioner
            - 上下游算子的并发相同
            - streamGraph配置是可以chain的
    1.3 设置PhysicalEdges
    1.4 设置SlotSharingAndCoLocation,就是把streamGraph StreamNode的SlotSharingGroup属性设置到JobVertex中
    1.5 配置checkpoint。主要就是设置triggerVertices(所有的输入节点),commitVertices(所有的节点),ackVertices(所有的节点)
    1.6 设置执行配置信息,比如默认并发度,失败时retry次数,retry delay等

    通过createJobGraph方法,就完成了StreamGraph到JobGraph的转换。

    4. 小结

    JobGraph主要是把StreamGraph中,可以chain起来的operator进行合并,这样可以减小网络以及序列化和反序列化的开销。

    相关文章

      网友评论

        本文标题:Flink源码阅读(二)--- JobGraph 的生成

        本文链接:https://www.haomeiwen.com/subject/nmbztltx.html