
DAGScheduler Source Code Analysis

Author: 白面葫芦娃92 | Published 2019-07-21 23:20

    Essentially, every action operator submits a job through SparkContext's runJob, and that call is what triggers execution of the RDD DAG.

    This article therefore starts from the runJob method and follows it into DAGScheduler and TaskScheduler.
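
    As a quick illustration (a minimal sketch assuming a local[*] master; not one of the article's source excerpts), the driver program below shows the trigger point: nothing runs until the count() action calls sc.runJob.

      // Minimal sketch: map is a lazy transformation; only the count() action
      // submits a job through sc.runJob and, from there, the DAGScheduler.
      import org.apache.spark.{SparkConf, SparkContext}

      object CountExample {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setAppName("count-example").setMaster("local[*]")
          val sc = new SparkContext(conf)
          val rdd = sc.parallelize(1 to 100, numSlices = 4)
          println(rdd.map(_ * 2).count())   // action: triggers sc.runJob
          sc.stop()
        }
      }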

    First, look at the count() method in RDD.scala:

      /**
       * Return the number of elements in the RDD.
       */
      def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
    

    Stepping into sc.runJob:

      /**
       * Run a job on all partitions in an RDD and return the results in an array.
       *
       * @param rdd target RDD to run tasks on
       * @param func a function to run on each partition of the RDD
       * @return in-memory collection with a result of the job (each collection element will contain
       * a result from one partition)
       */
      def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
        runJob(rdd, func, 0 until rdd.partitions.length)
      }
    

    Stepping into the next overload:

      /**
       * Run a function on a given set of partitions in an RDD and return the results as an array.
       *
       * @param rdd target RDD to run tasks on
       * @param func a function to run on each partition of the RDD
       * @param partitions set of partitions to run on; some jobs may not want to compute on all
       * partitions of the target RDD, e.g. for operations like `first()`
       * @return in-memory collection with a result of the job (each collection element will contain
       * a result from one partition)
       */
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: Iterator[T] => U,
          partitions: Seq[Int]): Array[U] = {
        val cleanedFunc = clean(func)
        runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
      }
    

    Stepping into the next overload:

      /**
       * Run a function on a given set of partitions in an RDD and return the results as an array.
       * The function that is run against each partition additionally takes `TaskContext` argument.
       *
       * @param rdd target RDD to run tasks on
       * @param func a function to run on each partition of the RDD
       * @param partitions set of partitions to run on; some jobs may not want to compute on all
       * partitions of the target RDD, e.g. for operations like `first()`
       * @return in-memory collection with a result of the job (each collection element will contain
       * a result from one partition)
       */
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int]): Array[U] = {
        val results = new Array[U](partitions.size)
        runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
        results
      }
    

    Stepping into the next overload:

      /**
       * Run a function on a given set of partitions in an RDD and pass the results to the given
       * handler function. This is the main entry point for all actions in Spark.
       *
       * @param rdd target RDD to run tasks on
       * @param func a function to run on each partition of the RDD
       * @param partitions set of partitions to run on; some jobs may not want to compute on all
       * partitions of the target RDD, e.g. for operations like `first()`
       * @param resultHandler callback to pass each result to
       */
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          resultHandler: (Int, U) => Unit): Unit = {
        if (stopped.get()) {
          throw new IllegalStateException("SparkContext has been shutdown")
        }
        val callSite = getCallSite
        val cleanedFunc = clean(func)
        logInfo("Starting job: " + callSite.shortForm)
        if (conf.getBoolean("spark.logLineage", false)) {
          logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
        }
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
        progressBar.foreach(_.finishAll())
        rdd.doCheckpoint()
      }
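
    A small usage sketch of the resultHandler variant (assuming an existing SparkContext named sc; not from the article): instead of collecting an Array[U], the callback receives (partitionIndex, result) as each partition's result arrives, which is exactly how the Array-returning overload above fills its results array.

      import org.apache.spark.TaskContext

      val rdd = sc.parallelize(1 to 100, 4)
      sc.runJob(
        rdd,
        (_: TaskContext, it: Iterator[Int]) => it.sum,   // runs on each partition
        0 until rdd.partitions.length,                   // all partitions
        (partitionId: Int, partialSum: Int) =>           // called once per partition result
          println(s"partition $partitionId -> $partialSum")
      )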
    

    Stepping into dagScheduler.runJob:

     /**
       * Run an action job on the given RDD and pass all the results to the resultHandler function as
       * they arrive.
       *
       * @param rdd target RDD to run tasks on
       * @param func a function to run on each partition of the RDD
       * @param partitions set of partitions to run on; some jobs may not want to compute on all
       *   partitions of the target RDD, e.g. for operations like first()
       * @param callSite where in the user program this job was called
       * @param resultHandler callback to pass each result to
       * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
       *
       * @note Throws `Exception` when the job fails
       */
      def runJob[T, U](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          callSite: CallSite,
          resultHandler: (Int, U) => Unit,
          properties: Properties): Unit = {
        val start = System.nanoTime
        val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
        ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
        waiter.completionFuture.value.get match {
          case scala.util.Success(_) =>
            logInfo("Job %d finished: %s, took %f s".format
              (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
          case scala.util.Failure(exception) =>
            logInfo("Job %d failed: %s, took %f s".format
              (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
            // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
            val callerStackTrace = Thread.currentThread().getStackTrace.tail
            exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
            throw exception
        }
      }
    

    Stepping into submitJob:

      /**
       * Submit an action job to the scheduler.
       *
       * @param rdd target RDD to run tasks on
       * @param func a function to run on each partition of the RDD
       * @param partitions set of partitions to run on; some jobs may not want to compute on all
       *   partitions of the target RDD, e.g. for operations like first()
       * @param callSite where in the user program this job was called
       * @param resultHandler callback to pass each result to
       * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
       *
       * @return a JobWaiter object that can be used to block until the job finishes executing
       *         or can be used to cancel the job.
       *
       * @throws IllegalArgumentException when partitions ids are illegal
       */
      def submitJob[T, U](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          callSite: CallSite,
          resultHandler: (Int, U) => Unit,
          properties: Properties): JobWaiter[U] = {
        // Check to make sure we are not launching a task on a partition that does not exist.
        val maxPartitions = rdd.partitions.length
        partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
          throw new IllegalArgumentException(
            "Attempting to access a non-existent partition: " + p + ". " +
              "Total number of partitions: " + maxPartitions)
        }
    
        val jobId = nextJobId.getAndIncrement()
        if (partitions.size == 0) {
          // Return immediately if the job is running 0 tasks
          return new JobWaiter[U](this, jobId, 0, resultHandler)
        }
    
        assert(partitions.size > 0)
        val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
        val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
        eventProcessLoop.post(JobSubmitted(
          jobId, rdd, func2, partitions.toArray, callSite, waiter,
          SerializationUtils.clone(properties)))
        waiter
      }
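
    The partition-id check at the top of submitJob is easy to observe from user code; a quick sketch (assuming an existing SparkContext named sc):

      val rdd = sc.parallelize(1 to 10, 2)                        // only partitions 0 and 1
      sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0, 1))    // fine: both ids exist
      sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(5))       // IllegalArgumentException,
                                                                  // thrown before any event is posted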
    
    To recap the path so far: the call has hopped through several runJob overloads and finally reached DAGScheduler.runJob, whose core is to submit the job to the scheduler and get back a JobWaiter:
        val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    
        val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
        eventProcessLoop.post(JobSubmitted(
          jobId, rdd, func2, partitions.toArray, callSite, waiter,
          SerializationUtils.clone(properties)))
    
      private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
    

    eventProcessLoop receives the scheduler's events and processes them; the handling logic lives in its doOnReceive method, which for a JobSubmitted event calls handleJobSubmitted:

      private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
        case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
          dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
        // ... (other DAGSchedulerEvent cases omitted)
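
    The surrounding DAGSchedulerEventProcessLoop (built on Spark's internal EventLoop base class) follows a simple pattern: callers post events to a queue, and a single daemon thread drains the queue and dispatches each event, so all scheduler state is mutated from one thread. Below is a simplified sketch of that pattern, not Spark's actual implementation; SimpleEventLoop and SchedulerEvent are made-up names.

      import java.util.concurrent.LinkedBlockingDeque

      sealed trait SchedulerEvent                            // hypothetical event type
      case class JobSubmittedEvent(jobId: Int) extends SchedulerEvent

      class SimpleEventLoop(name: String) {
        private val eventQueue = new LinkedBlockingDeque[SchedulerEvent]()
        private val eventThread = new Thread(name) {
          setDaemon(true)
          override def run(): Unit =
            while (!Thread.currentThread().isInterrupted) {
              onReceive(eventQueue.take())                   // blocks until an event arrives
            }
        }
        def start(): Unit = eventThread.start()
        def post(event: SchedulerEvent): Unit = eventQueue.put(event)
        def stop(): Unit = eventThread.interrupt()

        // doOnReceive pattern-matches on the event type in the same way.
        private def onReceive(event: SchedulerEvent): Unit = event match {
          case JobSubmittedEvent(jobId) => println(s"handling JobSubmitted for job $jobId")
        }
      }

    In this sketch, post(JobSubmittedEvent(0)) returns immediately and the handler runs on the event thread; DAGScheduler does the equivalent with eventProcessLoop.post(JobSubmitted(...)).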
    

    Next, look at handleJobSubmitted. It does three main things: create the ResultStage, wrap it in an ActiveJob, and submit the final stage.

    1. Creating the ResultStage. Stage splitting starts from the last stage and is driven backward, while stages are submitted in forward order.
      private[scheduler] def handleJobSubmitted(jobId: Int,
          finalRDD: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          callSite: CallSite,
          listener: JobListener,
          properties: Properties) {
        var finalStage: ResultStage = null
        try {
          // New stage creation may throw an exception if, for example, jobs are run on a
          // HadoopRDD whose underlying HDFS files have been deleted.
          finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
        } catch {
          case e: Exception =>
            logWarning("Creating new stage failed due to exception - job: " + jobId, e)
            listener.jobFailed(e)
            return
        }
    
        val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
        clearCacheLocs()
        logInfo("Got job %s (%s) with %d output partitions".format(
          job.jobId, callSite.shortForm, partitions.length))
        logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
        logInfo("Parents of final stage: " + finalStage.parents)
        logInfo("Missing parents: " + getMissingParentStages(finalStage))
    
        val jobSubmissionTime = clock.getTimeMillis()
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.setActiveJob(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage)
      }
    
    Next, createResultStage, which creates the ResultStage associated with the given jobId:
      /**
       * Create a ResultStage associated with the provided jobId.
       */
      private def createResultStage(
          rdd: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          jobId: Int,
          callSite: CallSite): ResultStage = {
        val parents = getOrCreateParentStages(rdd, jobId)
        val id = nextStageId.getAndIncrement()
        val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
        stageIdToStage(id) = stage
        updateJobIdStageIdMaps(jobId, stage)
        stage
      }
    
    The key step in this method is the call to getOrCreateParentStages:
      /**
       * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
       * the provided firstJobId.
       */
      // Get or create the list of parent stages for the given RDD; new stages are created with the provided firstJobId
      private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        // getShuffleDependencies returns only the RDD's immediate (first-level) shuffle dependencies
        getShuffleDependencies(rdd).map { shuffleDep =>
          // getOrCreateShuffleMapStage gets or creates the ShuffleMapStage for this shuffle dependency, creating any missing ancestor stages as needed
          getOrCreateShuffleMapStage(shuffleDep, firstJobId)
        }.toList
      }
    
    Next is getShuffleDependencies, which walks the dependency graph with an explicit stack and collects the shuffle dependencies that are immediate parents of the given RDD:
      /**
       * Returns shuffle dependencies that are immediate parents of the given RDD.
       *
       * This function will not return more distant ancestors.  For example, if C has a shuffle
       * dependency on B which has a shuffle dependency on A:
       *
       * A <-- B <-- C
       *
       * calling this function with rdd C will only return the B <-- C dependency.
       *
       * This function is scheduler-visible for the purpose of unit testing.
       */
      private[scheduler] def getShuffleDependencies(
          rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
        val parents = new HashSet[ShuffleDependency[_, _, _]]
        val visited = new HashSet[RDD[_]]
        val waitingForVisit = new ArrayStack[RDD[_]]
        waitingForVisit.push(rdd)
        while (waitingForVisit.nonEmpty) {
          val toVisit = waitingForVisit.pop()
          if (!visited(toVisit)) {
            visited += toVisit
            toVisit.dependencies.foreach {
              case shuffleDep: ShuffleDependency[_, _, _] =>
                parents += shuffleDep
              case dependency =>
                waitingForVisit.push(dependency.rdd)
            }
          }
        }
        parents
      }
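
    A small worked example (a hypothetical job, assuming an existing SparkContext named sc) of where these dependencies come from: reduceByKey introduces a ShuffleDependency, so calling getShuffleDependencies on the final RDD below returns exactly that dependency, and the lineage splits into a ShuffleMapStage plus a ResultStage.

      val lines  = sc.parallelize(Seq("a b", "b c"), 2)
      val words  = lines.flatMap(_.split(" "))        // narrow dependency -> same stage
      val pairs  = words.map(word => (word, 1))       // narrow dependency -> same stage
      val counts = pairs.reduceByKey(_ + _)           // ShuffleDependency -> stage boundary
      counts.collect()                                // action: ResultStage whose final RDD is counts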
    

    Next, the getOrCreateShuffleMapStage method:

      /**
       * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
       * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
       * addition to any missing ancestor shuffle map stages.
       */
      private def getOrCreateShuffleMapStage(
          shuffleDep: ShuffleDependency[_, _, _],
          firstJobId: Int): ShuffleMapStage = {
        shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
          case Some(stage) =>
            stage
    
          case None =>
            // Create stages for all missing ancestor shuffle dependencies.
            getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
              // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
              // that were not already in shuffleIdToMapStage, it's possible that by the time we
              // get to a particular dependency in the foreach loop, it's been added to
              // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
              // SPARK-13902 for more information.
              if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
                createShuffleMapStage(dep, firstJobId)
              }
            }
            // Finally, create a stage for the given shuffle dependency.
            createShuffleMapStage(shuffleDep, firstJobId)
        }
      }
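
    The shuffleIdToMapStage lookup is what lets a shuffle map stage be shared across jobs; a small illustration (assuming an existing SparkContext named sc; not from the article):

      // Both actions below depend on the same ShuffleDependency, so the second job reuses
      // the ShuffleMapStage registered by the first (it shows up as a "skipped stage" in
      // the web UI) instead of creating a new one.
      val counts = sc.parallelize(Seq("a", "b", "a"), 2)
        .map(word => (word, 1))
        .reduceByKey(_ + _)
      counts.count()     // job 1: creates the ShuffleMapStage and a ResultStage
      counts.collect()   // job 2: same shuffleId -> the existing ShuffleMapStage is returned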
    
    Next, createShuffleMapStage:
      /**
       * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a
       * previously run stage generated the same shuffle data, this function will copy the output
       * locations that are still available from the previous shuffle to avoid unnecessarily
       * regenerating data.
       */
      def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
        val rdd = shuffleDep.rdd
        val numTasks = rdd.partitions.length
        val parents = getOrCreateParentStages(rdd, jobId)
        val id = nextStageId.getAndIncrement()
        val stage = new ShuffleMapStage(
          id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
    
        stageIdToStage(id) = stage
        shuffleIdToMapStage(shuffleDep.shuffleId) = stage
        updateJobIdStageIdMaps(jobId, stage)
    
        if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
          // Kind of ugly: need to register RDDs with the cache and map output tracker here
          // since we can't do it in the RDD constructor because # of partitions is unknown
          logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
          mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
        }
        stage
      }
    
    Now return to createResultStage and then back to handleJobSubmitted. Everything above is the stage-splitting half.

    What follows is stage submission. submitStage(finalStage) walks up the DAG: stages whose parents are not yet ready are parked in the waitingStages queue, and stages that are ready go to submitMissingTasks. submitMissingTasks is fairly long, so we only walk through its main steps below.

    stage.findMissingPartitions returns the partitions that still need to be computed; each stage type has its own implementation:
    ShuffleMapStage
    /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
      override def findMissingPartitions(): Seq[Int] = {
        mapOutputTrackerMaster
          .findMissingPartitions(shuffleDep.shuffleId)
          .getOrElse(0 until numPartitions)
      }
    

    ResultStage

      /**
       * Returns the sequence of partition ids that are missing (i.e. needs to be computed).
       *
       * This can only be called when there is an active job.
       */
      override def findMissingPartitions(): Seq[Int] = {
        val job = activeJob.get
        (0 until job.numPartitions).filter(id => !job.finished(id))
      }
    

    Compute the preferred location for each partition: taskIdToLocations.

    This records, for every task, the preferred location (i.e. which node should run it); the core helper is getPreferredLocsInternal, a recursive method. The stage's serialized data is then broadcast as taskBinary.
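
    A driver-side sketch of the locality information involved (hypothetical input path, assuming an existing SparkContext named sc): RDD.preferredLocations is the public method over roughly the same data that getPreferredLocsInternal consults when building taskIdToLocations.

      val rdd = sc.textFile("hdfs:///tmp/input")      // hypothetical path
      rdd.partitions.foreach { p =>
        println(s"partition ${p.index} prefers: ${rdd.preferredLocations(p).mkString(", ")}")
      }
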
    For every partition that needs computing, a ShuffleMapTask (for a ShuffleMapStage) or a ResultTask (for a ResultStage) is created; the tasks are wrapped into a TaskSet and handed to the TaskScheduler for submission.


    Finally, submitWaitingChildStages submits the child stages parked in the waitingStages queue, and those stages repeat the same process for their own children; every stage's tasks ultimately go through org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks.
