Spark Source Code: DAGScheduler


Author: 小狼星I | Published 2018-10-17 15:07

    An introduction to the DAGScheduler in the Spark source code

    In a Spark application, an RDD goes through a series of transformations until an action triggers SparkContext.runJob, which in turn calls DAGScheduler.runJob() and ultimately DAGScheduler.submitJob. Inside the DAGScheduler the job's DAG is cut into stages, and the resulting TaskSets are handed to the TaskScheduler, as the figure below shows:

    [Figure: job submission flow from an RDD action through SparkContext.runJob and the DAGScheduler's stage DAG to the TaskScheduler]
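
    To make the entry point concrete, here is a minimal driver program (a hypothetical example, not taken from the original post) in which a single collect() action triggers exactly one job along this code path:

      import org.apache.spark.{SparkConf, SparkContext}

      object DagSchedulerDemo {
        def main(args: Array[String]): Unit = {
          // A local SparkContext is enough to watch the DAGScheduler at work in the logs.
          val conf = new SparkConf().setAppName("dag-scheduler-demo").setMaster("local[2]")
          val sc = new SparkContext(conf)

          // Transformations only build up the lineage; nothing runs yet.
          val rdd = sc.parallelize(1 to 100, numSlices = 4).map(_ * 2)

          // The action calls SparkContext.runJob, which hands the job to DAGScheduler.runJob.
          println(rdd.collect().length) // 100

          sc.stop()
        }
      }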

    Running the job

    Let's start from the SparkContext.runJob call produced by an RDD action. SparkContext.runJob() ultimately invokes dagScheduler.runJob(), as the source below shows:

    /**
       * Return an array that contains all of the elements in this RDD.
       */
      def collect(): Array[T] = withScope {
        val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
        Array.concat(results: _*)
      }
    
      /**
       * Run a function on a given set of partitions in an RDD and pass the results to the given
       * handler function. This is the main entry point for all actions in Spark.
       */
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          resultHandler: (Int, U) => Unit): Unit = {
        if (stopped.get()) {
          throw new IllegalStateException("SparkContext has been shutdown")
        }
        val callSite = getCallSite
        val cleanedFunc = clean(func)
        logInfo("Starting job: " + callSite.shortForm)
        if (conf.getBoolean("spark.logLineage", false)) {
          logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
        }
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
        progressBar.foreach(_.finishAll())
        rdd.doCheckpoint()
      }
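
    Note that runJob's resultHandler is invoked once per partition with that partition's result; collect() simply concatenates those per-partition arrays. The simpler public runJob overload makes the per-partition shape easy to see (a small illustrative call, assuming the sc and rdd from the example above):

      // One result per partition: runJob applies the function to each partition and
      // collects the values, which is what the resultHandler callback receives internally.
      val perPartitionCounts: Array[Int] =
        sc.runJob(rdd, (iter: Iterator[Int]) => iter.size)
      // With 4 partitions of 25 elements each: Array(25, 25, 25, 25)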
    

    Next, look at DAGScheduler.runJob(). It calls submitJob(), which returns a JobWaiter used to wait for the outcome of the submitted job, and then handles that outcome accordingly:

      def runJob[T, U](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          callSite: CallSite,
          resultHandler: (Int, U) => Unit,
          properties: Properties): Unit = {
        val start = System.nanoTime
        val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
        waiter.awaitResult() match {
          case JobSucceeded =>
            logInfo("Job %d finished: %s, took %f s".format
              (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
          case JobFailed(exception: Exception) =>
            logInfo("Job %d failed: %s, took %f s".format
              (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
            // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
            val callerStackTrace = Thread.currentThread().getStackTrace.tail
            exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
            throw exception
        }
      }
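
    JobWaiter itself is a JobListener that counts task completions and lets the calling thread block until the job finishes or fails. A deliberately simplified sketch of that idea (not Spark's actual JobWaiter, just the pattern it implements):

      import java.util.concurrent.CountDownLatch
      import java.util.concurrent.atomic.AtomicReference

      // Simplified stand-in for JobWaiter: each finished task is fed to the caller's
      // resultHandler and counted; awaitResult() blocks until all tasks are done or
      // the job has failed.
      class SimpleJobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
        private val remaining = new CountDownLatch(totalTasks)
        private val failure = new AtomicReference[Exception](null)

        def taskSucceeded(index: Int, result: T): Unit = {
          resultHandler(index, result)
          remaining.countDown()
        }

        def jobFailed(e: Exception): Unit = {
          failure.set(e)
          while (remaining.getCount > 0) remaining.countDown() // unblock the waiting thread
        }

        def awaitResult(): Either[Exception, Unit] = {
          remaining.await()
          Option(failure.get()).toLeft(())
        }
      }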
    

    Submitting the job

    Stepping into submitJob (source below), it first allocates a jobId and then posts a JobSubmitted message through eventProcessLoop, so next we need to look at what this eventProcessLoop actually is:

      def submitJob[T, U](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          callSite: CallSite,
          resultHandler: (Int, U) => Unit,
          properties: Properties): JobWaiter[U] = {
        // Check to make sure we are not launching a task on a partition that does not exist.
        val maxPartitions = rdd.partitions.length
        partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
          throw new IllegalArgumentException(
            "Attempting to access a non-existent partition: " + p + ". " +
              "Total number of partitions: " + maxPartitions)
        }
    
        val jobId = nextJobId.getAndIncrement()
        if (partitions.size == 0) {
          // Return immediately if the job is running 0 tasks
          return new JobWaiter[U](this, jobId, 0, resultHandler)
        }
    
        assert(partitions.size > 0)
        val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
        val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
        eventProcessLoop.post(JobSubmitted(
          jobId, rdd, func2, partitions.toArray, callSite, waiter,
          SerializationUtils.clone(properties)))
        waiter
      }
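
    The partition-range check at the top of submitJob is observable from user code: asking for a partition index the RDD does not have fails before any task is launched (an illustrative call, again assuming the sc from the earlier example):

      // The RDD below has only 2 partitions (indexes 0 and 1), so asking for
      // partition 5 trips the check in submitJob and throws IllegalArgumentException.
      try {
        sc.runJob(sc.parallelize(1 to 10, 2), (it: Iterator[Int]) => it.size, Seq(5))
      } catch {
        case e: IllegalArgumentException => println(e.getMessage)
      }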
    

    Looking at the source, eventProcessLoop is a message loop: it is a DAGSchedulerEventProcessLoop, which extends EventLoop. EventLoop itself is an event processor that stores incoming events in a BlockingQueue and uses a daemon thread to call onReceive on each of them. onReceive is overridden in DAGSchedulerEventProcessLoop, where it delegates to doOnReceive for the actual event handling:

      private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
    
    private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
      extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
    
      private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer
    
      /**
       * The main event loop of the DAG scheduler.
       */
      override def onReceive(event: DAGSchedulerEvent): Unit = {
        val timerContext = timer.time()
        try {
          doOnReceive(event)
        } finally {
          timerContext.stop()
        }
      }
    
      private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
        case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
          dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    
        case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
          dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
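    
        // ... handling of the remaining DAGSchedulerEvent cases is elided here;
        // the EventLoop base class that drives this dispatch is shown next ...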
    
    private[spark] abstract class EventLoop[E](name: String) extends Logging {
    
      private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
    
      private val stopped = new AtomicBoolean(false)
    
      private val eventThread = new Thread(name) {
        setDaemon(true)
    
        override def run(): Unit = {
          try {
            while (!stopped.get) {
              val event = eventQueue.take()
              try {
                onReceive(event)
              } catch {
                case NonFatal(e) => {
                  try {
                    onError(e)
                  } catch {
                    case NonFatal(e) => logError("Unexpected error in " + name, e)
                  }
                }
              }
            }
          } catch {
            case ie: InterruptedException => // exit even if eventQueue is not empty
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
        }
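
    To see the post/onReceive handshake in isolation, here is a stripped-down event loop in the same spirit (a self-contained sketch, not Spark's EventLoop): post() enqueues an event, and a daemon thread dequeues it and dispatches it to onReceive, just as DAGSchedulerEventProcessLoop does for DAGSchedulerEvents:

      import java.util.concurrent.LinkedBlockingDeque

      // Minimal event loop: a blocking queue plus a daemon thread that drains it.
      abstract class MiniEventLoop[E](name: String) {
        private val queue = new LinkedBlockingDeque[E]()
        @volatile private var stopped = false

        private val thread = new Thread(name) {
          setDaemon(true)
          override def run(): Unit =
            try {
              while (!stopped) onReceive(queue.take())
            } catch {
              case _: InterruptedException => // stop() interrupts a blocked take(); just exit
            }
        }

        def start(): Unit = thread.start()
        def stop(): Unit = { stopped = true; thread.interrupt() }
        def post(event: E): Unit = queue.put(event)

        protected def onReceive(event: E): Unit
      }

      // Usage mirroring the DAGScheduler: post a JobSubmitted-style message to yourself
      // and let the loop's thread handle it asynchronously.
      object MiniEventLoopDemo extends App {
        sealed trait Event
        final case class JobSubmitted(jobId: Int) extends Event

        val loop = new MiniEventLoop[Event]("mini-dag-loop") {
          override protected def onReceive(event: Event): Unit = event match {
            case JobSubmitted(id) => println(s"handling job $id")
          }
        }
        loop.start()
        loop.post(JobSubmitted(0))
        Thread.sleep(100) // give the daemon thread a moment before shutting down
        loop.stop()
      }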
    

    OK, now that we know how the DAGScheduler handles its message events, let's get back on track. submitJob posts a JobSubmitted message to the DAGScheduler itself via eventProcessLoop, so the JobSubmitted case in doOnReceive picks it up and calls dagScheduler.handleJobSubmitted, as the source below shows:

      private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
        case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
          dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    

    Splitting the DAG into stages and building the stage dependency graph

    Now let's step into handleJobSubmitted, starting with the line finalStage = newResultStage(...). A note on terminology: the last stage of a DAG is the ResultStage, while every stage before it is a ShuffleMapStage. newResultStage(...) creates that ResultStage for the given jobId, as the source below shows:

      private[scheduler] def handleJobSubmitted(jobId: Int,
          finalRDD: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          callSite: CallSite,
          listener: JobListener,
          properties: Properties) {
        var finalStage: ResultStage = null
        try {
          // New stage creation may throw an exception if, for example, jobs are run on a
          // HadoopRDD whose underlying HDFS files have been deleted.
          finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
        } catch {
          case e: Exception =>
            logWarning("Creating new stage failed due to exception - job: " + jobId, e)
            listener.jobFailed(e)
            return
        }
    
        val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
        clearCacheLocs()
        logInfo("Got job %s (%s) with %d output partitions".format(
          job.jobId, callSite.shortForm, partitions.length))
        logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
        logInfo("Parents of final stage: " + finalStage.parents)
        logInfo("Missing parents: " + getMissingParentStages(finalStage))
    
        val jobSubmissionTime = clock.getTimeMillis()
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.setActiveJob(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage)
    
        submitWaitingStages()
      }
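
    To see what this means for a real job, consider a hedged word-count style example (illustrative names, reusing the sc from the earlier example): the reduceByKey introduces a ShuffleDependency, so the DAG is cut into one ShuffleMapStage (the flatMap/map side) and one final ResultStage (the side that serves collect()), and it is that ResultStage which newResultStage creates here:

      val counts = sc.parallelize(Seq("a b", "b c", "c a"), 2)
        .flatMap(_.split(" "))   // narrow dependency: stays in the same stage
        .map(word => (word, 1))  // narrow dependency: stays in the same stage
        .reduceByKey(_ + _)      // ShuffleDependency: stage boundary
        .collect()               // action: creates the job and its final ResultStage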
    
    

    So how is the ResultStage created? In newResultStage, getParentStagesAndId first computes all of the ResultStage's parent stages, and then a new ResultStage instance is constructed from them:

    /**
       * Create a ResultStage associated with the provided jobId.
       */
      private def newResultStage(
          rdd: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          jobId: Int,
          callSite: CallSite): ResultStage = {
        val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
        val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
        stageIdToStage(id) = stage
        updateJobIdStageIdMaps(jobId, stage)
        stage
      }
    
    

    Next we follow the code into getParentStages, which builds the list of parent stages for a given RDD. Let's dissect it. The method first creates three data structures: parents, the set that collects all parent stages; visited, the set of RDDs that have already been visited; and waitingForVisit, a stack of RDDs still waiting to be visited.
    The RDD passed in is pushed onto waitingForVisit first, and the method then loops, popping an RDD off the stack and calling visit on it each time. visit walks the RDD's dependencies backwards to build the dependency chain: whenever a dependency is a ShuffleDependency, getShuffleMapStage creates (or looks up) the corresponding ShuffleMapStage and adds it to parents; otherwise the dependency's RDD is pushed onto waitingForVisit, to be traversed in a later iteration. Finally the method returns the set of parent stages, parents, to newResultStage.
    newResultStage then constructs the ResultStage, storing this list of parent stages inside it.

     /**
       * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
       * the provided firstJobId.
       */
      private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        val parents = new HashSet[Stage]
        val visited = new HashSet[RDD[_]]
        // We are manually maintaining a stack here to prevent StackOverflowError
        // caused by recursively visiting
        val waitingForVisit = new Stack[RDD[_]]
        def visit(r: RDD[_]) {
          if (!visited(r)) {
            visited += r
            // Kind of ugly: need to register RDDs with the cache here since
            // we can't do it in its constructor because # of partitions is unknown
            for (dep <- r.dependencies) {
              dep match {
                case shufDep: ShuffleDependency[_, _, _] =>
                  parents += getShuffleMapStage(shufDep, firstJobId)
                case _ =>
                  waitingForVisit.push(dep.rdd)
              }
            }
          }
        }
        waitingForVisit.push(rdd)
        while (waitingForVisit.nonEmpty) {
          visit(waitingForVisit.pop())
        }
        parents.toList
      }
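
    The traversal is just an iterative depth-first walk over the dependency graph that stops at shuffle boundaries. The same idea on a toy graph, independent of Spark (all names here are made up for illustration):

      import scala.collection.mutable

      // Toy model of a lineage node: narrow parents stay in the same stage,
      // shuffle parents mark a stage boundary (NarrowDependency vs ShuffleDependency).
      final case class Node(name: String,
                            narrowParents: Seq[Node] = Nil,
                            shuffleParents: Seq[Node] = Nil)

      // Same shape as getParentStages: a manual stack instead of recursion, a visited
      // set to avoid revisiting nodes, and every shuffle edge contributes a parent stage.
      def parentStageRoots(last: Node): List[Node] = {
        val parents = mutable.HashSet.empty[Node]
        val visited = mutable.HashSet.empty[Node]
        val waitingForVisit = mutable.Stack(last)
        while (waitingForVisit.nonEmpty) {
          val node = waitingForVisit.pop()
          if (visited.add(node)) {
            parents ++= node.shuffleParents                          // stage boundary
            node.narrowParents.foreach(p => waitingForVisit.push(p)) // keep walking
          }
        }
        parents.toList
      }

      // c <- (shuffle) b <- (narrow) a : only c becomes the root of a parent stage of a
      val c = Node("c")
      val b = Node("b", shuffleParents = Seq(c))
      val a = Node("a", narrowParents = Seq(b))
      assert(parentStageRoots(a).map(_.name) == List("c"))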
    

    After all that digging, let's return to handleJobSubmitted. At this point we have the job's ResultStage and its parent stages; an ActiveJob is then created inside the DAGScheduler and some stage information is logged. getMissingParentStages() is also called here; we will cover it together with submitStage below. The relevant source is repeated here:

     val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
        clearCacheLocs()
        logInfo("Got job %s (%s) with %d output partitions".format(
          job.jobId, callSite.shortForm, partitions.length))
        logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
        logInfo("Parents of final stage: " + finalStage.parents)
        logInfo("Missing parents: " + getMissingParentStages(finalStage))
    
        val jobSubmissionTime = clock.getTimeMillis()
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.setActiveJob(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage)
    
        submitWaitingStages()
    

    Submitting stages

    Next we enter submitStage. It first calls getMissingParentStages() to find the parent stages that have not yet run. If there are any, submitStage recursively submits those parents and adds the current stage to waitingStages. getMissingParentStages() works much like getParentStages(), except that it also checks whether a stage's RDDs are already cached: cacheLocs keeps the location information (TaskLocation instances) for RDD partitions, so if a partition's locations are found in cacheLocs they are returned directly; if not, and the RDD's storage level is NONE, Nil is returned instead.

     /** Submits stage, but first recursively submits any missing parents. */
      private def submitStage(stage: Stage) {
        val jobId = activeJobForStage(stage)
        if (jobId.isDefined) {
          logDebug("submitStage(" + stage + ")")
          if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
            val missing = getMissingParentStages(stage).sortBy(_.id)
            logDebug("missing: " + missing)
            if (missing.isEmpty) {
              logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
              submitMissingTasks(stage, jobId.get)
            } else {
              for (parent <- missing) {
                submitStage(parent)
              }
              waitingStages += stage
            }
          }
        } else {
          abortStage(stage, "No active job for stage " + stage.id, None)
        }
      }
    
    
    private def getMissingParentStages(stage: Stage): List[Stage] = {
        val missing = new HashSet[Stage]
        val visited = new HashSet[RDD[_]]
        // We are manually maintaining a stack here to prevent StackOverflowError
        // caused by recursively visiting
        val waitingForVisit = new Stack[RDD[_]]
        def visit(rdd: RDD[_]) {
          if (!visited(rdd)) {
            visited += rdd
            val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
            if (rddHasUncachedPartitions) {
              for (dep <- rdd.dependencies) {
                dep match {
                  case shufDep: ShuffleDependency[_, _, _] =>
                    val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
                    if (!mapStage.isAvailable) {
                      missing += mapStage
                    }
                  case narrowDep: NarrowDependency[_] =>
                    waitingForVisit.push(narrowDep.rdd)
                }
              }
            }
          }
        }
        waitingForVisit.push(stage.rdd)
        while (waitingForVisit.nonEmpty) {
          visit(waitingForVisit.pop())
        }
        missing.toList
      }
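
    The net effect of submitStage is parent-first submission: a stage only reaches submitMissingTasks once all of its missing ancestors have been submitted, and until then it parks in waitingStages. A toy sketch of that control flow (all names are illustrative):

      import scala.collection.mutable

      // Minimal stand-in for the submitStage logic above: recurse into missing parents
      // first, run a stage only when it has none, otherwise park it in waitingStages.
      final case class ToyStage(id: Int, parents: Seq[ToyStage], done: Boolean = false)

      val runningStages = mutable.Set.empty[Int]
      val waitingStages = mutable.Set.empty[Int]

      def submit(stage: ToyStage): Unit = {
        if (!runningStages(stage.id) && !waitingStages(stage.id)) {
          val missing = stage.parents.filterNot(_.done).sortBy(_.id)
          if (missing.isEmpty) {
            println(s"submitting tasks for stage ${stage.id}") // submitMissingTasks analogue
            runningStages += stage.id
          } else {
            missing.foreach(submit)      // submit the parents first
            waitingStages += stage.id    // revisited once a parent stage finishes
          }
        }
      }

      // Stage 2 depends on stages 0 and 1: only 0 and 1 get tasks now, 2 waits.
      val stage0 = ToyStage(0, Nil)
      val stage1 = ToyStage(1, Nil)
      val stage2 = ToyStage(2, Seq(stage0, stage1))
      submit(stage2) // prints stage 0 and stage 1; stage 2 goes into waitingStages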
    

    Handing the TaskSet to the TaskScheduler

    Once getMissingParentStages() reports no missing parents, submitMissingTasks() is called, and this is where the tasks are actually submitted. Let's go through this method in detail; it does the following (the code follows the list):

    1. First, determine which partitions of the stage still need to be computed, and add the stage to the runningStages set;
    2. Compute the preferred (data-local) locations for each of those tasks;
    3. Create tasks of the type matching the stage (ShuffleMapTask for a ShuffleMapStage, ResultTask for a ResultStage);
    4. Wrap the tasks in a TaskSet and hand it to the taskScheduler; at this point the DAGScheduler's work is done.

      private def submitMissingTasks(stage: Stage, jobId: Int) {
        logDebug("submitMissingTasks(" + stage + ")")
        // Get our pending tasks and remember them in our pendingTasks entry
        stage.pendingPartitions.clear()
    
        // First figure out the indexes of partition ids to compute.
        // Determine which partitions of this stage still need to be computed
        val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
        
        // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
        // with this Stage
        val properties = jobIdToActiveJob(jobId).properties
        // Mark this stage as running
        runningStages += stage
        
        // Compute the preferred locations (data locality) for each task
        val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
          stage match {
            case s: ShuffleMapStage =>
              partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
            case s: ResultStage =>
              val job = s.activeJob.get
              partitionsToCompute.map { id =>
                val p = s.partitions(id)
                (id, getPreferredLocs(stage.rdd, p))
              }.toMap
          }
        } catch {
          case NonFatal(e) =>
            stage.makeNewStageAttempt(partitionsToCompute.size)
            listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
            abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
            runningStages -= stage
            return
        }
        
        // Create tasks of the appropriate type for this stage
        val tasks: Seq[Task[_]] = try {
          stage match {
            case stage: ShuffleMapStage =>
              partitionsToCompute.map { id =>
                val locs = taskIdToLocations(id)
                val part = stage.rdd.partitions(id)
                new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
                  taskBinary, part, locs, stage.internalAccumulators)
              }
    
            case stage: ResultStage =>
              val job = stage.activeJob.get
              partitionsToCompute.map { id =>
                val p: Int = stage.partitions(id)
                val part = stage.rdd.partitions(p)
                val locs = taskIdToLocations(id)
                new ResultTask(stage.id, stage.latestInfo.attemptId,
                  taskBinary, part, locs, id, stage.internalAccumulators)
              }
          }
        } catch {
          case NonFatal(e) =>
            abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
            runningStages -= stage
            return
        }
        
         if (tasks.size > 0) {
          logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
          stage.pendingPartitions ++= tasks.map(_.partitionId)
          logDebug("New pending partitions: " + stage.pendingPartitions)
          // Hand the TaskSet over to the TaskScheduler
          taskScheduler.submitTasks(new TaskSet(
            tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
          stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
        } else {
          // Because we posted SparkListenerStageSubmitted earlier, we should mark
          // the stage as completed here in case there are no tasks to run
          markStageAsFinished(stage, None)
    
          val debugString = stage match {
            case stage: ShuffleMapStage =>
              s"Stage ${stage} is actually done; " +
                s"(available: ${stage.isAvailable}," +
                s"available outputs: ${stage.numAvailableOutputs}," +
                s"partitions: ${stage.numPartitions})"
            case stage : ResultStage =>
              s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
          }
          logDebug(debugString)
        }
      }
    
    

    A separate note on task data locality: getPreferredLocs() is called to obtain the locality information, and stepping into getPreferredLocsInternal() we can see that after checking the cache locations it simply falls back to the RDD's own preferredLocations, so Spark is conveniently "lazy" here and reuses what the RDD already knows:

     private def getPreferredLocsInternal(
          rdd: RDD[_],
          partition: Int,
          visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
        // If the partition has already been visited, no need to re-visit.
        // This avoids exponential path exploration.  SPARK-695
        if (!visited.add((rdd, partition))) {
          // Nil has already been returned for previously visited partitions.
          return Nil
        }
        // If the partition is cached, return the cache locations
        val cached = getCacheLocs(rdd)(partition)
        if (cached.nonEmpty) {
          return cached
        }
        // If the RDD has some placement preferences (as is the case for input RDDs), get those
        val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
        if (rddPrefs.nonEmpty) {
          return rddPrefs.map(TaskLocation(_))
        }
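
    Because getPreferredLocsInternal ultimately falls back to the RDD's own preferredLocations, any custom RDD can influence where its tasks run simply by overriding getPreferredLocations. A hedged sketch (the partition pinning scheme and host names are invented for illustration):

      import org.apache.spark.{Partition, SparkContext, TaskContext}
      import org.apache.spark.rdd.RDD

      // Illustrative partition type that carries the host we would prefer to run on.
      final case class PinnedPartition(index: Int, host: String) extends Partition

      // A toy RDD whose only job is to expose placement preferences; its
      // getPreferredLocations is what getPreferredLocsInternal ends up consulting.
      class PinnedRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[Int](sc, Nil) {

        override protected def getPartitions: Array[Partition] =
          hosts.zipWithIndex.map { case (h, i) => PinnedPartition(i, h) }.toArray

        override def compute(split: Partition, context: TaskContext): Iterator[Int] =
          Iterator(split.index)

        // Report the pinned host as this partition's preferred location.
        override protected def getPreferredLocations(split: Partition): Seq[String] =
          Seq(split.asInstanceOf[PinnedPartition].host)
      }

    In local mode such hints are effectively ignored, but on a cluster the scheduler will try to place each task on (or close to) the host its partition reports.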
    

    That concludes our walk through the DAGScheduler: how it splits the DAG into stages, how it creates the tasks, and how the resulting TaskSet is finally handed over to the TaskScheduler.
