5.6 TaskScheduler: TaskSchedulerImpl

1. Overview

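TODO: describe the call path between TaskSchedulerImpl and the backend, and how the backend receives the various pieces of information that executors send back through the heartbeat RPC interface.
TaskSchedulerImpl is an implementation of the TaskScheduler interface; it is the one used in Standalone mode. When SparkContext starts up, it is wired together with the DAGScheduler and the SchedulerBackend.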


Earlier we saw a very important TaskSetManager method, resourceOffer. This is where it actually gets called.

We also saw an important internal setting of Pool, the SchedulingMode. It is initialized here as well, and selects either FIFO or FAIR mode.

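As the concrete implementation, it manages the layers of abstraction introduced so far (Task -> TaskSet -> TaskSetManager -> Pool) and calls their methods. It acts as the engine that drives them.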

2. Important structures

/**
 * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
 * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
 * It handles common logic, like determining a scheduling order across jobs, waking up to launch
 * speculative tasks, etc.
 *
 * Clients should first call initialize() and start(), then submit task sets through the
 * runTasks method.
 *
 * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
 * threads, so it needs locks in public API methods to maintain its state. In addition, some
 * SchedulerBackends synchronize on themselves when they want to send events here, and then
 * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
 * we are holding a lock on ourselves.
 */
private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging

  // How often to check for speculative tasks
  val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")

  // Scheduler for the periodic speculation check (initializer elided in this excerpt)
  private val speculationScheduler = ...

  // Threshold above which we warn user initial TaskSet may be starved
  val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")

  • Mapping tables that record the various relationships
  // IDs of the tasks running on each executor
  private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

  // The set of executors we have on each host; this is used to compute hostsAlive, which
  // in turn is used to decide when we can attain data locality on a given host
  protected val executorsByHost = new HashMap[String, HashSet[String]]

  protected val hostsByRack = new HashMap[String, HashSet[String]]

  protected val executorIdToHost = new HashMap[String, String]
  // TaskSetManagers are not thread safe, so any access to one should be synchronized
  // on this class.
  private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]

  private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager]
  val taskIdToExecutorId = new HashMap[Long, String]

  • Services it depends on
  // Listener object to pass upcalls into
  var dagScheduler: DAGScheduler = null
  var backend: SchedulerBackend = null
  val mapOutputTracker = SparkEnv.get.mapOutputTracker
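
To make the role of the mapping tables more concrete, here is a minimal sketch of how such tables might be kept in sync as executors register and tasks start and end. The class ExecutorRegistry and its methods addExecutor, taskLaunched, taskEnded, and hostsAlive are hypothetical names chosen for illustration; only the field names are taken from the excerpt above, and this is not Spark's actual code.

```scala
import scala.collection.mutable.{HashMap, HashSet}

// Hypothetical stand-in for the bookkeeping fields shown above; not Spark's actual class.
class ExecutorRegistry {
  val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
  val executorsByHost = new HashMap[String, HashSet[String]]
  val executorIdToHost = new HashMap[String, String]
  val taskIdToExecutorId = new HashMap[Long, String]

  // Record a newly seen executor and the host it lives on.
  def addExecutor(execId: String, host: String): Unit = {
    executorIdToHost(execId) = host
    executorsByHost.getOrElseUpdate(host, new HashSet[String]) += execId
    executorIdToRunningTaskIds.getOrElseUpdate(execId, new HashSet[Long])
  }

  // Record that a task was launched on an executor.
  def taskLaunched(taskId: Long, execId: String): Unit = {
    taskIdToExecutorId(taskId) = execId
    executorIdToRunningTaskIds.getOrElseUpdate(execId, new HashSet[Long]) += taskId
  }

  // Forget a task once it has finished, failed, or been killed.
  def taskEnded(taskId: Long): Unit = {
    taskIdToExecutorId.remove(taskId).foreach { execId =>
      executorIdToRunningTaskIds.get(execId).foreach(_ -= taskId)
    }
  }

  // Hosts that currently have at least one registered executor.
  def hostsAlive: Set[String] = executorsByHost.keySet.toSet
}
```

Keeping both directions of each relationship (task to executor, executor to tasks, executor to host, host to executors) trades a little memory for constant-time lookups when a status update or a resource offer arrives.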

3. Important internal methods

3.1 Saving SchedulerBackend and Building Schedulable Pools — initialize Method

Internally, this method chooses which kind of scheduler to use, FIFO or FAIR.
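
The choice between FIFO and FAIR can be pictured as a pattern match on the configured mode (in Spark this comes from the spark.scheduler.mode setting). The sketch below is a simplified illustration under that assumption: SchedulingModeSketch, Mode, Builder, parseMode, and chooseBuilder are hypothetical names, and the two builder objects only carry a name, whereas the real builders also construct the Pool hierarchy.

```scala
// Hypothetical, simplified stand-ins; not Spark's actual classes.
object SchedulingModeSketch {
  sealed trait Mode
  case object FIFO extends Mode
  case object FAIR extends Mode

  trait Builder { def name: String }
  object FifoBuilder extends Builder { val name = "FIFOSchedulableBuilder" }
  object FairBuilder extends Builder { val name = "FairSchedulableBuilder" }

  // Parse the configured mode string; anything unrecognized is rejected.
  def parseMode(value: String): Mode = value.trim.toUpperCase match {
    case "FIFO" => FIFO
    case "FAIR" => FAIR
    case other => throw new IllegalArgumentException(s"Unsupported scheduling mode: $other")
  }

  // Pick the builder that will populate the root pool for this mode.
  def chooseBuilder(mode: Mode): Builder = mode match {
    case FIFO => FifoBuilder
    case FAIR => FairBuilder
  }
}
```

Modeling the mode as a sealed trait lets the compiler warn if a new mode is added without a matching builder.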


3.2 Starting TaskSchedulerImpl — start Method

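This is the startup logic. Internally it starts the backend and the background status checks (for example, the periodic speculation check). The backend then requests resources on behalf of the Spark driver and, from there, goes on to obtain executors.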
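
A periodic background check of this kind can be sketched with a single-threaded scheduled executor from java.util.concurrent. The object PeriodicCheckSketch and the method runChecks are hypothetical, and the fixed number of ticks exists only so the example terminates; a long-running scheduler would keep the check going until it is stopped.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object PeriodicCheckSketch {
  // Runs `check` every `intervalMs` on one background thread until at least `ticks`
  // runs have completed, then shuts the scheduler down.
  // Returns true if all requested runs finished within the 5-second timeout.
  def runChecks(intervalMs: Long, ticks: Int)(check: () => Unit): Boolean = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val latch = new CountDownLatch(ticks)
    val task = new Runnable {
      override def run(): Unit = { check(); latch.countDown() }
    }
    scheduler.scheduleAtFixedRate(task, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    try latch.await(5, TimeUnit.SECONDS)
    finally scheduler.shutdownNow()
  }
}
```

Because the check may fire once more between the latch opening and the shutdown, callers should treat the tick count as a lower bound.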


3.3 Handling Task Status Update — statusUpdate Method

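When a task is reported as FINISHED, FAILED, KILLED, or LOST, this method looks up the TaskSetManager that owns the task and then calls into that TaskSetManager, which removes the task from its list of running tasks, notifies the DAGScheduler, and carries out the rest of the follow-up work.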

If this lookup fails, you will see a log entry like the following:

ERROR Ignoring update with state [state] for TID [tid] because its task set is gone (this is likely the result of receiving duplicate task finished status updates)

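The information this method receives is passed along by DriverEndpoint. In other words, the executor sends an RPC request directly to the driver to report the execution status of its tasks. DriverEndpoint itself is covered in the Driver-Executor section.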
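
The lookup-then-dispatch flow can be sketched as follows. Everything here is a simplified assumption for illustration: StatusUpdateSketch, Manager, Scheduler, and launch are hypothetical, the manager only tracks running task ids, and the method returns a message instead of logging and notifying the DAGScheduler.

```scala
import scala.collection.mutable.{HashMap, HashSet}

object StatusUpdateSketch {
  sealed trait State
  case object RUNNING extends State
  case object FINISHED extends State
  case object FAILED extends State
  case object KILLED extends State
  case object LOST extends State

  // States after which a task no longer counts as running.
  val terminal: Set[State] = Set(FINISHED, FAILED, KILLED, LOST)

  // Hypothetical, minimal TaskSetManager: it only tracks which task ids are running.
  class Manager(val name: String) {
    val running = new HashSet[Long]
  }

  class Scheduler {
    val taskIdToManager = new HashMap[Long, Manager]

    def launch(tid: Long, m: Manager): Unit = synchronized {
      m.running += tid
      taskIdToManager(tid) = m
    }

    // Returns a short description of what happened, in place of real logging.
    def statusUpdate(tid: Long, state: State): String = synchronized {
      taskIdToManager.get(tid) match {
        case Some(m) if terminal(state) =>
          taskIdToManager -= tid
          m.running -= tid
          s"TID $tid removed from ${m.name} with state $state"
        case Some(m) =>
          s"TID $tid still running in ${m.name}"
        case None =>
          s"Ignoring update with state $state for TID $tid because its task set is gone"
      }
    }
  }
}
```

A duplicate terminal update for the same task id falls into the last branch, which matches the situation the log message describes.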

3.3 Submitting Tasks for Execution (from TaskSet for Stage) — submitTasks Method

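This is one of the core methods. It creates the TaskSetManager for the given TaskSet and adds it to the scheduler's queue. From then on the Pool is responsible for deciding when this TaskSet gets to run, and the TaskSetManager hands out the tasks to execute, together with their serialized data, notifying the DAGScheduler as each one starts.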


  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      // One TaskSetManager per TaskSet, indexed by stage id and stage attempt id
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      // A stage may have at most one active (non-zombie) task set at a time
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      // Hand the manager to the FIFO/FAIR builder so it joins a Pool
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      // On the first submission, start a timer that warns if no task ever gets launched
      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    // Ask the backend for resource offers so the new tasks can be placed
    backend.reviveOffers()
  }
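
The rule that a stage may have only one active task set at a time can be tried out in isolation. The sketch below is a stripped-down model under stated assumptions: SubmitSketch and its TaskSet, Manager, and Scheduler types are hypothetical stand-ins that keep only the fields the check needs, and there is no Pool, timer, or backend.

```scala
import scala.collection.mutable.HashMap

object SubmitSketch {
  // Hypothetical, minimal stand-ins for TaskSet and TaskSetManager.
  case class TaskSet(stageId: Int, stageAttemptId: Int)
  class Manager(val taskSet: TaskSet) { var isZombie = false }

  class Scheduler {
    private val byStageAndAttempt = new HashMap[Int, HashMap[Int, Manager]]

    // Registers a manager for the task set, then rejects the submission if another
    // non-zombie task set already exists for the same stage.
    def submit(taskSet: TaskSet): Manager = synchronized {
      val manager = new Manager(taskSet)
      val attempts = byStageAndAttempt.getOrElseUpdate(taskSet.stageId, new HashMap[Int, Manager])
      attempts(taskSet.stageAttemptId) = manager
      val conflicting = attempts.exists { case (_, m) => m.taskSet != taskSet && !m.isZombie }
      if (conflicting) {
        throw new IllegalStateException(
          s"more than one active taskSet for stage ${taskSet.stageId}")
      }
      manager
    }
  }
}
```

For example, a second attempt for stage 1 is accepted only after the first attempt's manager has been marked as a zombie, while task sets for other stages are unaffected.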

3.4 taskSetFinished Method


3.5 Creating TaskDescriptions For Available Executor Resource Offers (with CPU Cores) — resourceOffers Method


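resourceOffers spreads tasks across executors at random instead of matching them up by id, which avoids the awkward situation where executor-1 always ends up running TaskSet-1. It also initializes the per-executor data structures and records the number of CPUs available on each executor.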
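
The shuffle-then-fill idea can be sketched as below. This is an illustration under simplifying assumptions, not the real method: ResourceOffersSketch, prepare, and assign are hypothetical, tasks are represented only by a count, locality levels are ignored, and slots are filled round-robin with cpusPerTask cores per task.

```scala
import scala.util.Random

object ResourceOffersSketch {
  // Simplified stand-in for an executor's offer of free cores.
  case class WorkerOffer(executorId: String, host: String, cores: Int)

  // Shuffle the offers and record the free CPUs of each one, in shuffled order.
  def prepare(offers: Seq[WorkerOffer], rng: Random): (Seq[WorkerOffer], Array[Int]) = {
    val shuffled = rng.shuffle(offers)
    val availableCpus = shuffled.map(_.cores).toArray
    (shuffled, availableCpus)
  }

  // Place up to `numTasks` tasks round-robin over the shuffled offers.
  // Returns how many tasks each executor received.
  def assign(offers: Seq[WorkerOffer], numTasks: Int, cpusPerTask: Int,
      rng: Random): Map[String, Int] = {
    require(cpusPerTask > 0, "cpusPerTask must be positive")
    val (shuffled, availableCpus) = prepare(offers, rng)
    val launched = Array.fill(shuffled.length)(0)
    var remaining = numTasks
    var progressed = true
    // Keep sweeping over the offers until nothing more can be placed.
    while (remaining > 0 && progressed) {
      progressed = false
      for (i <- shuffled.indices) {
        if (remaining > 0 && availableCpus(i) >= cpusPerTask) {
          availableCpus(i) -= cpusPerTask
          launched(i) += 1
          remaining -= 1
          progressed = true
        }
      }
    }
    shuffled.map(_.executorId).zip(launched).toMap
  }
}
```

Shuffling changes which executor is visited first on each call, while the CPU counters make sure no executor is handed more tasks than it has cores for.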


As mentioned earlier, the order in which task sets run is decided by the Pool.




