美文网首页玩转大数据
Flink 源码之JobGraph生成

Flink 源码之JobGraph生成

作者: AlienPaul | 来源:发表于2019-12-09 15:09 被阅读0次

    Flink源码分析系列文档目录

    请点击:Flink 源码分析系列文档目录

    JobGraph

    相比StreamGraph,JobGraph在生成的时候做出了一项优化:将尽可能多的operator组合到同一个task中,形成operator chain。这样以来,同一个chain中的operator运行在同一个线程中,可以显著降低线程切换的性能开销,并且能增大吞吐量和降低延迟。

    Operator Chain

    入口方法

    StreamGraph的getJobGraph方法

    @Override
    public JobGraph getJobGraph(@Nullable JobID jobID) {
        return StreamingJobGraphGenerator.createJobGraph(this, jobID);
    }
    

    生成JobGraph的逻辑在StreamingJobGraphGenerator类中

    StreamingJobGraphGenerator的createJobGraph方法如下所示:

    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
        return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
    }
    

    继续跟踪,发现创建JobGraph的主要逻辑如下所示:

    private JobGraph createJobGraph() {
        // 进行一些校验工作
        preValidate();
    
        // make sure that all vertices start immediately
        // 设置JobGraph的调度模式
        jobGraph.setScheduleMode(streamGraph.getScheduleMode());
    
        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    
        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }
    
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
    
        // 这里是重点,JobGraph的顶点和边在这个方法中创建,并且尝试将尽可能多的StreamNode聚合在一个JobGraph节点中。聚合条件稍后分析
        setChaining(hashes, legacyHashes, chainedOperatorHashes);
    
        // 设置物理边界
        setPhysicalEdges();
    
        // 设置slot共享和coLocation。同一个coLocationGroup的task需要在同一个slot中运行
        setSlotSharingAndCoLocation();
    
        // 配置检查点
        configureCheckpointing();
    
        JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
    
        // set the ExecutionConfig last when it has been finalized
        try {
            // 设置运行时配置
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        }
        catch (IOException e) {
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                    "This indicates that non-serializable types (like custom serializers) were registered");
        }
    
        return jobGraph;
    }
    

    其中最为重要的是setChaining方法。该方法为StreamGraph中的每个source节点生成Job Vertex(chain)。

    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
        for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
            createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
        }
    }
    

    Chain的概念:JobGraph最为重要的优化方式为创建OperatorChain,可以尽可能的多整合一些操作在同一个节点中完成,避免不必要的线程切换和网络通信。

    createChain方法的主要逻辑:

    1. 如果stream具有多个sources,遍历每一个sources,调用createChain方法。
    2. createChain方法的两个参数startNodeId和currentNodeId,如果这两个参数形同,意味着一个新chain的创建。如果这两个参数不相同,则将startNode和currentNode构造在同一个chain中。
    3. 使用一个变量builtVertices保证各个StreamNode没有被重复处理。
    4. 处理流程将各个节点的出边(out edge)分类。分类的依据为isChainable函数。
    5. 出边分为3类,可以被chain和不可以被chain的,还有一种(transitiveOutEdges)是在递归调用createChain的时候加入,目的是存放整个chain所有的出边(在构造chain的时候,遇到一个无法被chain的节点,则意味着该chain已经结束,这个无法被chain的StreamEdge就是这个chain的出边)。
    6. createChain方法会递归调用。如果某个StreamNode的出边可以chain,则调用createChain方法连接这个节点(chain的起始节点)和这个节点可以被chain的出边指向的节点,一直递归到出边不可chain为止。
    7. 遇到不可chain的节点,会创建一个job vertex。
    8. 同一个chain中的start node和chain内的节点之间operator的关系在chainedOperatorHashes变量中保存,结构为Map<startNodeID, List<Tuple2<StartNodeHash, currentNodeHash>>>
    9. 每一个Stream Node(无论有没有对应的job vertex)的配置信息在config变量中。setVertexConfig方法负责设置config变量。
    10. 通过ChainedConfig变量来保存chain的起始节点和chain内各个节点配置的对应关系。ChainedConfig结构为Map<startNodeID, Map<currentNodeID, Config>>
    11. 调用connect方法将每个job vertex(chain)和下一个连接起来。比如节点A和B相连,会现在A后追加一个Intermediate DateSet,然后是Job Edge,最后连接到B节点。

    createChain代码如下所示:

    private List<StreamEdge> createChain(
            Integer startNodeId,
            Integer currentNodeId,
            Map<Integer, byte[]> hashes,
            List<Map<Integer, byte[]>> legacyHashes,
            int chainIndex,
            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
    
        // builtVertices存放了已经被构建了的StreamNode ID,避免重复操作
        if (!builtVertices.contains(startNodeId)) {
    
            // 存储整个chain所有的出边
            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
            // 存储可以被chain的StreamEdge
            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            // 存储可以不可以被chain的StreamEdge
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
    
            // 获取当前处理node
            StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
    
            // 分类可以被chain的edge和不可被chain的edge,使用isChainable的方法判断
            for (StreamEdge outEdge : currentNode.getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }
    
            for (StreamEdge chainable : chainableOutputs) {
                // 如果是可被chain的StreamEdge,递归调用createChain
                // 注意currentNode是chainable.getTargetId()
                // 递归直到currentNode的out edge为不可chain的edge,会执行下一段for循环,不可chain的边被加入transitiveOutEdges,最终返回到递归最外层
                // 这样以来,transitiveOutEdges收集齐了整个chain所有的出边
                transitiveOutEdges.addAll(
                        createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
            }
    
            for (StreamEdge nonChainable : nonChainableOutputs) {
                // 如果是不可被chain的StreamEdge,添加到transitiveOutEdges集合中
                transitiveOutEdges.add(nonChainable);
                // 调用createChain,构建新的chain
                createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
            }
    
            List<Tuple2<byte[], byte[]>> operatorHashes =
                chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
    
            byte[] primaryHashBytes = hashes.get(currentNodeId);
            OperatorID currentOperatorId = new OperatorID(primaryHashBytes);
    
            for (Map<Integer, byte[]> legacyHash : legacyHashes) {
                operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
            }
    
            // 设置chain的名字
            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
            // 设置chain的最小资源
            chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            // 设置chain的最小资源
            chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
    
            if (currentNode.getInputFormat() != null) {
                getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
            }
    
            if (currentNode.getOutputFormat() != null) {
                getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
            }
    
            // 如果currentNodeId和startNodeId相等,说明需要创建一个新的chain,会生成一个JobVertex
            StreamConfig config = currentNodeId.equals(startNodeId)
                    ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                    : new StreamConfig(new Configuration());
    
            // 设置的顶点属性到config中
            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
    
            if (currentNodeId.equals(startNodeId)) {
    
                // 意味着一个新chain的开始
                config.setChainStart();
                config.setChainIndex(0);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
    
                // 对于每一个chain,把它和指向下一个chain的出边连接起来
                for (StreamEdge edge : transitiveOutEdges) {
                    connect(startNodeId, edge);
                }
    
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
    
            } else {
                chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
    
                config.setChainIndex(chainIndex);
                // 获取到被chain的节点
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                // 关联chain内节点的配置信息到chain的起始节点上
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }
    
            config.setOperatorID(currentOperatorId);
    
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            return transitiveOutEdges;
    
        } else {
            return new ArrayList<>();
        }
    }
    

    isChainable方法,这个方法很重要。用于判断某个边两头连接的StreamNode的node是否可以组成OperatorChain。方法如下所示:

    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    
        StreamOperator<?> headOperator = upStreamVertex.getOperator();
        StreamOperator<?> outOperator = downStreamVertex.getOperator();
    
        return downStreamVertex.getInEdges().size() == 1
                && outOperator != null
                && headOperator != null
                && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
                && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                    headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
                && (edge.getPartitioner() instanceof ForwardPartitioner)
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                && streamGraph.isChainingEnabled();
    }
    

    总结起来,可以chain的条件如下(都必须满足):

    1. 下游节点的前置节点有且只能有1个。
    2. 该Edge的上游和下游节点必须存在。
    3. 上游节点和下游节点位于同一个SlotSharingGroup中。
    4. 下游的chain策略为ChainingStrategy.ALWAYS。
    5. 上游的chain策略为ChainingStrategy.ALWAYS或ChainingStrategy.HEAD。
    6. 使用ForwardPartitoner及其子类。
    7. 上游和下游节点的并行度一致。
    8. chaining被启用。

    接下来是setPhysicalEdges方法。该方法负责设置job vertex的物理边界。执行步骤总结如下:

    1. 遍历physicalEdgesInOrder对象,该对象包含了所有的不可被chain的出边(在调用connect方法的时候edge被加入该集合)。
    2. physicalInEdgesInOrder结构为Map<不可chain的edge指向的下游节点,List<不可chain的edge>>
    3. 找到这些不可chain的edge指向的下游节点,设置物理边界(该节点的入边)
    private void setPhysicalEdges() {
        Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();
    
        for (StreamEdge edge : physicalEdgesInOrder) {
            int target = edge.getTargetId();
    
            List<StreamEdge> inEdges = physicalInEdgesInOrder.computeIfAbsent(target, k -> new ArrayList<>());
    
            inEdges.add(edge);
        }
    
        for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
            int vertex = inEdges.getKey();
            List<StreamEdge> edgeList = inEdges.getValue();
    
            vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
        }
    }
    

    其余的方法对生成JobGraph过程的理解不是很重要,暂时不分析,留在以后补充。

    示例图

    JobGraph示意图

    注意StreamGraph的window和sink两个节点被chain到了一起。

    相关文章

      网友评论

        本文标题:Flink 源码之JobGraph生成

        本文链接:https://www.haomeiwen.com/subject/ziihgctx.html