美文网首页Flink
Flink之用户代码生成调度层图结构

Flink之用户代码生成调度层图结构

作者: MaQingxiang | 来源:发表于2018-02-06 15:52 被阅读0次

    Flink中,由用户代码生成调度层图结构,可以分成3步走:通过Stream API编写的用户代码 -> StreamGraph -> JobGraph -> ExecutionGraph

    • StreamGraph:根据用户通过Stream API编写的代码生成的最初的图,用来表示程序的拓扑结构。
    • JobGraphStreamGraph经过算子连接等优化后生成的图,它是提交给JobManager的数据结构。
    • ExecutionGraphJobManager根据JobGraph生成的分布式执行图,是调度层最核心的数据结构。

    SocketWindowWordCount为例,其执行图的演变过程如下图所示:

    转换演示图

    详解三部曲

    Step1:通过Stream API编写的用户代码 —> StreamGraph

    本小节主要介绍Flink是如何根据用户用Stream API编写的程序,构造出一个代表拓扑结构的StreamGraph的。

    找突破口

    StreamGraph的相关代码主要在org.apache.flink.streaming.api.graph包中。主要逻辑集中在StreamGraphGenerator类,入口函数是StreamGraphGenerator.generate(env, transformations),该函数由触发程序执行的方法StreamExecutionEnvironment.execute()调用到。

    理关键点
    • 根据用户通过Stream API编写的程序,构建transformations参数。
    • 遍历transformations集合,创建StreamNodeStreamEdge,构造StreamGraph
    构建过程

    StreamGraphGenerator.generate()的一个关键参数是transformations,它是env的成员变量之一,用List<StreamTransformation<? >>来保存。其中,StreamTransformation代表了从一个或多个DataStream生成新DataStream的操作。

    DataStream上常见的transformationmapflatmapfilter等。这些transformation会构造出一棵StreamTransformation树,通过这棵树转换成StreamGraph

    DataStream上的每一个transformation都对应了一个StreamOperatorStreamOperator是运行时的具体实现,会决定UDF(User-Defined Funtion)的调用方式。

    dataStream.map为例,用户编写的UDF(User-Defined Funtion)构造出StreamTransformation的过程如下图所示:

    StreamTransformation

    从上图可以看出,map转换,首先是将用户自定义的函数MapFunction包装到StreamMap这个StreamOperator中;然后是将StreamMap包装到OneInputTransformation中,并建立与上游的关系;最后将transformation存到envtransformations集合中。

    我们看一下SocketWindowWordCount示例,其transformations树的结构如下图所示,其中符号*input指针,指向上游的transformation,从而形成了一颗transformations树。

    transformations.png

    当调用env.execute()时,会触发StreamGraphGenerator.generate(env, transformations)遍历其中的transformations集合构造出StreamGraph。自底向上递归调用每一个transformation,所以真正的处理顺序是Source->Flat Map->Hash(keyBy)->TriggerWindow->Sink

    入口:

    public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
        return new StreamGraphGenerator(env).generateInternal(transformations);
    }
    

    真正的构建过程:

    private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
        for (StreamTransformation<?> transformation: transformations) {
            transform(transformation);
        }
        return streamGraph;
    }
    

    遍历transformations集合,并对其每一个StreamTransformation调用transform()方法。

    private Collection<Integer> transform(StreamTransformation<?> transform) {
        ...
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }
        ...
    }
    

    从上述代码可以看出,针对具体的某一种类型的StreamTransformation,会调用其相应的transformXXX()函数进行转换。transformXXX()首先会对transform的上游transform进行递归转换,确保上游的都已经完成了转化。然后通过addOperator()方法构造出StreamNode,通过addEdge()方法与上游的transform进行连接,构造出StreamEdge

    注意:

    对逻辑转换(partition、union等)的处理,不会生成具体的StreamNodeStreamEdge,而是通过streamGraph.addVirtualXXXNode()方法添加一个虚拟节点。当下游transform添加edge时,会把虚拟节点信息写入到StreamEdge中。

    示例讲解:

    SocketWindowWordCount示例中,Flat Map操作会被封装到OneInputTransformation类中,我们可以看一看transformOneInputTransform的实现:

    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
        // 递归对transformation的直接上游transformation进行转换,获取直接上游id集合
        Collection<Integer> inputIds = transform(transform.getInput());
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }
        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
        // 构建StreamNode的入口
        streamGraph.addOperator(transform.getId(), 
                slotSharingGroup, 
                transform.getOperator(), 
                transform.getInputType(), 
                transform.getOutputType(), 
                transform.getName());
        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }
        streamGraph.setParallelism(transform.getId(), transform.getParallelism());
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
        // 构建StreamEdge的入口
        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }
        return Collections.singleton(transform.getId());
    }
    

    类似的,在SocketWindowWordCount示例中,keyBy操作会被封装到PartitionTransformation类中,最后我们再以transformPartition为例看下对逻辑转换的处理。

    private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
        StreamTransformation<T> input = partition.getInput();
        List<Integer> resultIds = new ArrayList<>();
        // 递归对该transformation的直接上游transformation进行转换,获得直接上游id集合
        Collection<Integer> transformedIds = transform(input);
        for (Integer transformedId: transformedIds) {
            int virtualId = StreamTransformation.getNewNodeId();
            // 添加一个虚拟分区节点VirtualPartitionNode,不会生成StreamNode
            streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
            resultIds.add(virtualId);
        }
      return resultIds;
    }
    

    transformPartition函数的实现可以看出,对transformPartition的转换没有生成具体的StreamNodeStreamEdge,而是通过streamGraph.addVirtualPartitionNode()方法添加了一个虚拟节点。当partition的下游transform添加edge时(调用streamGraph.addEdge()),会把partition信息写入到StreamEdge中。

    最后的最后我们再看下streamGraph.addEdgeInternal()的实现:

    private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner<?> partitioner, List<String> outputNames, OutputTag outputTag) {
        // 当上游是sideOutput时,递归调用,并传入sideOutput信息
        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
        }
        // 当上游是select时,递归调用,并传入select信息 
        else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
            if (outputNames.isEmpty()) {
                outputNames = virtualSelectNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
        }
        // 当上游是partition时,递归调用,并传入partitioner信息 
        else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
        }
        // 不是以上逻辑转换的情况,真正构建StreamEdge 
        else {
            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = getStreamNode(downStreamVertexID);
            // 没有指定partitioner时,会为其选择forward或者rebalance分区
            if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) {
                partitioner = new RebalancePartitioner<Object>();
            }
            // 创建StreamEdge,并将该StreamEdge添加到上游的输出,下游的输入
            StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);
            getStreamNode(edge.getSourceId()).addOutEdge(edge);
            getStreamNode(edge.getTargetId()).addInEdge(edge);
        }
    }
    

    一句话总结:
    首先,根据用户通过Stream API自定义UDF编写的程序,即在DataStream上做的一系列转换(map、shufflewindow等),我们可以得到StreamTransformation集合。然后通过调用streamGraphGenerator.generate(env, transformations),遍历transformations集合,并对其每一个StreamTransformation调用transform()方法,构造出StreamNode,并通过StreamEdge与上游的transformation进行连接,此处需要特别注意对逻辑转换(partition等)的处理,最后构造出StreamGraph

    Step2:StreamGraph —> JobGraph

    本小节主要介绍Flink是如何将StreamGraph转换成JobGraph的。该转换的关键在于,将多个符合条件的StreamNode节点chain在一起作为一个JobVertex节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。

    找突破口

    JobGraph的相关数据结构主要在flink-runtime模块下的org.apache.flink.runtime.jobgraph包中。构造JobGraph的代码主要集中在StreamingJobGraphGenerator类中,开启构建之路的入口函数是StreamingJobGraphGenerator(streamGraph).createJobGraph()

    理关键点

    判断算子chain,合并创建JobVertex,并生成JobEdgeJobVertexJobEdge之间通过创建IntermediateDataSet来连接。可以简单分为3个关键点:

    • chain的判断
    • 生成JobVertex
    • 创建JobEdgeIntermediateDataSet
    构建过程

    入口:

    private JobGraph createJobGraph() {
    
        ...
        setChaining();  //递归创建JobVertex、JobEdge、IntermediateDataSet,用以构建JobGraph
        ...
        return jobGraph;
    }
    
    

    真正的构建过程:

    void setChaining(){
        for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
            createChain(sourceNodeId, sourceNodeId, ...);
        }
    }
    
    
    private List<StreamEdge> createChain(Integer startNodeId, Integer currentNodeId, ...){
        ...
        // 将当前节点的出边分为两类:chainableOutputs和nonChainableOutputs
        for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }
        // 分别遍历chainableOutputs和nonChainableOutputs,递归调用自身方法creatChain()
        // 并将nonChainableOutputs的边或者chainableOutputs调用createChain()的返回值添加到transitiveOutEdges中
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(createChain(startNodeId, chainable.getTargetId(), ...));
        }
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), ...);
        }
        // 如果当前节点是起始节点,则直接创建JobVertex,否则返回一个空的SteamConfig
        StreamConfig config = currentNodeId.equals(startNodeId)
            ? createJobVertex(startNodeId, ...) : new StreamConfig(new Configuration());
        // 将StreamNode中的配置信息序列化到StreamConfig中
        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
        // 再次判断,如果是chain的起始节点,执行connect()方法,创建JobEdge和IntermediateDataSet;否则将当前节点的StreamConfig添加到chainedConfig中
        if (currentNodeId.equals(startNodeId)) {
            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }
        } else {
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }
        ...
        return transitiveOutEdges;
    }
    

    setChaining()会依次对source调用createChain()方法,该方法会递归调用其下游节点,从而构建出node chainscreateChain()会分析当前节点的出边,根据Operator Chains中的条件进行判断isChainable(),并将出边分成两类:chainalbeOutputsnoChainableOutputs,接着分别递归调用自身方法。之后会将StreamNode中的配置信息序列化到StreamConfig中。如果当前不是chain中的子节点,则会构建JobVertexJobEdge相连。如果是chain中的子节点,则会将StreamConfig添加到该chainchainedConfigs集合中。

    示例讲解:

    同样地,以SocketWindowWordCount为例,我们分析下其创建过程:

    StreamGraph

    如上图所示,我们先给4StreamNode节点进行编号,Source1表示,Flat Map2表示,Trigger Window3表示,Sink4表示;相应地,3StreamEdge则分别用1->22->33->4表示。

    递归调用过程如下:

    • 递归始于Source,调用createChain(1, 1),当前节点1的出边为1->2,不可chain,将边1->2直接加入transitiveOutEdges
    • 然后递归调用createChain(2, 2),当前节点2的出边为2->3,同样的,不可chain,将边2->3直接加入transitiveOutEdges
    • 继续递归调用createChain(3, 3),当前节点3的出边为3->4,要注意了,可chain,等着将下游createChain()的返回值加入transitiveOutEdges
    • 此处递归调用createChain(3, 4),当前节点4没有出边,递归终止。

    递归结束条件:

    • 当前节点不再有出边集合,即streamGraph.getStreamNode(currentId).getOutEdges()为空
    • 当前节点已经转换完成,即builtVertices.contains(startNodeId)false

    递归调用过程中各种操作以及变量情况一览表如下:

    creatChain() getOutEdges() chainAble nonChainAble transitiveOutEdges JobVertex connect()
    (1, 1) 1->2 1->2 1->2 JobVertex Y
    (2, 2) 2->3 2->3 2->3 JobVertex Y
    (3, 3) 3->4 3->4 JobVertex Y
    (3, 4) StreamConfig N

    关键方法分析:

    isChainable(),用来判断StreamNode chain,一共有9个条件:

    • 下游节点的入边为1
    • StreamEdge的下游节点对应的算子不为null
    • StreamEdge的上游节点对应的算子不为null
    • StreamEdge的上下游节点拥有相同的slotSharingGroup,默认都是default
    • 下游算子的连接策略为ALWAYS
    • 上游算子的连接策略为ALWAYS或者HEAD
    • StreamEdge的分区类型为ForwardPartitioner
    • 上下游节点的并行度一致
    • 当前StreamGraph允许做chain

    createJobVertex(),用来创建JobVertex节点,并返回StreamConfig

    createJobVertex()传入的参数为StreamNode。首先会通过new JobVertex()构造出JobVertex节点,然后通过JobVertex.setInvokableClass(streamNode.getJobVertexClass())设置运行时执行类,再通过jobVertex.setParallelism(parallelism)设置并行度,最后返回StreamConfig

    connect(),用来创建JobEdgeIntermediateDataSet,连接上下游JobVertex节点。

    遍历transitiveOutEdges,并将每一条StreamEdge边作为参数传入connect( )函数中。接下来就是依据StreamEdge得到上下游JobVertex节点;然后,通过StreamEdge.getPartitioner()方法得到StreamPartitioner属性,对于ForwardPartitionerRescalePartitioner两种分区方式建立DistributionPattern.POINTWISE类型的JobEdgeIntermediateDataSet,而其他的分区方式则是DistributionPattern.ALL_TO_ALL类型。至此已经建立好上下游JobVertex节点间的联系。

    一句话总结:

    首先,通过streamGraph.getSourceIDs()拿到source节点集合,紧接着依次从source节点开始遍历,判断StreamNode Chain,递归创建JobVertex,所以,其真正的处理顺序其实是从sink开始的。然后通过connect()遍历当前节点的物理出边transitiveOutEdges集合,创建JobEdge,建立当前节点与下游节点的联系,即JobVertexIntermediateDataSet之间。

    Step3:JobGraph —> ExecutionGraph

    本小节主要介绍Flink是如何将JobGraph转换成ExecutionGraph的。简单来说,就是并行化JobGraph,为调度做好准备。

    找突破口

    ExecutionGraph的相关数据结构主要在org.apache.flink.runtime.executiongraph包中,构造ExecutionGraph的代码集中在ExecutionGraphBuilder类和ExecutionGraph类中,入口函数是ExecutionGraphBuilder.buildGraph(executionGraph, jobGraph, ...)

    理关键点
    • 客户端提交JobGraphJobManager
    • 构建ExecutionGraph对象
      • JobGraph进行拓扑排序,得到sortedTopology顶点集合
      • JobVertex封装成ExecutionJobVertex
      • ExecutionVertex节点通过ExecutionEdge连接起来
    构建过程

    1、JobClient提交JobGraphJobManager

    一个程序的JobGraph真正被提交始于对JobClientsubmitJobAndWait()方法的调用,而且submitJobAndWait()方法会触发基于AkkaActor之间的消息通信。JobClient在这其中起到了“桥接”的作用,它连接了同步的方法调用和异步的消息通信。

    submitJobAndWait()方法中,首先会创建一个JobClientActorActorRef,并向其发送一个包含JobGraph实例的SubmitJobAndWait消息。该SubmitJobAndWait消息被JobClientActor接收后,调用trySubmitJob()方法触发真正的提交动作,即通过jobManager.tell( )的方式给JobManager Actor发送封装JobGraphSubmitJob消息。随后,JobManager Actor会接收到来自JobClientActor的该SubmitJob消息,进而触发submitJob()方法。

    由此可见,一个JobGraph从提交开始会经过多个对象层层递交,各个对象之间的交互关系如下图所示:

    Actor交互

    2、构建ExecutionGraph对象

    入口:
    JobManager作为Actor,在handleMessage()方法中,针对SubmitJob消息调用submitJob()方法。

    private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
        ...
        executionGraph = ExecutionGraphBuilder.buildGraph(executionGraph, jobGraph, ...)
        ...
    }
    

    真正的构建过程:

    public static ExecutionGraph buildGraph(){
        ...
        //对JobGraph中的JobVertex节点进行拓扑排序,得到List<JobVertex>
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
        executionGraph.attachJobGraph(sortedTopology);  //构建ExecutionGraph的核心方法
        ...
    }
    

    由上可知,attachJobGraph()方法是构建ExecutionGraph图结构的核心方法。

    public void attachJobGraph(List<JobVertex> topologiallySorted){
        ...
        for (JobVertex jobVertex : topologiallySorted) {
            ...
            // create the execution job vertex and attach it to the graph
            ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, globalModVersion, createTimestamp);
            ejv.connectToPredecessors(this.intermediateResults);
            ...
        }
        ...
    }
    

    下面详细分析下,attachJobGraph()方法主要完成的两件事情:

    • JobVertex封装成ExecutionJobVertex
    • 把节点通过ExecutionEdge连接

    关键方法分析:

    new ExecutionJobVertex()方法,用来将一个个JobVertex封装成ExecutionJobVertex,并依次创建ExecutionVertexExecutionIntermediateResultIntermediateResultPartition,用于丰富ExecutionGraph

    ExecutionJobVertex的构造函数中,首先是依据对应的JobVertex的并发度,生成对应个数的ExecutionVertex。其中,一个ExecutionVertex代表着一个ExecutionJobVertex的并发子task。然后是将原来JobVertex的中间结果IntermediateDataSet转化为ExecutionGraph中的IntermediateResult

    类似的,ExecutionVertex的构造函数中,首先会创建IntermediateResultPartition,并通过IntermediateResult.setPartition( )建立IntermediateResultIntermediateResultPartition之间的关系;然后生成Execution,并配置资源相关。

    新创建的ExecutionJobVertex调用ejv.connectToPredecessor()方法,按照不同的分发策略连接上游,其参数为上游生成的IntermediateResult集合。其中,根据JobEdge中两种不同的DistributionPattern属性,分别调用connectPointWise()或者connectAllToAll( )方法,创建ExecutionEdge,将ExecutionVertex和上游的IntermediateResultPartition连接起来。

    其中,SocketWindowWordCount示例中,就是采用了connectAllToAll()的方式建立与上游的关系。

    接下来,我们详细介绍下connectPointWise()方法的实现,即DistributionPattern.POINTWISE策略,该策略用来连接当前ExecutionVertex与上游的IntermediateResultPartition。首先,获取上游IntermediateResultpartition数,用numSources表示,以及此ExecutionJobVertex的并发度,用parallelism表示;然后,根据其并行度的不同,分别创建ExecutionEdge。共分3种情况:

    (1) 如果并发数等于partition数,则一对一进行连接。如下图所示:
    numSources == parallelism

    OneToOne

    (2) 如果并发数大于partition数,则一对多进行连接。如下图所示:
    numSources < parallelism,且parallelism % numSources == 0

    OneToMany-1
    numSources < parallelism,且parallelism % numSources != 0
    OneToMany-2

    (3) 如果并发数小于partition数,则多对一进行连接。如下图所示:
    numSources > parallelism,且numSources % parallelism == 0

    ManyToOne-1
    numSources > parallelism,且numSources % parallelism != 0
    ManyToOne-2

    一句话总结:

    JobGraph按照拓扑排序后得到一个JobVertex集合,遍历该JobVertex集合,即从source开始,将JobVertex封装成ExecutionJobVertex,并依次创建ExecutionVertexExecutionIntermediateResultIntermediateResultPartition。然后通过ejv.connectToPredecessor()方法,创建ExecutionEdge,建立当前节点与其上游节点之间的联系,即连接ExecutionVertexIntermediateResultPartition

    终章

    构建好ExecutionGraph,接下来会基于ExecutionGraph触发Job的调度,申请Slot,真正的部署任务,这是Task被执行的前提:

    if (leaderElectionService.hasLeadership) {
        log.info(s"Scheduling job $jobId ($jobName).")
        executionGraph.scheduleForExecution()  // 将生成好的ExecutionGraph进行调度
    } else {
        self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))
        log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
                  "this. I am not scheduling the job for execution.")
    }
    

    下一篇文章将介绍FlinkSchedule机制,敬请期待~

    相关文章

      网友评论

        本文标题:Flink之用户代码生成调度层图结构

        本文链接:https://www.haomeiwen.com/subject/fcnizxtx.html