DAGScheduler Source Code Analysis

Author: Sunnywade | Published 2018-01-21 11:29

    Introduction

    In the example from the RDD source code walk-through, we saw that the sum of squares of an array's elements can be computed with just map and reduce, and that RDD's reduce method ultimately calls dagScheduler.runJob to execute the job. How the DAGScheduler divides that job into parallel computation tasks is the focus of this chapter.
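    As a refresher, that example boils down to something like the following (a minimal sketch, assuming a live SparkContext named sc; the input values are illustrative):

      val nums = sc.parallelize(Array(1, 2, 3, 4))
      // reduce is an action: it calls SparkContext.runJob, which in turn calls
      // dagScheduler.runJob to execute the job.
      val sumOfSquares = nums.map(x => x * x).reduce(_ + _)  // 30

    Before looking at how the DAGScheduler divides this work up, a few concepts: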

    A few concepts

    • Job: a unit of work submitted to the DAG Scheduler. Every time an action (such as collect, count, or reduce) is executed on an RDD, a Job is submitted through the DAGScheduler's submitJob method.

    • Stage: a Job consists of multiple Stages, and the boundary between Stages is a shuffle: an upstream Stage in the DAG must write out its results before the downstream Stage can fetch them and start computing. Spark has two kinds of Stages: ShuffleMapStage, which produces output for a shuffle, and ResultStage, the final Stage in the DAG, which produces the job's result. If several Jobs share an RDD, a Stage can likewise be shared across those Jobs (see the sketch after this list).

    • Task: an independent unit of computation. A Stage contains multiple Tasks; within a Stage, each Task runs the same function on a different partition of the same RDD.
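
    To make these concrete, consider a word count (a sketch; the SparkContext sc and the path input.txt are illustrative). reduceByKey introduces a ShuffleDependency, so the collect() action submits one Job consisting of a ShuffleMapStage (textFile → flatMap → map) followed by a ResultStage (reduceByKey → collect), with one Task per partition in each Stage:

      val counts = sc.textFile("input.txt")  // narrow dependencies from here ...
        .flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)                  // ... until this shuffle: a Stage boundary
      counts.collect()                       // action: submits the Job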

    Let's start with runJob

    After an action is executed on an RDD, DAGScheduler.runJob is eventually called to start the job. Its code is as follows:

      def runJob[T, U](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          callSite: CallSite,
          resultHandler: (Int, U) => Unit,
          properties: Properties): Unit = {
        val start = System.nanoTime
        val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
        ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
        waiter.completionFuture.value.get match {
          case scala.util.Success(_) =>
            logInfo("Job %d finished: %s, took %f s".format
              (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
          case scala.util.Failure(exception) =>
            logInfo("Job %d failed: %s, took %f s".format
              (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
            // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
            val callerStackTrace = Thread.currentThread().getStackTrace.tail
            exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
            throw exception
        }
      }
    

    This method takes the RDD to compute, the computation function func, the partitions of the RDD to compute on, the result-handling function resultHandler, and configuration properties. It calls submitJob, which returns a waiter object used to wait asynchronously for the result, so the real work happens inside submitJob.
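
    Incidentally, user code can drive this same entry point through SparkContext.runJob, which forwards to dagScheduler.runJob (a sketch, assuming a live SparkContext sc):

      val rdd = sc.parallelize(1 to 8, 4)
      val perPartitionSums = new Array[Int](rdd.partitions.length)
      sc.runJob(
        rdd,
        (iter: Iterator[Int]) => iter.sum,                        // func, run once per partition
        (index: Int, res: Int) => perPartitionSums(index) = res)  // resultHandler
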
    The code of submitJob is as follows:

      def submitJob[T, U](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          callSite: CallSite,
          resultHandler: (Int, U) => Unit,
          properties: Properties): JobWaiter[U] = {
        // Check to make sure we are not launching a task on a partition that does not exist.
        val maxPartitions = rdd.partitions.length
        partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
          throw new IllegalArgumentException(
            "Attempting to access a non-existent partition: " + p + ". " +
              "Total number of partitions: " + maxPartitions)
        }
    
        val jobId = nextJobId.getAndIncrement()
        if (partitions.size == 0) {
          // Return immediately if the job is running 0 tasks
          return new JobWaiter[U](this, jobId, 0, resultHandler)
        }
    
        assert(partitions.size > 0)
        val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
        val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
        eventProcessLoop.post(JobSubmitted(
          jobId, rdd, func2, partitions.toArray, callSite, waiter,
          SerializationUtils.clone(properties)))
        waiter
      }
    

    This code mainly validates that the partition indices are in range, generates the job ID, creates the waiter object, and posts a JobSubmitted event to the event-processing loop eventProcessLoop; so the important part is how that loop handles the JobSubmitted event.

    eventProcessLoop is a DAGSchedulerEventProcessLoop object. DAGSchedulerEventProcessLoop receives events and processes them asynchronously, in FIFO order, on a daemon thread. The main handling logic is in doOnReceive, which processes JobSubmitted by calling the DAGScheduler's handleJobSubmitted method.
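
    For intuition, here is a minimal sketch of the event-loop pattern this class builds on (a simplification; the real base class is org.apache.spark.util.EventLoop, which also handles errors and graceful shutdown):

      import java.util.concurrent.LinkedBlockingDeque

      abstract class SimpleEventLoop[E](name: String) {
        private val eventQueue = new LinkedBlockingDeque[E]()
        private val eventThread = new Thread(name) {
          setDaemon(true)                  // daemon thread: does not block JVM exit
          override def run(): Unit = {
            while (true) {
              onReceive(eventQueue.take()) // blocks until an event arrives; FIFO order
            }
          }
        }
        def start(): Unit = eventThread.start()
        def post(event: E): Unit = eventQueue.put(event)  // e.g. post(JobSubmitted(...))
        protected def onReceive(event: E): Unit           // dispatch point, like doOnReceive
      }

    With that dispatch model in mind, attention shifts to handleJobSubmitted: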

      private[scheduler] def handleJobSubmitted(jobId: Int,
          finalRDD: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          callSite: CallSite,
          listener: JobListener,
          properties: Properties) {
        var finalStage: ResultStage = null
        try {
          finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
        } catch {
          case e: Exception =>
            listener.jobFailed(e)
            return
        }
    
        val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    
        val jobSubmissionTime = clock.getTimeMillis()
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.setActiveJob(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage)
      }
    

    As mentioned earlier, a Job contains one ResultStage plus the ShuffleMapStages it depends on. handleJobSubmitted first creates the ResultStage (called finalStage here), builds an ActiveJob, and posts a SparkListenerJobStart event on the message bus, which delivers it to all registered listeners (see the article on Spark's message-bus implementation); finally it calls submitStage to submit the Stage. The core of this method is createResultStage and submitStage.
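
    As an aside, the SparkListenerJobStart event posted here can be observed from user code through the public listener API (a small sketch, assuming a live SparkContext sc):

      import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

      // Print a line whenever a job starts, including how many stages it has.
      sc.addSparkListener(new SparkListener {
        override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
          println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")
        }
      })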

    Creating the ResultStage

    Before the ResultStage itself can be created, the parent Stages it depends on must be created first:

      private def createResultStage(
          rdd: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          jobId: Int,
          callSite: CallSite): ResultStage = {
        val parents = getOrCreateParentStages(rdd, jobId)
        val id = nextStageId.getAndIncrement()
        val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
        stageIdToStage(id) = stage
        updateJobIdStageIdMaps(jobId, stage)
        stage
      }
    

    The code that creates the parent Stages is below. It walks the current RDD's shuffle dependencies and creates a ShuffleMapStage for each of them.

      private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        getShuffleDependencies(rdd).map { shuffleDep =>
          getOrCreateShuffleMapStage(shuffleDep, firstJobId)
        }.toList
      }
    
    • The code that traverses the current RDD's shuffle dependencies is below. It finds only the nearest ShuffleDependencies. For example, given
      A <-- B <-- C
      D <-- E <- C
      where <-- denotes a ShuffleDependency and <- a NarrowDependency, the method returns only the B <-- C and D <-- E dependencies; that is, it stops at the first shuffle boundary on each path (see the sketch after this list).
      private[scheduler] def getShuffleDependencies(
          rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
        val parents = new HashSet[ShuffleDependency[_, _, _]]
        val visited = new HashSet[RDD[_]]
        val waitingForVisit = new ArrayStack[RDD[_]]
        waitingForVisit.push(rdd)
        while (waitingForVisit.nonEmpty) {
          val toVisit = waitingForVisit.pop()
          if (!visited(toVisit)) {
            visited += toVisit
            toVisit.dependencies.foreach {
              case shuffleDep: ShuffleDependency[_, _, _] =>
                parents += shuffleDep
              case dependency =>
                waitingForVisit.push(dependency.rdd)
            }
          }
        }
        parents
      }
    
    • The method that creates a ShuffleMapStage for each shuffle dependency is below. It first looks in shuffleIdToMapStage for an already-created Stage and returns it if found; otherwise it first creates ShuffleMapStages for the dependency's missing ancestor shuffle dependencies, and then one for the dependency itself.
      private def getOrCreateShuffleMapStage(
          shuffleDep: ShuffleDependency[_, _, _],
          firstJobId: Int): ShuffleMapStage = {
        shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
          case Some(stage) =>
            stage

          case None =>
            // Create stages for all missing ancestor shuffle dependencies first.
            getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
              if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
                createShuffleMapStage(dep, firstJobId)
              }
            }
            // Finally, create a stage for the given shuffle dependency itself.
            createShuffleMapStage(shuffleDep, firstJobId)
        }
      }
    
    • getMissingAncestorShuffleDependencies finds the ancestor shuffle dependencies for which no ShuffleMapStage has been registered yet. It traverses the RDD graph iteratively with an explicit stack (a depth-first walk, which avoids the StackOverflowError that deep recursion could cause) and collects every ancestor ShuffleDependency that is not yet in shuffleIdToMapStage:
     private def getMissingAncestorShuffleDependencies(
         rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
       val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
       val visited = new HashSet[RDD[_]]
       // We are manually maintaining a stack here to prevent StackOverflowError
       // caused by recursively visiting
       val waitingForVisit = new ArrayStack[RDD[_]]
       waitingForVisit.push(rdd)
       while (waitingForVisit.nonEmpty) {
         val toVisit = waitingForVisit.pop()
         if (!visited(toVisit)) {
           visited += toVisit
           getShuffleDependencies(toVisit).foreach { shuffleDep =>
             if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
               ancestors.push(shuffleDep)
               waitingForVisit.push(shuffleDep.rdd)
             } // Otherwise, the dependency and its ancestors have already been registered.
           }
         }
       }
       ancestors
     }
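
    To tie these methods together, here is a sketch of a chain with two shuffle boundaries, in the spirit of the A <-- B <-- C example above (reduceByKey after a partitioner-dropping map forces a second shuffle; the variable names are illustrative only):

      val a = sc.parallelize(1 to 100).map(i => (i % 10, i))
      val b = a.reduceByKey(_ + _)                 // A <-- B : first ShuffleDependency
      val c = b.map { case (k, v) => (v % 3, k) }  // narrow; map drops the partitioner
        .reduceByKey(_ + _)                        // B <-- C : second ShuffleDependency
      c.collect()

    When the ResultStage for c.collect() is created, getShuffleDependencies on the final RDD returns only the B <-- C dependency; the more distant A <-- B dependency is then found by getMissingAncestorShuffleDependencies, and its ShuffleMapStage is created first.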
    

    That was a lot of plumbing between functions; the short version is that before the ResultStage can be created, the ShuffleMapStages it depends on must be created first. The figure below summarizes:


    (Figure: the call graph for stage creation)

    Submitting Stages

    Once the Stages have been created from the dependencies between RDDs, they need to be submitted to the TaskScheduler for execution.
    The code below first fetches the Stage's missing parent Stages (here "missing" means the parent's output is not cached and its isAvailable is false). If there are none, it submits the current Stage directly; otherwise it recursively submits the not-yet-run parent Stages in dependency order and adds the current Stage to the waitingStages queue.

      private def submitStage(stage: Stage) {
        val jobId = activeJobForStage(stage)
        if (jobId.isDefined) {
          if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
            val missing = getMissingParentStages(stage).sortBy(_.id)
            if (missing.isEmpty) {
              submitMissingTasks(stage, jobId.get)
            } else {
              for (parent <- missing) {
                submitStage(parent)
              }
              waitingStages += stage
            }
          }
        } else {
          abortStage(stage, "No active job for stage " + stage.id, None)
        }
      }
    

    The main logic for finding the missing parent Stages is in getMissingParentStages. This method defines a visit function and walks the RDDs the Stage depends on using an explicit stack (depth-first). If a visited RDD is already fully cached, there is no need to compute its parent Stages; otherwise the traversal follows the RDD's dependencies on to its parent RDDs.

    private def getMissingParentStages(stage: Stage): List[Stage] = {
        val missing = new HashSet[Stage]
        val visited = new HashSet[RDD[_]]
        val waitingForVisit = new ArrayStack[RDD[_]]
        def visit(rdd: RDD[_]) {
          if (!visited(rdd)) {
            visited += rdd
            val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
            if (rddHasUncachedPartitions) {
              for (dep <- rdd.dependencies) {
                dep match {
                  case shufDep: ShuffleDependency[_, _, _] =>
                    val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                    if (!mapStage.isAvailable) {
                      missing += mapStage
                    }
                  case narrowDep: NarrowDependency[_] =>
                    waitingForVisit.push(narrowDep.rdd)
                }
              }
            }
          }
        }
        waitingForVisit.push(stage.rdd)
        while (waitingForVisit.nonEmpty) {
          visit(waitingForVisit.pop())
        }
        missing.toList
      }
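
    A quick way to see the caching check in action (a sketch; it assumes the cache ends up holding every partition after the first action):

      val pairs = sc.parallelize(1 to 100).map(i => (i % 10, i)).reduceByKey(_ + _).cache()
      pairs.count()    // runs the ShuffleMapStage + ResultStage and populates the cache
      pairs.collect()  // now getCacheLocs(pairs) is non-empty for every partition, so
                       // getMissingParentStages finds no missing parents and the second
                       // job runs only a ResultStage over the cached partitions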
    

    Submitting the missing tasks (submitMissingTasks) involves the following steps:

    • Determine which partitions of the Stage still need to be computed (partitionsToCompute)
    • Compute the preferred locations for each of those partitions:
        val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
          stage match {
            case s: ShuffleMapStage =>
              partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
            case s: ResultStage =>
              partitionsToCompute.map { id =>
                val p = s.partitions(id)
                (id, getPreferredLocs(stage.rdd, p))
              }.toMap
          }
        } // catch block elided: on failure the stage is aborted
    
    • Serialize the Stage to a binary blob (its RDD plus either its shuffle dependency or its compute function) and broadcast it to the executors:
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }
      taskBinary = sc.broadcast(taskBinaryBytes)
    
    • Create the Tasks: one Task for each partition that needs computing. A ShuffleMapStage produces ShuffleMapTasks, while a ResultStage produces ResultTasks:
    val tasks: Seq[Task[_]] = try {
          val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
          stage match {
            case stage: ShuffleMapStage =>
              stage.pendingPartitions.clear()
              partitionsToCompute.map { id =>
                val locs = taskIdToLocations(id)
                val part = stage.rdd.partitions(id)
                stage.pendingPartitions += id
                new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
                  taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
                  Option(sc.applicationId), sc.applicationAttemptId)
              }
    
            case stage: ResultStage =>
              partitionsToCompute.map { id =>
                val p: Int = stage.partitions(id)
                val part = stage.rdd.partitions(p)
                val locs = taskIdToLocations(id)
                new ResultTask(stage.id, stage.latestInfo.attemptId,
                  taskBinary, part, locs, id, properties, serializedTaskMetrics,
                  Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
              }
          }
        } // catch block elided: on failure the stage is aborted and the method returns
    

    • Submit the Tasks through the TaskScheduler:

    taskScheduler.submitTasks(new TaskSet(
            tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    

    • Submit the waiting child Stages:

       submitWaitingChildStages(stage)
    

    The details of these steps, including how a partition's preferred locations are computed and how the TaskScheduler submits Tasks, will be explored in depth in later chapters.

    Summary

    This article covered the key DAGScheduler concepts of Job, Stage, and Task, and how the DAGScheduler divides a job into Stages based on the dependencies between RDDs.
    A Job contains one or more Stages, a Stage can be shared between one or more Jobs, and the boundaries between Stages are drawn at RDD ShuffleDependencies. Each Stage contains one or more Tasks, and each Task corresponds to one not-yet-computed partition of the Stage's RDD.
