spark应用程序中用户代码是基于RDD的一系列操作。这些操作是延迟执行的。在sparkContext内部作业的提交是调用DAGScheduler的提交接口。DAGScheduler是面向stage的调度,为每个JOB生成DAG,将作业拆分成具有依赖关系的多个调度阶段(通常根据shuffle来划分),每个阶段构建出一组具体的任务,然后以taskSet的形式提交给任务调度模块具体执行。DAGScheduler负责任务的逻辑调度,而taskScheduler负责作业的物理调度。包括job的提交、取消、获取执行结果、监控调度阶段的状态等。
DAGScheduler是计算作业与任务之间的依赖关系。DAGScheduler中作业的发起是通过submitJob或者RunJob开始的。DAGScheduler是在sparkContext初始化中实例化的。DAGScheduler的事件循环逻辑是通过ACTOR System实现的,内部主要是由片函数来实现对不同逻辑的处理。
DAGScheduler启动时,会创建一个DAGSchedulerEventProcessActor实例来处理各种DAGScheduler事件,主要是利用其receive偏函数实现。DAGSchedulerEventProcessActor实例会向ActorSystem注册。
a.调度阶段的拆分:当一个RDD操作触发提交操作时,DAGScheduler从RDD链的末端开始,遍历整个RDD链,换分调度阶段,并决定各个调度阶段的依赖关系。调度阶段的划分以shuffleDependency为依据。
b.调度阶段的提交:在划分阶段可以得到多个调度阶段,其中直接出发作业的调度阶段称为finalStage。DAGScheduler从finalStage生成一个实例,这两者的调度关系进一步存储在映射表中,以便完成是做一些处理工作。具体过程:首先判断该调度阶段所依赖的父调度阶段的结果是否可用,如果父调度阶段结果都可用,则提交该阶段,否则,则尝试迭代提交当前不可用的父调度阶段。在迭代的过程中所有由于依赖的调度阶段结果不可用而不能提交的调度阶段会被放入等待队列,等待提交;当某一格调度阶段完成之后,DAGScheduler会重新扫描等待队列中的调度阶段,检查他们是否还有依赖的调度阶段没有完成。如果没有则尝试提交这些调度阶段。
c.任务集的提交:调度阶段的提交最终会转换为一个任务集的提交。DAGScheduler通过taskScheduler提交任务集,这个任务集会触发taskScheduler构建一个TaskSetManager来管理这个任务集的生命周期,负责任务集内部任务的调度。DAGScheduler阶段的提交到此结束。taskScheduler会在计算资源的时候,进一步会通过TaskSetManager调度具体的任务到Executor节点上进行运算。TaskSetManager会根据动态本地性调度策略分配任务;如果上一次成功提交任务的时间很长,则会降低本地性要求,否则提高本地性的要求。
d.状态的监控:主要是taskScheduler通过一些回调函数通知DAGScheduler具体的Executor的生命状态。在DAGScheduler中是在receive偏函数中进行处理的。
e.任务结果的获取:一个具体的任务在executor中执行完毕之后需要以某种形式返回给DAGScheduler。对于finalStage返回的是运算结果本身。但是其他stage,返回给DAGScheduler的是一些mapstate对象。mapstate维护着map的状态信息,包括map输出block的地址。下一调度阶段可以根据这些信息获取上一级调度阶段的结果。
网友评论