This article is based on Spark 2.3.0.
As is well known, once the dependency relationships between RDDs are in place, the job can be divided into stages along the wide (shuffle) dependencies.
Spark currently has two kinds of stage: org.apache.spark.scheduler.ResultStage
and org.apache.spark.scheduler.ShuffleMapStage,
both produced by org.apache.spark.scheduler.DAGScheduler
as it divides the RDD graph.
A few questions about the DAGScheduler:
When is the DAGScheduler created?
During SparkContext initialization:
_dagScheduler = new DAGScheduler(this)
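In other words, user code never constructs a DAGScheduler itself; it comes into existence as soon as the SparkContext does. A minimal sketch (local mode; the app name and master are illustrative):

// Creating the SparkContext is all it takes; the line above
// (_dagScheduler = new DAGScheduler(this)) runs during its initialization.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("stage-division-demo").setMaster("local[2]")
val sc = new SparkContext(conf)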
How is the DAGScheduler triggered to do its work?
In short, every piece of work the DAGScheduler performs, stage division included, is driven by events. Let's look at this in detail.
The events that drive the DAGScheduler are all defined in DAGSchedulerEvent.scala;
the one that kicks off stage division is org.apache.spark.scheduler.JobSubmitted.
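Before diving into the source, here is the trigger end of the chain from a user's point of view, as a sketch assuming the SparkContext sc from above: running any RDD action asks the SparkContext to run a job, and the DAGScheduler wraps that request into a JobSubmitted event.

// The collect() action is what starts everything traced below: the job request
// reaches the DAGScheduler, which posts a JobSubmitted event onto its internal
// event loop, and handling that event is what divides the job into stages.
val doubled = sc.parallelize(1 to 100, numSlices = 4)
  .map(_ * 2)
  .collect()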

Let's briefly walk through how this event gets triggered.
First, org.apache.spark.SparkContext#submitJob submits the job:
def submitJob[T, U, R](
    rdd: RDD[T],
    processPartition: Iterator[T] => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit,
    resultFunc: => R): SimpleFutureAction[R] =
{
  assertNotStopped()
  val cleanF = clean(processPartition)
  val callSite = getCallSite
  val waiter = dagScheduler.submitJob(  // the key call
    rdd,
    (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
    partitions,
    callSite,
    resultHandler,
    localProperties.get)
  new SimpleFutureAction(waiter, resultFunc)
}
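A side note on this entry point: SparkContext#submitJob is the non-blocking variant used by the asynchronous actions in AsyncRDDActions, while ordinary blocking actions go through SparkContext#runJob, which ends up in the same dagScheduler.submitJob. A small sketch, again assuming the SparkContext sc:

// countAsync() goes through SparkContext#submitJob and hands back a
// FutureAction; the driver thread is free while the DAG is scheduled and run.
import scala.concurrent.Await
import scala.concurrent.duration.Duration

val future = sc.parallelize(1 to 1000, numSlices = 4).countAsync()
val n = Await.result(future, Duration.Inf)   // FutureAction is a scala.concurrent.Future
println(s"count = $n")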
As you can see, this ultimately calls dagScheduler.submitJob().
Stepping into the org.apache.spark.scheduler.DAGScheduler#submitJob method:
def submitJob[T, U](
    rdd: RDD[_],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  ......
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(  // post the event
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
Clearly, eventProcessLoop performs a post of the JobSubmitted event. The post itself is trivial: it simply puts the event into the blocking queue org.apache.spark.util.EventLoop#eventQueue.
So what is this eventProcessLoop? It is org.apache.spark.scheduler.DAGSchedulerEventProcessLoop, a subclass of EventLoop, so the queue mentioned above belongs to that DAGSchedulerEventProcessLoop instance.
There is an important thread inside org.apache.spark.util.EventLoop that is worth pointing out:
private val eventThread = new Thread(name) {
  setDaemon(true)
  override def run(): Unit = {
    try {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          ......
        }
      }
    } catch {
      ......
    }
  }
}
Once this thread is started, it keeps taking events from eventQueue and calling onReceive(event) on each one.
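To make the mechanism concrete, here is a minimal, self-contained sketch of the same pattern: events are put into a blocking queue and a daemon thread drains the queue, handing each event to onReceive. This is not Spark's code; MiniEventLoop and all names in it are illustrative, just the shape of EventLoop reduced to its essentials.

// A stripped-down event loop: post() enqueues, a daemon thread consumes.
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

abstract class MiniEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()   // blocks until something is posted
          onReceive(event)
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the blocking take()
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped.set(true); eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)   // producer side: just enqueue

  protected def onReceive(event: E): Unit
}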
At this point you can probably already guess how an event ends up triggering the DAGScheduler's work.
The DAGScheduler holds a DAGSchedulerEventProcessLoop as a member variable and calls its start method during its own initialization:
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
eventProcessLoop.start()
What does this start() method do?
def start(): Unit = {
  if (stopped.get) {
    throw new IllegalStateException(name + " has already been stopped")
  }
  // Call onStart before starting the event thread to make sure it happens before onReceive
  onStart()
  eventThread.start()
}
Right: it starts the eventThread. So the DAGScheduler holds the eventProcessLoop, posts events to it, and consumes them itself.
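This post-to-yourself, consume-yourself arrangement can be mimicked with the MiniEventLoop sketch above: the component owns its loop, posts events to it, and handles them on the loop's daemon thread. All names below (MiniScheduler, SchedulerEvent, JobArrived) are illustrative only.

sealed trait SchedulerEvent
case class JobArrived(jobId: Int) extends SchedulerEvent

class MiniScheduler {
  private val loop = new MiniEventLoop[SchedulerEvent]("mini-scheduler-loop") {
    override protected def onReceive(event: SchedulerEvent): Unit = event match {
      case JobArrived(id) => println(s"handling job $id")   // stage division would start here
    }
  }
  loop.start()   // start consuming as soon as the scheduler is constructed

  def submit(jobId: Int): Unit = loop.post(JobArrived(jobId))   // producer side: just post
}

val scheduler = new MiniScheduler
scheduler.submit(1)
scheduler.submit(2)
Thread.sleep(100)   // give the daemon thread a moment before the process exits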
Remember that eventThread#run calls onReceive(event)? Let's see how DAGSchedulerEventProcessLoop implements its parent class's onReceive(event):
override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    doOnReceive(event)  // the key call
  } finally {
    timerContext.stop()
  }
}
Simple enough. Following doOnReceive(event) into org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#doOnReceive:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  ......
}
Again straightforward. Following dagScheduler.handleJobSubmitted into org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted:
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // this is the entry point of stage division
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  }
  ......
}
createResultStage is where stage division begins.
To sum up this part: after a job is submitted, the DAGScheduler reacts to the JobSubmitted event and triggers the stage division work.
How exactly does the DAGScheduler divide the stages?
We already ran into the createResultStage method above; stepping into it:
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // 1. build the parent stages of this ResultStage
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // 2. the ResultStage itself is simply new'ed up
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
The ResultStage itself is nothing special; it is simply instantiated. What matters are its parent stages, so let's focus on getOrCreateParentStages(rdd, jobId):
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
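getShuffleDependencies (not quoted here) walks the lineage backwards from the given rdd and collects only the nearest shuffle dependencies: it keeps crossing narrow dependencies but stops at each shuffle edge, because everything behind a shuffle belongs to a parent stage. A toy sketch of that traversal idea; the Node/Dep types and nearestShuffleDeps are illustrative, not Spark's:

// Toy model: each node's dependencies are either narrow or shuffle. Starting
// from `root`, collect the closest shuffle edges and do not walk past them --
// everything behind a shuffle edge belongs to a parent ShuffleMapStage, which
// is built recursively from that edge's rdd.
import scala.collection.mutable

sealed trait Dep { def parent: Node }
case class NarrowDep(parent: Node) extends Dep
case class ShuffleDep(parent: Node) extends Dep
case class Node(name: String, deps: Seq[Dep])

def nearestShuffleDeps(root: Node): Set[ShuffleDep] = {
  val found   = mutable.Set.empty[ShuffleDep]
  val visited = mutable.Set.empty[Node]
  val stack   = mutable.Stack(root)
  while (stack.nonEmpty) {
    val node = stack.pop()
    if (visited.add(node)) {
      node.deps.foreach {
        case s: ShuffleDep => found += s        // stage boundary: record it and stop
        case NarrowDep(p)  => stack.push(p)     // narrow edge: stay inside the current stage
      }
    }
  }
  found.toSet
}

For a lineage where c depends on b through a narrow dependency and b depends on a through a shuffle, nearestShuffleDeps(c) walks through the narrow edge into b, records the shuffle edge pointing at a, and stops, which is exactly where the ShuffleMapStage boundary falls in the real code.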
So getOrCreateParentStages collects the rdd's ShuffleDependencies, i.e. its wide dependencies, and then builds a ShuffleMapStage for each of them. Following the call to getOrCreateShuffleMapStage(shuffleDep, firstJobId) eventually brings us to DAGScheduler#createShuffleMapStage:
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd  // take out the parent rdd, i.e. the map side of the shuffle
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)  // recurse further back to build earlier ShuffleMapStages
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(  // the construction of this ShuffleMapStage stops here
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)
  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
With that, the ShuffleMapStage is created.
To summarize: the overall process is quite clear. Starting from the very last rdd, the DAGScheduler traces the lineage backwards and creates a ShuffleMapStage for every wide dependency it meets; the last stage is the ResultStage itself. Along the way, the parent-child relationships between the stages are built up as well.
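As a concrete wrap-up, here is what this means for an everyday job (a sketch assuming the local SparkContext sc from earlier): the single reduceByKey below introduces the only wide dependency, so the DAG splits into exactly two stages.

// Two stages for this job: reduceByKey introduces a ShuffleDependency, so
// parallelize -> flatMap -> map becomes a ShuffleMapStage, and the
// post-shuffle part plus collect() becomes the final ResultStage.
// The Stages tab of the web UI shows exactly this split.
val counts = sc.parallelize(Seq("a b a", "b c"), numSlices = 2)
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)   // wide dependency -> stage boundary
  .collect()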
And that's a wrap!