版权声明:本文为原创文章,未经允许不得转载。
复习内容:
Spark中Job如何划分为Stage http://www.jianshu.com/p/67b502841ffc
1.Spark中Stage的提交
1.在复习内容中,将Job划分为Stage这一过程的调用起始于方法handleJobSubmitted,同样Stage的提交也包含在该方法中,如下所示:
<code>
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[],
func: (TaskContext, Iterator[]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
//(1)根据jobId生成finalStage,详见文章-Spark中Job如何划分为Stage
//(2)Job的提交,详见文章-Spark中Job的提交
//(3)提交stages,但首先循环提交丢失的父Stage(s),即将丢失的stage加入到waitingStages中
//(4)提交Taskset(tasks)
//提交stage,但首先循环提交丢失的父Stage(s),即将丢失的stage加入到waitingStages中,详见2
submitStage(finalStage)
//check for 正在等待或失败的stages ,他们会重新提交
submitWaitingStages()
}
</code>
2.submitStage方法如下所示,根据finalStage循环调用submitStage方法进行Stages的提交,
<code>
//根据Stage找到jobId
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
//如果没有丢失,那么就提交Stage
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)详见3
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
</code>
3.submitMissingTasks方法如下所示,该方法中包括Stage、TaskSet的提交,TaskSet(tasks)的提交请看文章-Spark中TaskSet(Tasks)的提交
<code>
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
stage.pendingPartitions.clear()
//首先找到根据ShuffleMapStage和ResultStage两种类型来找到它们对应的分区的索引ids
val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {
stage match {
case stage: ShuffleMapStage =>
val allPartitions = 0 until stage.numPartitions
val filteredPartitions = allPartitions.filter { id => stage.outputLocs(id).isEmpty }
(allPartitions, filteredPartitions)
case stage: ResultStage =>
val job = stage.resultOfJob.get
val allPartitions = 0 until job.numPartitions
val filteredPartitions = allPartitions.filter { id => !job.finished(id) }
(allPartitions, filteredPartitions)
}
}
//创建一个内部计算器,如果stage没有accumulator被初始化,那么重置内部的accumulator
if (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
stage.resetInternalAccumulators()
}
//得到Job的属性
val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
runningStages += stage
//SparkListenerStageSubmitted应用在测试task是否被serializable后 然后发送出去
//如果task没有序列化,SparkListenerStageCompleted事件将不会发送出去,它总是在SparkListenerStageSubmitted事件之后
outputCommitCoordinator.stageStart(stage.id)
//得到RDD得到它的分区的位置
val taskIdToLocations = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
val job = s.resultOfJob.get
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
runningStages -= stage
return
}
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
//给JobProgressListener发送SparkListenerStageSubmitted事件
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
</code>
4.SparkListenerStageSubmitted前面我们提到,Job的启动是通过JobProgressListener的onJobStart方法执行的,同样,Stage的提交是通过,对应的事件类型是SparkListenerStageSubmitted,详见下:
<code>
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
val stage = stageSubmitted.stageInfo
activeStages(stage.stageId) = stage
pendingStages.remove(stage.stageId)
val poolName = Option(stageSubmitted.properties).map {
p => p.getProperty("spark.scheduler.pool", SparkUI.DEFAULT_POOL_NAME)
}.getOrElse(SparkUI.DEFAULT_POOL_NAME)
stageIdToInfo(stage.stageId) = stage
val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
stageData.schedulingPool = poolName
stageData.description = Option(stageSubmitted.properties).flatMap {
p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
}
val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo])
stages(stage.stageId) = stage
for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId);
jobId <- activeJobsDependentOnStage;
jobData <- jobIdToData.get(jobId)
) {
jobData.numActiveStages += 1
// If a stage retries again, it should be removed from completedStageIndices set
jobData.completedStageIndices.remove(stage.stageId)
}
}
</code>
这样,我们就完成了Stage的提交,下一篇看Task的提交。
网友评论