美文网首页
DAGScheduler 源码浅析

DAGScheduler 源码浅析

作者: 越过山丘xyz | 来源:发表于2019-02-05 15:31 被阅读0次

DAGScheduler

DAGScheduler 的主要工作包括:创建 Job、划分 Stage、最后将 Stage 封装成 TaskSet 提交提交给 TaskScheduler 去处理这三个阶段。

image

DAGScheduler 在 Spark-Core 的 org.apache.spark.scheduler 包下。

实例化

DAGScheduler 是在 SparkContext 中进行的实例化,在 SparkContext 概览中提到过:

_dagScheduler = new DAGScheduler(this)

我们再了解下 DAGScheduler 中的一个比较重要的成员变量及其启动,后面会用到:

// 事件处理队列
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

// 这条语句在 DAGScheduler 的最后面
// 后文会用到
eventProcessLoop.start()

Stage 划分

在 Spark 中有 ShuffleMapStage 和 ResultStage 两种 Stage,ShuffleMapStage 为中间阶段的 Stage,ResultStage 结果应用的 Stage。

类关系

在 Spark 程序中,都是遇到 Action 操作时才会提交任务。所以,我们需要在 RDD 中找个 Action 算子作为切入点, 我这里以 RDD.collect() 方法为例:

// RDD.collect()
def collect(): Array[T] = withScope {
  // 调用 SparkContext.runJob() 方法
  // 注意:这里将 Final RDD 作为参数传递给了 SparkContext
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

SparkContext.runJob() 的实现细节:

// 通过层层调用,最终会调用这个 runJob() 方法
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {

  // 其它操作

  // 这里调用了 DAGScheduler.runJob() 方法
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

  // 其它操作
}

从任意 Action 操作入手都可以找到 DAGScheduler 的入口,接下来让我们看看 DAGScheduler.runJob() 方法的实现细节:

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {

  // 记录启动时间
  val start = System.nanoTime
  // 调用了自身的 submitJob() 方法
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

  // 其它操作
}

DAGScheduler.submitJob() 的实现细节:

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {

  // 其它操作

  // 将任务放入到事件队列中
  // eventProcessLoop 内部的线程会在合适(有空)的时候进行处理
  // 注意:发送的是 JobSubmitted 消息
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))

  waiter
}

eventProcessLoop.post() 的实现细节:

def post(event: E): Unit = {
  eventQueue.put(event)
}

从 eventQueue 的名字可以看出,将 JobSubmitted 消息加入到了队列中。

接下来,我们需要进入到 eventProcessLoop (是 DAGSchedulerEventProcessLoop 的实例化对象) 中,看看它是如何处理 Job 的了。

DAGSchedulerEventProcessLoop 继承自 EventLoop,我们先看看 EventLoop 的成员变量:

// 事件队列
// eventProcessLoop.post() 就是向这个队列中添加的事件
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

// 线程
private val eventThread = new Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    // 这是精简后的代码
    while (!stopped.get) {
      // 从事件队列中取出并处理
      val event = eventQueue.take()
      onReceive(event)
    }
    
  }

}

DAGSchedulerEventProcessLoop 继承自 EventLoop,在 DAGScheduler 的最后执行 eventProcessLoop.start() 方法,这个方法在 EventLoop 中也有实现:

def start(): Unit = {
  onStart()
  // 把线程启动了
  eventThread.start()
}

线程启动后 eventThread 就会不断的从 eventQueue 中取出事件,然后交给 onReceive() 方法去处理,onReceive() 在 DAGSchedulerEventProcessLoop 中有实现:

override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}

doOnReceive(event) 的实现细节:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
   
   // 处理提交过来的 Job
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId, reason) =>
    dagScheduler.handleStageCancellation(stageId, reason)
    
    // ...
    
}

任务最终会交给 DAGScheduler.handleJobSubmitted() 来执行:

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // 通过 FinalRDD 来构建 ResultStage
    // 通过 RDD 的依赖关系,找到父依赖(到 ShuffleRDD 为止)来创建 ResultStage
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
     // ...
  }

  // 创建 Activer Job
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)

  // 加入到 Activer Job 集合中
  activeJobs += job
  finalStage.setActiveJob(job)
  
  // 提交 FInal Stage
  submitStage(finalStage)
}

createResultStage() 根据 Final RDD 可以推断出 Result Stage。

接下来,让我们看看 DAGScheduler 是如何推断缺失的 Stage 的,DAGScheduler.submitStage() 的实现细节:

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      
      // 通过当前的 stage 去推断前面丢失的 stages
      val missing = getMissingParentStages(stage).sortBy(_.id)
        
      if (missing.isEmpty) {
        // 如果当前 stage 无依赖的 stage
        // 提交任务
        submitMissingTasks(stage, jobId.get)
      } else {
        // 将最开始的 stage 先提交
        // 考虑到 stage 多依赖的情况
        for (parent <- missing) {
          // 这里相当于有个递归调用
          // 将最开始的 stage 先提交
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

DAGScheduler 通过 ResultStage 来向前推断父 Stage ,接下来我们看看 DAGScheduler.getMissingParentStages() 是如何进行推断的:

private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]

  // 等待被遍历的 RDD 栈
  val waitingForVisit = new Stack[RDD[_]]

  // 这里定义了一个内部方法,下面的 while 语句循环调用这个方法
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        // 遍历当前 RDD 的依赖列表
        for (dep <- rdd.dependencies) {
          dep match {
            // 如果是宽依赖
            case shufDep: ShuffleDependency[_, _, _] =>
              // 创建 ShuffleMapStage
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                // 将这个 Stage 添加到 Missing stages 中
                missing += mapStage
              }
            // 如果是窄依赖
            case narrowDep: NarrowDependency[_] =>
              // 将这个依赖的 RDD 添加到等待遍历的集合
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  // 遍历等待被遍历的 RDD 栈
  while (waitingForVisit.nonEmpty) {
    // 对待遍历的 RDD 进行 visit(),现在看看 visit() 内部方法做了哪些操作
    visit(waitingForVisit.pop())
  }

  missing.toList
}

DAGScheduler 通过判断 RDD 是否为宽依赖作为 Stage 的划分标准,进而将一个任务划分成一个或多个 Stage。

最后我们 DAGScheduler.submitMissingTasks() 是如何将 Stage 提交的:

private def submitMissingTasks(stage: Stage, jobId: Int) {
    
  // 根据 Stage 的不同,选择不同的启动方式
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }

  // ...

  // 根据 Stage 的不同,创建不同的 tasks
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id

          // 创建成 ShuffleMapTask
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId)
        }

      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)

          // 创建成 ResultTask
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
        }
    }
  } catch {
     // 略略略..
  }

    // 其它操作

  if (tasks.size > 0) {
    // 略略略..

    // 这里调用了 TaskScheduler.submitTasks()
    // 将 TaskSet 交给 TaskScheduler 去执行
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  } else {
    // 略略略..
  }
}

简单的总结一下,DAGScheduler 对提交过来的任务进行了 Stage 的划分,并对每个 Stage 创建一个 TaskSet,最后通过 TaskScheduler.submitTasks() 方法,将各个TaskSet 交给 TaskScheduler 去处理。

相关文章

网友评论

      本文标题:DAGScheduler 源码浅析

      本文链接:https://www.haomeiwen.com/subject/ltdvsqtx.html