![](https://img.haomeiwen.com/i11265075/1d6251a841399720.png)
进入DAGScheduler:我们都知道程序的真正运行是在action算子时,action算子会先进入连接簇SparkContext,并进入SaprkContext的runJob方法,该方法就调用了DAGScheduler的runJob方法,真正交给了DAGS进行处理。
处理过程:DAGS会在SubmitJob中验证相关信息并调用eventProcessLoop.post来真正提交任务。eventProcessLoop会负责把事件都加入一个安全线程队列,并根据提交类型调用提交Job的方法。处理流程:分化Stage 提交Stage
提交步骤:通过submitStage把需要发送的stages从子到父全部遍历出,然后通过submitMissingTasks提交任务。首先先计算分区,根据分区和rdd找到task最佳位置的划分算法,当每个task都找到位置后,通过广播把每个task封装成闭包发送到各个节点。然后按task指标,然后把任务和参数生成ShuffleMapTask或者ResultTask提交到各个executor上
内部实现参加两篇博客
https://blog.csdn.net/dabokele/article/details/51902617 spark1.6版本的
https://blog.csdn.net/dax1n/article/details/69787629 2.2版本
https://blog.csdn.net/ws0owws0ow/article/details/74178104 剖析最细
有所补充:
切分的算法:切分的算法是按宽窄依赖来划分,我们只需要找到一个FinalStage(最后一个分区,也是最后执行的分区,也就是action引发的分区),因为分区有parentStage属性,该属性可以找到自己依赖哪些分区的结果。那么就应该用递归进行来查询,直到找到最后一个RDD。
先通过getShuffleDependencies方法找到这个FinalStage的宽依赖就是自身的parents(parents虽然有多个,但都处于同层(一起执行无相互依赖关系),在第二篇博文有说到)。getOrCreateShuffleMapStage才是真正的创建FinalStage,在这个方法中如果已经存在该Stage会直接返回,如果是第一次创建会调用getMissingAncestorShuffleDependencies方法,该方法会调用getShuffleDependencies方法,形成递归直到找出所有分区。然后依次建立父子级关系,最后就只用返回一个FinalStage
网友评论