1. 概要
从上文中我们可以看到JOB内部是分成多个Stage, 这些Stage之间以Action操作来划分.
stage
继而我们可以从WebUI上看到每个stage内部是划分成多个Task
实际上每个Task就是对Parition的各种操作, 这些Task是可以并行运行的, 这也是分布式计算的最重要的特点, 让那些能并发计算的任务在大量的机器上一起运行, 从而加速计算.
每个stage仅仅针对一个RDD进行操作, 但它可能依赖于多个上游的parent stage. 像rdd.union操作这种就是把两个RDD合并成一个RDD
stage的依赖前文中也提到了, 有两种stage, 中间计算结果的ShuffleMapStage, 和没有任何后续依赖的ResultStage
两种stage
在JOB内部, 把RDD的shuffle操作当成篱笆, 篱笆与篱笆之间的transform操作合并到一个stage里面去. 最终实现了spark的高效计算的框架
stage生成
每个Stage对上游的stage的依赖一定是源于某个shuffle操作, 不得不从其它节点上拉取数据, Executor自身维护的parition不足以进行计算.如果发现它们依赖的那些数据没有计算完, 就会向上游发信息要求重新进行计算.
INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 36 (ShuffledRDD[86] at reduceByKey at <console>:24)
这些缺失的信息需要计算的Task会发送给TaskScheduler, 由它进行具体的安排.
缺失信息计算
2. ShuffleMapStage
前文提到, 本质上这里就是经过一系列的Map 运算, 生成中间结果的shuffle文件, 等待后续的Reduce 过程拉取这些文件, 进行下一轮计算.
一个ShuffleMapStage内部可能包含多个本地 pipeline 计算, 比如像map
, filter
这些narrowdependency
过程是尽可能的在本地完成的, 并且是以Task的形式在所有的parition上并发跑完的.
如果做了缓存, 或者正好这些shuffle文件没有过期, 一个中间结果, 可以被多个JOB的stage使用. 如果恰好中间有一些文件过期被删除了, 就要从新计算.
/**
* ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
* They occur right before each shuffle operation, and might contain multiple pipelined operations
* before that (e.g. map and filter). When executed, they save map output files that can later be
* fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of,
* and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
*
* ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
* For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that
* there can be multiple ActiveJobs trying to compute the same shuffle map stage.
*/
2.1 初始化需要
-
id
- - identifier -
rdd
— 涉及到的RDD -
numTasks
— 并发数, 也就是task数, 和parition应该是1:1的 -
parents
— 上游依赖的多个stage -
firstJobId
— 归属到哪个JOB里, 第一个触发这个stage创建的job的id. 需要注意由于这个RDD在多个JOB里被用到, 这里仅仅用第一个触发的来记录它. -
shuffleDep
— 依赖的上游的shuffle的一些信息的句柄. 沿着句柄找到它依赖的上游的那些shuffle过程的数据
2.2 Registering MapStatus For Partition
addOutputLoc(partition: Int, status: MapStatus): Unit
对Parition执行的MAP过程, 也就是task过程进行状态跟踪
这个时候将numAvailableOutputs
喜加一
2.3 Removing MapStatus For Partition And BlockManager
removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit
把MapStatus从outPutLoc列表里删除, 同时也意味着这个存储块试下了, 所以也要通知管理存储的BlockManager.
这个时候将numAvailableOutputs
减一
2.4 ShuffleMapStage的共享
上文中我们也到一个中间状态的RDD可能出现在多个JOB里, 类似我们生成了一个PairRDD<Person, Salary>. 我们可能需要知道每个人的工资是多少reduceByKey
, 也可能想知道一共需要发多少工资reduce
, 还可能想知道有多少个人count
这些Action会触发各自的Job, 但他们都依赖同一个中间结果
2.5 Registering Job -- Deregistering Job
addActiveJob(job: ActiveJob): Unit
removeActiveJob(job: ActiveJob): Unit
像上文说的, 我们需要维护这个stage和那些Activejob有关系
2.6 Preparing Shuffle Map Outputs in MapOutputTracker Format
Preparing Shuffle Map Outputs in MapOutputTracker Format
TODO 内容优势疏漏, 待完善
当这个stage内部计算完毕的时候, 所有的MapStatus标记为Finished. 这个时候可以通知DAGScheduler进行下一步依赖这个中间Stage的操作了.
对应的Shuffle结果会发送给MapOutputTracker来管理, shuffle结果以每个parition的第一个元素的地址来标记. 理解为这里分布式的产生一大堆的结果在每个服务器上, 现在我告诉你每个结果的入口(链表头), 你就可以来找这些数据了.
/**
* Returns an array of [[MapStatus]] (index by partition id). For each partition, the returned
* value contains only one (i.e. the first) [[MapStatus]]. If there is no entry for the partition,
* that position is filled with null.
*/
def outputLocInMapOutputTrackerFormat(): Array[MapStatus] = {
outputLocs.map(_.headOption.orNull)
}
3. ResultStage
最终大结局,DAG的末尾.
由于DAG图是从后向前推算的, 所以ResultStage事实上是这个JOB内部创建出来的第一个Stage
计算过程和ShuffleMapStage一样
计算过程
`
网友评论