美文网首页Spark源码解读
Spark中Stage的提交源码解读

Spark中Stage的提交源码解读

作者: lehi | 来源:发表于2016-04-02 15:10 被阅读216次

    版权声明:本文为原创文章,未经允许不得转载。
    复习内容:
    Spark中Job如何划分为Stage http://www.jianshu.com/p/67b502841ffc

    1.Spark中Stage的提交

    1.在复习内容中,将Job划分为Stage这一过程的调用起始于方法handleJobSubmitted,同样Stage的提交也包含在该方法中,如下所示:
    <code>
    private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[],
    func: (TaskContext, Iterator[
    ]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
    //(1)根据jobId生成finalStage,详见文章-Spark中Job如何划分为Stage
    //(2)Job的提交,详见文章-Spark中Job的提交
    //(3)提交stages,但首先循环提交丢失的父Stage(s),即将丢失的stage加入到waitingStages中
    //(4)提交Taskset(tasks)
    //提交stage,但首先循环提交丢失的父Stage(s),即将丢失的stage加入到waitingStages中,详见2
    submitStage(finalStage)
    //check for 正在等待或失败的stages ,他们会重新提交
    submitWaitingStages()
    }
    </code>

    2.submitStage方法如下所示,根据finalStage循环调用submitStage方法进行Stages的提交,
    <code>
    //根据Stage找到jobId
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
    val missing = getMissingParentStages(stage).sortBy(_.id)
    logDebug("missing: " + missing)
    //如果没有丢失,那么就提交Stage
    if (missing.isEmpty) {
    logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
    submitMissingTasks(stage, jobId.get)详见3
    } else {
    for (parent <- missing) {
    submitStage(parent)
    }
    waitingStages += stage
    }
    }
    } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
    }
    </code>

    3.submitMissingTasks方法如下所示,该方法中包括Stage、TaskSet的提交,TaskSet(tasks)的提交请看文章-Spark中TaskSet(Tasks)的提交
    <code>
    private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    // Get our pending tasks and remember them in our pendingTasks entry
    stage.pendingPartitions.clear()
    //首先找到根据ShuffleMapStage和ResultStage两种类型来找到它们对应的分区的索引ids
    val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {
    stage match {
    case stage: ShuffleMapStage =>
    val allPartitions = 0 until stage.numPartitions
    val filteredPartitions = allPartitions.filter { id => stage.outputLocs(id).isEmpty }
    (allPartitions, filteredPartitions)
    case stage: ResultStage =>
    val job = stage.resultOfJob.get
    val allPartitions = 0 until job.numPartitions
    val filteredPartitions = allPartitions.filter { id => !job.finished(id) }
    (allPartitions, filteredPartitions)
    }
    }
    //创建一个内部计算器,如果stage没有accumulator被初始化,那么重置内部的accumulator
    if (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
    stage.resetInternalAccumulators()
    }
    //得到Job的属性
    val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
    runningStages += stage
    //SparkListenerStageSubmitted应用在测试task是否被serializable后 然后发送出去
    //如果task没有序列化,SparkListenerStageCompleted事件将不会发送出去,它总是在SparkListenerStageSubmitted事件之后
    outputCommitCoordinator.stageStart(stage.id)
    //得到RDD得到它的分区的位置
    val taskIdToLocations = try {
    stage match {
    case s: ShuffleMapStage =>
    partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
    case s: ResultStage =>
    val job = s.resultOfJob.get
    partitionsToCompute.map { id =>
    val p = s.partitions(id)
    (id, getPreferredLocs(stage.rdd, p))
    }.toMap
    }
    } catch {
    case NonFatal(e) =>
    stage.makeNewStageAttempt(partitionsToCompute.size)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
    abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
    runningStages -= stage
    return
    }
    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
    //给JobProgressListener发送SparkListenerStageSubmitted事件
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
    </code>
    4.SparkListenerStageSubmitted前面我们提到,Job的启动是通过JobProgressListener的onJobStart方法执行的,同样,Stage的提交是通过,对应的事件类型是SparkListenerStageSubmitted,详见下:
    <code>
    override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
    val stage = stageSubmitted.stageInfo
    activeStages(stage.stageId) = stage
    pendingStages.remove(stage.stageId)
    val poolName = Option(stageSubmitted.properties).map {
    p => p.getProperty("spark.scheduler.pool", SparkUI.DEFAULT_POOL_NAME)
    }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
    stageIdToInfo(stage.stageId) = stage
    val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
    stageData.schedulingPool = poolName
    stageData.description = Option(stageSubmitted.properties).flatMap {
    p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
    }
    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo])
    stages(stage.stageId) = stage
    for (
    activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId);
    jobId <- activeJobsDependentOnStage;
    jobData <- jobIdToData.get(jobId)
    ) {
    jobData.numActiveStages += 1
    // If a stage retries again, it should be removed from completedStageIndices set
    jobData.completedStageIndices.remove(stage.stageId)
    }
    }
    </code>
    这样,我们就完成了Stage的提交,下一篇看Task的提交。

    相关文章

      网友评论

        本文标题:Spark中Stage的提交源码解读

        本文链接:https://www.haomeiwen.com/subject/evrjlttx.html