DAGScheduler.handleJobSubmitted
// Create the job object for this submission.
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
// Submit the final stage.
submitStage(finalStage)
/**
 * Submits `stage`, first recursively submitting any of its missing parent stages.
 *
 * If no active job is found for the stage, the stage is aborted. If the stage is already
 * in `waitingStages`, `runningStages` or `failedStages`, nothing further is done. Otherwise:
 *  - when the stage has no missing parents, its tasks are submitted right away;
 *  - when it does, every missing parent (in ascending id order) is submitted recursively
 *    and this stage is added to `waitingStages`.
 *
 * @param stage the stage to submit
 */
private def submitStage(stage: Stage): Unit = {
  activeJobForStage(stage) match {
    case None =>
      // Without an owning job there is nothing to run this stage for.
      abortStage(stage, "No active job for stage " + stage.id, None)

    case Some(activeJobId) =>
      logDebug(s"submitStage($stage (name=${stage.name};" +
        s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
      // A stage that is already waiting, running or failed must not be submitted again.
      val alreadyTracked =
        waitingStages(stage) || runningStages(stage) || failedStages(stage)
      if (!alreadyTracked) {
        // Determine which parent stages of this stage are still missing.
        val missingParents = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missingParents)
        if (missingParents.nonEmpty) {
          // Recurse into each missing parent, then park this stage in the waiting set.
          missingParents.foreach(parent => submitStage(parent))
          waitingStages += stage
        } else {
          // No missing parents: the stage's tasks can be submitted directly.
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, activeJobId)
        }
      }
  }
}
submitMissingTasks
// Excerpt from submitMissingTasks: builds the tasks for the stage, one per partition id.
// NOTE(review): this excerpt is truncated -- the `try` block and the `match` are not closed here.
val tasks: Seq[Task[_]] = try {
// Serialize the stage's task metrics once; the same bytes are passed to every task created below.
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
// Reset the pending set; it is refilled below with each partition id being computed.
stage.pendingPartitions.clear()
// Walk the partition ids to compute; each partition id corresponds to one task.
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
// Create the task for this partition.
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
case stage: ResultStage =>
// NOTE(review): the ResultStage branch body is not shown in this excerpt; the lines below
// look like a separate snippet (partitionsToCompute is already used above) -- confirm.
// Figure out the indexes of partition ids to compute.
// The partition ids that still have to be computed.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
ShuffleMapStage.findMissingPartitions
/**
 * Returns the partition ids of this stage that still need to be computed.
 *
 * Asks `mapOutputTrackerMaster` for the missing partitions recorded under
 * `shuffleDep.shuffleId`:
 *  1. if the lookup yields a value, that sequence of partition ids is returned;
 *  2. otherwise every partition index, `0 until numPartitions`, is returned.
 *
 * NOTE(review): `numPartitions` is presumably the partition count of the stage's
 * last RDD -- confirm against the enclosing class.
 *
 * @return the partition ids to compute
 */
override def findMissingPartitions(): Seq[Int] = {
  mapOutputTrackerMaster
    .findMissingPartitions(shuffleDep.shuffleId)
    .getOrElse(0 until numPartitions)
}
网友评论