美文网首页spark
任务切分-源码分析

任务切分-源码分析

作者: 专职掏大粪 | 来源:发表于2021-09-22 09:43 被阅读0次

DAGScheduler.handleJobSubmitted

// Excerpt from handleJobSubmitted: create the ActiveJob for the final stage, then submit that stage.
// Create the job
 val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
// Submit the final stage (parents are submitted recursively by submitStage)
submitStage(finalStage)
/**
 * Submits `stage`, first recursively submitting any of its missing parent stages.
 *
 * If an active job is found for the stage and the stage is not already waiting, running,
 * or failed:
 *  - with no missing parents, its tasks are submitted via `submitMissingTasks`;
 *  - otherwise each missing parent (in ascending stage-id order) is submitted first, and
 *    this stage is added to `waitingStages`.
 * If no active job is found for the stage, the stage is aborted.
 *
 * @param stage the stage to submit
 */
private def submitStage(stage: Stage): Unit = {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug(s"submitStage($stage (name=${stage.name};" +
        s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
      // Only act on stages that are not already waiting, running, or failed,
      // so a stage that is already being handled is not submitted again.
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        // Collect the parent stages this stage still depends on, sorted by stage id.
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          // No missing parent stages: submit this stage's tasks now.
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          // Missing parents exist: recursively submit them first. The recursion bottoms out
          // at stages with no missing parents, which get their tasks submitted.
          for (parent <- missing) {
            submitStage(parent)
          }
          // Park this stage in waitingStages.
          // NOTE(review): presumably it is resubmitted elsewhere once its parents complete — not shown here.
          waitingStages += stage
        }
      }
    } else {
      // No active job is associated with this stage: abort it.
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

DAGScheduler.submitMissingTasks

// Excerpt from submitMissingTasks: build one task per partition that still has to be computed.
// NOTE(review): this excerpt is truncated — the closing of `stage match` / `try` (and its
// handler) and the body of the ResultStage branch are not shown.
val tasks: Seq[Task[_]] = try {
      // Serialize the stage's latest task metrics once; the same bytes are passed to every task created below.
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          // Reset the pending set; it is refilled below with each partition a task is created for.
          stage.pendingPartitions.clear()
          // Map over the partitions to compute: each partition id yields exactly one task.
          partitionsToCompute.map { id =>
            // NOTE(review): presumably the preferred locations for partition `id` — confirm against taskIdToLocations.
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            // Create the ShuffleMapTask for this partition.
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          // (ResultStage branch body omitted in this excerpt.)
// Figure out the indexes of partition ids to compute.
    // Partition ids still missing for this stage; one task is later created per id.
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

ShuffleMapStage.findMissingPartitions

 /**
  * Returns the ids of this shuffle map stage's partitions that still need to be computed.
  *
  * Asks `mapOutputTrackerMaster` for the missing partitions of `shuffleDep.shuffleId`.
  * If the tracker returns `None`, every partition `0 until numPartitions` is treated as missing.
  * NOTE(review): presumably `None` means the shuffle is not (yet) known to the tracker — confirm.
  *
  * @return partition ids to compute for this stage
  */
 override def findMissingPartitions(): Seq[Int] = {
   // Ask the tracker which partitions of this shuffle are still missing:
   //   1. if it answers, use that list;
   //   2. otherwise fall back to all of this stage's partitions (0 until numPartitions).
   mapOutputTrackerMaster
     .findMissingPartitions(shuffleDep.shuffleId)
     .getOrElse(0 until numPartitions)
 }

相关文章

网友评论

    本文标题:任务切分-源码分析

    本文链接:https://www.haomeiwen.com/subject/spvmgltx.html