-
整体的调度图
-
从Action操作到任务划分的过程
-
Stage划分和创建
先上整体的流程图
总结:采用BFS算法从最后的RDD依次找到Shuffle RDD,每次遇到Shuffle RDD的时候且Stage不存在,则新建一个Stage
具体:
handleJobSubmitted->createResultStage先调用getParentStages得到父Stage,再调用new Stage()创建自己的Stage。
getParentStages的代码如下,使用BFS算法从当前的RDD开始遍历,遇到ShuffleMapStage的时候调用getShuffleMapStage获取父Stage。
getShuffleMapStage先用registerShuffleDependencies生成Parent Stage,再调用newOrUsedStage生成自己所在的Stage。
registerShuffleDependencies生成所有祖先的Stage,其中getAncestorShuffleDependencies采用DFS算法得到所有未生成Stage的Shuffle Dependency。
-
Stage作为Tasks提交
例子:
下面的RDD操作可以划分为3个Stage
DAGScheduler从Stage3开始,递归地找到其他Stage,将还有依赖的Stage没有计算完成的Stage3加入等待队列,将没有依赖Stage的Stage1和Stage2加入运行队列,同时给TaskScheduler提交Task运行。等待这些Tasks运行完成的时候,再运行Stage3。
源码解析过程
-
总结
Stage的划分是根据Shuffle的次数来的,因为必须等Shuffle之前的任务全部完成才能继续。
Stage的划分和运行采用的是拓扑排序,首先找到没有依赖的Stage,将其作为任务提交,其他的Stage提交到等待队列,Task完成后检查对应的Stage的下一个Stage是否可以运行了。
网友评论