通用流程
不论 Spark 以何种模式进行部署, 任务提交后, 都会先启动 Driver 进程,随后 Driver 进程向集群管理器注册应用程序Application,之后集群管理器根据此任务的配置文件分配 Executor 并启动。
当 Driver 所需的资源全部满足后,Driver 开始执行 main 函数,Spark 查询为懒执行, 当执行到 action 算子时开始反向推算,根据宽依赖进行 stage 的划分,随后每一个 stage 对应一个 taskset,taskset 中有多个 task,根据本地化原则,task 会被分发到指定的 Executor 去执行,在任务执行的过程中, Executor 也会不断与 Driver 进行通信,报告任务运行情况。
job->stage
--driver执行main函数,遇到action算子,SparkContext.runJob函数. 在SparkContext内部, 经过一系列函数的调用, 最终通过调用DAGScheduler.runJob函数把Job提交给DAGScheduler.
--DAGScheduler运行submitJob。先将job提交, 然后创建JobWaiter以阻塞的方式等待job执行结果.DAGSchedulerEventProcessLoop post了一个JobSubmitted事件
--DAGScheduler的handleJobSubmitted函数. 正是这个函数触发了RDD到DAG的转化。开始转换为stage的逻辑
--这里执行finnalstage=newResultStage,之后生成Activejob。
--submitStage(finalStage),把当前stage的parents提交, 完事儿后在提交自己
--submitMissingTasks,该stage中所有未执行过的partition, 然后把序列化后的task(taskBinary), partition(part)等信息封装成task.新生成的tasks封装成TaskSet提交给TaskScheduler.
--taskScheduler.submitTasks,调用
--TaskSchedulerImpl创建一个TaskSetManager管理TaskSet,并且加入到调度池子中。backend.reviveOffers()(这是提醒schedulerBackend可以执行任务了)
--schedulerBackend调用makeoffers方法,会把现有的executor资源以WorkerOfffer列表的方式传给Taskscheduler,触发对现有任务的一次分配。
--调用TaskScheduler的resourceOffers,进行资源匹配,满足条件(availableCpus(i) >= CPUS_PER_TASK),调用TaskSetManager创建TaskDescription对象,向ExecutorEndpoint发送LaunchTask指令。
--总体执行过程
执行顺序
--不管哪一种模式,都要首先开启Driver
--只有资源满足的了条件,才会触发main函数的计算
--只有遇到了action,触发job,才开始才会划分stage。
--在standalone模式下,client提交了程序目的是,开启Driver。成功之后,自动结束自己的clientendpoint进程。
--DAGScheduler 将Stage 打 包到 TaskSet 交给 TaskScheduler。TaskScheduler 会将 TaskSet 封装为TaskSetManager加入到调度队列中
--询问资源,和分发任务。是SchedulerBackend,使用ReviveOffer 给driver发信息。
1.本地化分发task
2.询问excutor的执行状况
--driver调用 makeOffers 方法,,然后将 Executor 封装成 WorkerOffer对象 ; taskScheduler 基 于 这 些 资 源 调用resourceOffer 在 Executor上分配 task。
broadcast变量
--driver上面维护着一个所有的Broadcast变量对应的数据所在的Executor列表。
--默认会被切分成若干4M大小的Block,Task运行过程中读取到该Broadcast变量,会以4M为单位的Block为拉取数据的最小单位,最后将所有的Block合并成Broadcast变量对应的完整数据或数据集。
--在Executor中运行一个Task时,如果本地没有对应的broadcast数据,则会向Driver请求获取Broadcast变量对应的数据,拉取到本地,并且更新driver端口的数据。
组件工作任务:
--Client 只负责提交 Application 并监控 Application 的状态。
--对于 Spark 的任务调度主要是集中在两个方面: 资源申请和任务分发,其主要是通过 ApplicationMaster、Driver 以及 Executor 之间来完成。
--SchedulerBackend 通过 ApplicationMaster 申请资源,并不断从 TaskScheduler中拿到合适的 Task分发到 Executor执行。
task任务级别的调度:
--FIFO
--FAIR,runningtask比minshare小,比例,runningtask与weight比值
TaskSetManager 按照一定的规则一个个取出 task 给 TaskScheduler
--本地化原则
map和reduce的个数
--map初始 RDD 分区个数由该文件的 split 个数决定
--reduce默认是spark.default.parallelism,没有设置map之后的个数一致
本地性:
--首先stage在创建时候,就初始化了有几个分区,下游有几个分区
--stage是一个一个提交的,所以下游的stage可以获得上游的stage的rdd的数据本地性
--在DAG的submitMissingTask时候就已经确定
private[spark] def getPreferredLocs(rdd:RDD[_],partition:Int):Seq[TaskLocation] ={
//如果已访问过RDD,即以获得RDD的TaskLocation则不需再次获得
//如果RDD缓存在内存中,我们访问RDD实例化时的信息便可知道RDD在那个节点上
//利用RDD在创建时重写的preferredLocations获得数据Location,从而确定Task的本地性
//如果RDD是窄依赖,则递归查找窄依赖链条上的第一个RDD的第一个Partition的数据
--在TaskScheduler进行任务匹配的时候,会判断够不够,和符不符合。符合就是难道本地性,看看在哪个地方运行更好。如果一直占用会,降低数据的本地性。
网友评论