
How Spark Divides Stages

Author: 嘻嘻是小猪 | Published 2021-04-23 18:08

This article is based on Spark 2.3.0.
As is well known, once the RDD dependency graph has been formed, we can divide it into stages along the wide (shuffle) dependencies.
Spark currently has two kinds of stages: org.apache.spark.scheduler.ResultStage and org.apache.spark.scheduler.ShuffleMapStage. Both are produced when org.apache.spark.scheduler.DAGScheduler divides the RDD graph.
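As a quick, hypothetical illustration (a minimal sketch that is not from the Spark source; the file path and app name are placeholders): in the word-count job below, reduceByKey introduces a ShuffleDependency, so the DAGScheduler will cut the graph at that point into one ShuffleMapStage and one final ResultStage.

import org.apache.spark.{SparkConf, SparkContext}

object StageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-demo").setMaster("local[*]"))
    val counts = sc.textFile("input.txt")   // narrow dependency
      .flatMap(_.split(" "))                // narrow dependency
      .map(word => (word, 1))               // narrow dependency
      .reduceByKey(_ + _)                   // wide dependency: shuffle boundary
    counts.collect()                        // action: triggers the job; the DAGScheduler builds
                                            // a ShuffleMapStage followed by a ResultStage
    sc.stop()
  }
}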

A few questions about DAGScheduler:

When is DAGScheduler created?

When SparkContext is initialized:

_dagScheduler = new DAGScheduler(this)
How is DAGScheduler triggered to do its work?

In general, every piece of work that DAGScheduler starts is driven by events, and that of course includes stage division.
Let's analyze this in detail.
The driving events related to DAGScheduler's work are all defined in DAGSchedulerEvent.scala; the one that kicks off stage division is org.apache.spark.scheduler.JobSubmitted.

(Figure: the various DAGScheduler events)
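For reference, JobSubmitted is roughly defined as follows (abridged from DAGSchedulerEvent.scala in 2.3.0; see the source for the exact definition):

private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent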
Here is a brief walkthrough of the whole event-triggering flow.
First, org.apache.spark.SparkContext#submitJob submits the job:
def submitJob[T, U, R](
      rdd: RDD[T],
      processPartition: Iterator[T] => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit,
      resultFunc: => R): SimpleFutureAction[R] =
  {
    assertNotStopped()
    val cleanF = clean(processPartition)
    val callSite = getCallSite
    val waiter = dagScheduler.submitJob( // the key call
      rdd,
      (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
      partitions,
      callSite,
      resultHandler,
      localProperties.get)
    new SimpleFutureAction(waiter, resultFunc)
  }

As you can see, it ultimately calls dagScheduler.submitJob().
Step into the org.apache.spark.scheduler.DAGScheduler#submitJob method:

 def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    ......
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted( // post the event
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

Clearly, eventProcessLoop does a post of the JobSubmitted event. The post operation itself is simple: it just puts the event into the blocking queue org.apache.spark.util.EventLoop#eventQueue.
So what is the eventProcessLoop in this code? It is simply an instance of EventLoop's subclass, org.apache.spark.scheduler.DAGSchedulerEventProcessLoop, so the queue mentioned above belongs to that DAGSchedulerEventProcessLoop object.
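For context, the queue and the post method in EventLoop look roughly like this (abridged; the class is generic in the event type E):

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  // the blocking queue that buffers posted events
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  // post simply enqueues the event; the event thread below consumes it
  def post(event: E): Unit = {
    eventQueue.put(event)
  }
  ......
}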
The org.apache.spark.util.EventLoop class also contains an important thread worth mentioning:

private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            ......
          }
        }
      } catch {
        ......
      }
    }
  }

Once this thread is started, it keeps taking events from eventQueue and calling onReceive(event).

At this point you can probably already guess how events trigger DAGScheduler's work.
DAGScheduler has a DAGSchedulerEventProcessLoop member variable and calls its start method during its own initialization:

private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
eventProcessLoop.start()

What does this start() method do?

def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

Exactly: it starts the eventThread. So DAGScheduler holds an eventProcessLoop that it both posts to and consumes from itself.
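The relationship is also visible in the class declaration, which roughly reads (abridged):

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
  ......
}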
Remember that eventThread#run calls onReceive(event)? Let's see how DAGSchedulerEventProcessLoop implements its parent class's onReceive(event):

override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event) // the key call
    } finally {
      timerContext.stop()
    }
  }

Simple enough. Let's follow doOnReceive(event) into org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#doOnReceive:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
......
}

Also straightforward. Next, follow dagScheduler.handleJobSubmitted into org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted:

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      // entry point for stage division
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) 
    }
    ......
}

Here we see createResultStage: this is where stage division begins.
To summarize so far: after a job is submitted, dagScheduler reacts to the JobSubmitted event and triggers the stage-division work.

How exactly does DAGScheduler divide stages?

The createResultStage method was already mentioned above; let's step into it:

private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    // 1. Build the parent stages of the ResultStage
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    // 2. The ResultStage itself is simply created with new
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

There is nothing special about the ResultStage itself: it is created directly with new. The interesting part is its parent stages, so let's look at getOrCreateParentStages(rdd, jobId):

 private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

The intent of this code is clear: get the rdd's ShuffleDependencies, i.e. its wide dependencies, and create a ShuffleMapStage for each of them. Let's keep tracing the call to getOrCreateShuffleMapStage(shuffleDep, firstJobId), sketched below, which eventually leads to DAGScheduler#createShuffleMapStage.
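Roughly, getOrCreateShuffleMapStage first checks whether a stage has already been registered for this shuffle, and otherwise creates stages for any missing ancestor shuffle dependencies before creating one for the given dependency (abridged from 2.3.0; see the source for details):

private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage // a stage already exists for this shuffle, reuse it
      case None =>
        // first create stages for any ancestor shuffle dependencies that are still missing
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // finally, create a stage for the given shuffle dependency
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

createShuffleMapStage is where the stage is actually built: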

def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd // the map-side (parent) RDD of this shuffle
    val numTasks = rdd.partitions.length
    val parents = getOrCreateParentStages(rdd, jobId) // keep tracing backwards for further parent ShuffleMapStages
    val id = nextStageId.getAndIncrement()
    val stage = new ShuffleMapStage( // the ShuffleMapStage is created right here
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)

    if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }

At this point, the ShuffleMapStage has been created.
To summarize, the overall process is fairly clear: starting from the final RDD and tracing backwards, the graph is cut at each wide dependency to produce ShuffleMapStages, and the very last stage is simply a ResultStage. Along the way, the parent-child relationships between stages are built up as well.
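As a concrete, hypothetical example (assuming sc is an existing SparkContext), the job below contains two shuffles, so the backwards traversal described above yields two ShuffleMapStages followed by one ResultStage:

// rddA --(shuffle 0)--> rddB --(shuffle 1)--> rddC, then an action on rddC
val rddA = sc.parallelize(1 to 100).map(i => (i % 10, i))
val rddB = rddA.reduceByKey(_ + _)                              // shuffle 0
val rddC = rddB.map { case (k, sum) => (sum % 3, k) }
  .groupByKey()                                                 // shuffle 1
rddC.collect()
// createResultStage(rddC)            => ResultStage (the final stage)
//   getOrCreateParentStages(rddC)    => ShuffleMapStage for shuffle 1
//     getOrCreateParentStages(...)   => ShuffleMapStage for shuffle 0 (created first)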

And that's a wrap!
