
Reading the Spark Source Code: the scheduler Module (Part 1)

Author: invincine | Published 2019-02-25 18:23

    This article is based on the Spark 1.6.3 source code.

    Overview

    Spark's scheduling module has a distinctive design. It uses a DAG (directed acyclic graph) to describe the logical relationships within a Spark job, splits the job into multiple stages, and within each stage splits the work into multiple tasks according to the degree of parallelism; all of these tasks share the same computation logic. The packaged tasks are then handed to executors, which run them and produce the results. The dependencies between stages, as well as those inside each stage, make up the lineage, which provides good fault tolerance.
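
    As a concrete illustration (a hypothetical word-count job, not taken from the article or the Spark source), the reduceByKey below introduces a ShuffleDependency, so the job is cut into a ShuffleMapStage for the map side and a final ResultStage:

    // Hypothetical example: the ShuffleDependency introduced by reduceByKey splits this job into two stages
    val numWords = sc.textFile("hdfs://.../input")   // Stage 0 (ShuffleMapStage): textFile -> flatMap -> map
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)                            // shuffle boundary between the two stages
      .count()                                       // Stage 1 (ResultStage): count is the action that triggers the job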

    Three classes play the leading roles in Spark's scheduling module: DAGScheduler, TaskScheduler, and SchedulerBackend.

    DAGScheduler: known as the high-level scheduling layer. It is mainly responsible for splitting a job into multiple stages at ShuffleDependency boundaries. Each stage contains a group of parallel tasks that run the same computation logic; the metadata of such a group of tasks is wrapped into a TaskSet, which is then handed to the TaskScheduler for scheduling and execution.

    TaskScheduler: known as the low-level task scheduler interface, whose main implementation is TaskSchedulerImpl. After receiving the TaskSets sent by the DAGScheduler, it submits them to the cluster, resubmits tasks when problems occur during execution, and finally returns result events to the DAGScheduler.

    SchedulerBackend: serves as the backend of the TaskScheduler. It is responsible for interacting with the cluster manager of each platform and requesting resources for the application. SchedulerBackend has several implementations: for example, if the application is submitted to YARN for resource management and scheduling, the corresponding implementation is YarnSchedulerBackend; in Deploy (standalone) mode, the implementation is SparkDeploySchedulerBackend.

    The source code analysis below is based on Deploy mode. Other modes differ slightly in their SchedulerBackend implementation, but the scheduling principles and the rest of the implementation are the same.

    Initialization of the three key classes and the relationships between them

    We can start from the initialization of SparkContext to see how these three key classes are instantiated. When an application is submitted, Spark first initializes the SparkContext instance and creates the driver. Here is the code in SparkContext that instantiates the three classes:

    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    

    The TaskScheduler and SchedulerBackend are obtained by pattern matching on the master that was passed in, and have different implementations for different platforms, whereas the DAGScheduler is created directly with new. The DAGScheduler instance holds a reference to the TaskScheduler, which can be seen from its constructor:

    def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
      this(
        sc,
        taskScheduler,
        sc.listenerBus,
        sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
        sc.env.blockManager.master,
        sc.env)
    }
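
    As for how the master URL determines which TaskScheduler and SchedulerBackend are used, the standalone (Deploy-mode) branch of SparkContext.createTaskScheduler looks roughly like the abridged sketch below (other branches such as local and YARN are omitted, and details may vary slightly between 1.6.x releases):

    // Abridged sketch of SparkContext.createTaskScheduler, Deploy/standalone branch only
    master match {
      case SPARK_REGEX(sparkUrl) =>   // a master URL of the form spark://host:port
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)   // the TaskScheduler keeps a reference to its backend
        (backend, scheduler)
      // case "local", LOCAL_N_REGEX(threads), "yarn-client", ... => other platforms, omitted here
    }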
    
    Submitting a Job

    From the source above we can see that the three instances _schedulerBackend, _taskScheduler and _dagScheduler are all created while SparkContext is being instantiated, before any job is submitted. Next, let's look at how a job gets submitted, through the code of the count action:

    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
    

    The runJob method ultimately calls the runJob method of dagScheduler:

    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        resultHandler: (Int, U) => Unit): Unit = {
      if (stopped.get()) {
        throw new IllegalStateException("SparkContext has been shutdown")
      }
      val callSite = getCallSite
      val cleanedFunc = clean(func)
      logInfo("Starting job: " + callSite.shortForm)
      if (conf.getBoolean("spark.logLineage", false)) {
        logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
      }
      dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
      progressBar.foreach(_.finishAll())
      rdd.doCheckpoint()
    }
    

    In the DAGScheduler's runJob method, a JobWaiter instance is created to monitor the execution of the job; the job is marked successful only when all of its tasks complete successfully:

    def runJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): Unit = {
      val start = System.nanoTime
      // Create a JobWaiter instance to monitor the job's execution; the job is marked successful only when all of its tasks complete successfully
      val waiter: JobWaiter[U] = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
      waiter.awaitResult() match {
        case JobSucceeded =>
          logInfo("Job %d finished: %s, took %f s".format
            (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        case JobFailed(exception: Exception) =>
          logInfo("Job %d failed: %s, took %f s".format
            (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
          // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
          val callerStackTrace = Thread.currentThread().getStackTrace.tail
          exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
          throw exception
      }
    }
    

    The submitJob method first creates the JobWaiter instance and then posts a JobSubmitted message via eventProcessLoop. This eventProcessLoop is used to listen for the DAGScheduler's own messages and is created when the DAGScheduler is instantiated:

    def submitJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): JobWaiter[U] = {
      // Check to make sure we are not launching a task on a partition that does not exist.
      val maxPartitions = rdd.partitions.length
      partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
        throw new IllegalArgumentException(
          "Attempting to access a non-existent partition: " + p + ". " +
            "Total number of partitions: " + maxPartitions)
      }
      val jobId = nextJobId.getAndIncrement()   // obtain the jobId
      if (partitions.size == 0) {
        // Return immediately if the job is running 0 tasks
        return new JobWaiter[U](this, jobId, 0, resultHandler)
      }
      assert(partitions.size > 0)
      val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
      // Create a JobWaiter instance to monitor the job's execution; the job is marked successful only when all of its tasks complete successfully
      val waiter: JobWaiter[U] = new JobWaiter(this, jobId, partitions.size, resultHandler)
      // DAGSchedulerEventProcessLoop's main responsibility is to call the corresponding DAGScheduler methods to handle the messages the DAGScheduler sends to it, supervising the job
      eventProcessLoop.post(JobSubmitted(
        jobId, rdd, func2, partitions.toArray, callSite, waiter,
        SerializationUtils.clone(properties)))    // the DAGScheduler posts the job to eventProcessLoop; the event loop eventually handles the request
      waiter
    }
    

    eventProcessLoop eventually calls its doOnReceive method to handle every event:

    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
        // if the event is a JobSubmitted event, call handleJobSubmitted to handle it
      case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
        dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    
      case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
        dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
    
      ...
    }
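
    DAGSchedulerEventProcessLoop follows a classic event-loop pattern: post only enqueues the event, while a dedicated daemon thread drains the queue and dispatches each event to onReceive, which in turn calls doOnReceive. The sketch below is a minimal illustration of that pattern, not the actual Spark EventLoop class:

    import java.util.concurrent.LinkedBlockingDeque

    // Minimal illustration of the event-loop pattern behind DAGSchedulerEventProcessLoop
    abstract class SimpleEventLoop[E](name: String) {
      private val eventQueue = new LinkedBlockingDeque[E]()
      @volatile private var stopped = false

      private val eventThread = new Thread(name) {
        setDaemon(true)
        override def run(): Unit = {
          try {
            while (!stopped) {
              val event = eventQueue.take()   // block until an event is posted
              onReceive(event)                // dispatch, e.g. to a doOnReceive-style handler
            }
          } catch {
            case _: InterruptedException =>   // stop() interrupts the blocking take()
          }
        }
      }

      def start(): Unit = eventThread.start()
      def stop(): Unit = { stopped = true; eventThread.interrupt() }

      // post() is what submitJob calls: it only enqueues; the event thread does the actual work
      def post(event: E): Unit = eventQueue.put(event)

      protected def onReceive(event: E): Unit
    }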
    

    At this point the job has been submitted. Next comes the handling of the submitted job, which is the DAGScheduler's most important function: dividing the job into stages.

    Dividing the job into stages

    Let's look at the DAGScheduler's handleJobSubmitted method to see how the stages are divided; we will go through it in several parts.

    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      // first call newResultStage to create the finalStage
      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    

    We can see that the DAGScheduler first creates the last stage, the finalStage. Let's look at the newResultStage method:

    private def newResultStage( // method that creates the last stage
        rdd: RDD[_],
        func: (TaskContext, Iterator[_]) => _,
        partitions: Array[Int],
        jobId: Int,
        callSite: CallSite): ResultStage = {
      // divide the stages by calling getParentStagesAndId, passing in the final RDD and the jobId
      val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
      val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
      stageIdToStage(id) = stage
      updateJobIdStageIdMaps(jobId, stage)
      stage
    }
    

    Creating the finalStage requires passing in its parentStages, which is also an important part of building the DAG schedule. Here is the implementation:

    private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
      val parentStages: List[Stage] = getParentStages(rdd, firstJobId)   // find the parent stages
      val id = nextStageId.getAndIncrement()    // nextStageId is an AtomicInteger, incremented by one
      (parentStages, id)    // return the list of parent stages and the corresponding stage id
    }
    

    It calls getParentStages, which implements a recursive traversal and returns a List of Stages:

    private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
      val parents = new HashSet[Stage]    // set of parent stages
      val visited = new HashSet[RDD[_]]   // RDDs that have already been visited
      // We are manually maintaining a stack here to prevent StackOverflowError
      // caused by recursively visiting
      val waitingForVisit = new Stack[RDD[_]]   // stack of RDDs waiting to be visited
      def visit(r: RDD[_]) {
        if (!visited(r)) {    // if this RDD has not been visited yet, mark it as visited
          visited += r
          // Kind of ugly: need to register RDDs with the cache here since
          // we can't do it in its constructor because # of partitions is unknown
          for (dep <- r.dependencies) {   // iterate over this RDD's dependencies
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>   // if it is a ShuffleDependency
                parents += getShuffleMapStage(shufDep, firstJobId)    // call getShuffleMapStage to create a stage and add it to parents
              case _ =>   // for a narrow dependency, push the dependency's RDD (the parent of the current RDD) onto the stack, effectively implementing the recursion
                waitingForVisit.push(dep.rdd)
            }
          }
        }
      }
      waitingForVisit.push(rdd) // push the final RDD onto the stack
      while (waitingForVisit.nonEmpty) {
        visit(waitingForVisit.pop())    // while the stack is not empty, pop an RDD and visit it
      }
      parents.toList
    }
    

    From the code above we can see that stages are divided at ShuffleDependency boundaries. The elegant part of this code is the hand-built stack of RDDs to visit, waitingForVisit: by pushing and popping RDDs and following the dependencies between them, it turns what is logically a recursion into an iteration, one of the nice touches in the Spark source (a standalone sketch of this pattern follows below).
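
    The same explicit-stack traversal can be written in isolation. The sketch below is purely an illustration (with a hypothetical Node type standing in for an RDD; it is not Spark code) of why replacing call-stack recursion with an explicit Stack avoids StackOverflowError on very long lineages:

    import scala.collection.mutable.{HashSet, Stack}

    // Hypothetical node type standing in for an RDD and its dependencies
    case class Node(id: Int, deps: Seq[Node])

    // Depth-first traversal with an explicit stack: logically the same as recursion,
    // but the depth of the lineage no longer grows the JVM call stack
    def collectReachable(root: Node): Set[Int] = {
      val visited = new HashSet[Int]
      val waitingForVisit = new Stack[Node]
      waitingForVisit.push(root)
      while (waitingForVisit.nonEmpty) {
        val n = waitingForVisit.pop()
        if (!visited(n.id)) {
          visited += n.id
          n.deps.foreach(waitingForVisit.push)   // push parents instead of recursing into them
        }
      }
      visited.toSet
    }

    Whenever a ShuffleDependency is encountered, getShuffleMapStage is called to create (or look up) the corresponding stage. Let's look at that method: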

    private def getShuffleMapStage(
        shuffleDep: ShuffleDependency[_, _, _],
        firstJobId: Int): ShuffleMapStage = {
      shuffleToMapStage.get(shuffleDep.shuffleId) match {
        case Some(stage) => stage   // if it exists, return it
        case None =>    // otherwise create it
          // We are going to register ancestor shuffle dependencies
          // call getAncestorShuffleDependencies on the RDD to register its ancestor shuffle dependencies, making sure this stage's parent stages have been created
          getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
            // for each shuffle dependency that has not been registered yet, create its stage with newOrUsedShuffleStage and register it in shuffleToMapStage
            shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
          }
          // Then register current shuffleDep
          val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
          shuffleToMapStage(shuffleDep.shuffleId) = stage
          stage
      }
    }
    

    The method above maintains a shuffleToMapStage map holding the mapping from shuffleId to ShuffleMapStage. For the given shuffleDep, the existing stage is returned if present and created otherwise. getAncestorShuffleDependencies finds the ancestor shuffle dependencies whose stages have not yet been registered in shuffleToMapStage, and its recursive traversal looks very much like getParentStages. newOrUsedShuffleStage is the method that actually creates a shuffle map stage; let's look at it:

    /**
      * Create a shuffle map stage from the RDD of the given shuffle dependency; the stage will be associated with the given jobId.
      * If this shuffle already exists in the MapOutputTracker, the output locations from the previous run are reused.
      */
    private def newOrUsedShuffleStage(
        shuffleDep: ShuffleDependency[_, _, _],
        firstJobId: Int): ShuffleMapStage = {
      val rdd = shuffleDep.rdd
      val numTasks = rdd.partitions.length    // the number of partitions of this RDD is the number of tasks
      val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)   // create the stage
      if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {   // if this shuffle is already registered in mapOutputTracker
        val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) // retrieve the previously recorded map output metadata
        val locs = MapOutputTracker.deserializeMapStatuses(serLocs)   // deserialize it and reuse the available output locations
        (0 until locs.length).foreach { i =>
          if (locs(i) ne null) {
            // locs(i) will be null if missing
            stage.addOutputLoc(i, locs(i))
          }
        }
      } else {    // otherwise register the shuffle directly
        // Kind of ugly: need to register RDDs with the cache and map output tracker here
        // since we can't do it in the RDD constructor because # of partitions is unknown
        logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
        mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
      }
      stage
    }
    

    In the code above, newShuffleMapStage is called first to create the ShuffleMapStage. Because a ShuffleMapStage involves a shuffle, intermediate data is written out, so the mapOutputTracker, which manages the map-side output, has to be updated and the shuffle registered. newShuffleMapStage itself works just like newResultStage: it first calls getParentStagesAndId to obtain the parent stages, and then creates the ShuffleMapStage instance:

    private def newShuffleMapStage(
        rdd: RDD[_],
        numTasks: Int,
        shuffleDep: ShuffleDependency[_, _, _],
        firstJobId: Int,
        callSite: CallSite): ShuffleMapStage = {
      val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)
      val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
        firstJobId, callSite, shuffleDep)
      stageIdToStage(id) = stage
      updateJobIdStageIdMaps(firstJobId, stage)
      stage
    }
    

    At the end of the method, updateJobIdStageIdMaps associates the newly created stage's stageId with the jobId.

    In the methods above, we first created the finalStage and then, following the dependencies between RDDs with a recursive traversal, found the finalStage's parent stages and recorded them in the relevant data structures.


    Next, let's look at how the stages created above are submitted.

    Back in handleJobSubmitted, here is the code that runs after the finalStage has been created:

    // once we have the finalStage we can create the job
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()    // clear the cache of task locations
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job   // record the jobId -> job mapping
    activeJobs += job   // add the job to activeJobs
    finalStage.setActiveJob(job)    // set the finalStage's active job to the current job
    val stageIds: Array[Int] = jobIdToStageIds(jobId).toArray   // get the stageIds for this jobId
    // get each stage's latestInfo from the stageIds
    val stageInfos: Array[StageInfo] = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)   // submit the finalStage
    submitWaitingStages()   // submit the stages in the waiting queue
    

    First an ActiveJob instance is created and the relevant data structures are updated; at the end, submitStage is called with the finalStage. Let's look at the implementation of submitStage:

    /** Submits stage, but first recursively submits any missing parents. */
    // submit this stage, but first recursively submit its missing parents
    private def submitStage(stage: Stage) {
      val jobId = activeJobForStage(stage)    // get the jobId for this stage
      if (jobId.isDefined) {    // if it is defined
        logDebug("submitStage(" + stage + ")")
        // if this stage is not in the waiting, running or failed sets
        if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
          val missing: List[Stage] = getMissingParentStages(stage).sortBy(_.id)    // find this stage's missing parent stages
          logDebug("missing: " + missing)
          if (missing.isEmpty) {    // if there are no missing parent stages, this stage's tasks can be submitted directly
            logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
            submitMissingTasks(stage, jobId.get)    // this method completes the DAGScheduler's remaining work
          } else {
            for (parent <- missing) {
              submitStage(parent)   // the recursion happens here: submit missing parents first
            }
            waitingStages += stage
          }
        }
      } else {
        abortStage(stage, "No active job for stage " + stage.id, None)
      }
    }
    

    In this method we see the elegance of recursion again. For the stage passed in (initially the finalStage), it first checks whether there are any unsubmitted parent stages; if there are, the parent stages are submitted first and the current stage is put into waitingStages, to be submitted later via submitWaitingStages. For every stage that is actually submitted, submitMissingTasks finishes the remaining work.

    Wrapping the Tasks

    With the methods above, the finalStage and its parent stages have all been submitted recursively. The submitMissingTasks method tells us what happens to each submitted stage. The method is fairly long: it first updates data structures such as runningStages and outputCommitCoordinator for the stage passed in; here we pick out the key part:

    // here we build the sequence of tasks
    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.internalAccumulators)
          }
        case stage: ResultStage =>
          val job = stage.activeJob.get
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, stage.internalAccumulators)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
        runningStages -= stage
        return
    }
    

    Here the stage is pattern matched: if it is a ResultStage, i.e. the finalStage, ResultTasks are created; if it is a ShuffleMapStage, ShuffleMapTasks are created. Now look at the code that follows:

    // if the task sequence is not empty, wrap it in a TaskSet and hand it off; from here on it is the taskScheduler's turn
    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingPartitions ++= tasks.map(_.partitionId)
      logDebug("New pending partitions: " + stage.pendingPartitions)
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)
      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage : ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
      }
      logDebug(debugString)
    }
    

    As we can see, the tasks created in the previous step are wrapped into a TaskSet and submitted to the cluster through the TaskScheduler's submitTasks method. At this point the DAGScheduler's work is essentially complete; all that remains is to listen, via eventProcessLoop, for the events the TaskScheduler sends back, which is also why the DAGScheduler instance holds a reference to the TaskScheduler.

    In the next article we will continue by analyzing what the TaskScheduler does when submitting tasks, and how the SchedulerBackend achieves fairness when allocating scheduling resources. Stay tuned!
