美文网首页
TaskScheduler 源码浅析

TaskScheduler 源码浅析

作者: 越过山丘xyz | 来源:发表于2019-02-05 21:05 被阅读0次

    TaskScheduler

    TaskScheduler 负责对 DAGScheduler 提交过来的 Task 与最佳位置的 Executor 进行绑定,然后通过 SchedulerBackend 发送到 Executor 上去执行。

    image

    在这个版本中,TaskScheduler 只有一个实现类,就是 TaskSchedulerImpl,在 Spark-Core 的 org.apache.spark.scheduler 包下。

    源码

    在 SparkContext 中对 TaskScheduler 进行了初始化操作,在 SparkContext 概览中提到过:

    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _taskScheduler = ts
    
    // 对 TaskScheduler 进行了启动
    _taskScheduler.start()
    

    SparkContext.createTaskScheduler() 会根据运行模式的不同,创建不同类型的 SchedulerBackend,我这里以 Standalone 模式为例:

    private def createTaskScheduler(...) = {
      import SparkMasterRegex._
        
      master match {
          
        // Standalone 模式
        case SPARK_REGEX(sparkUrl) =>
          val scheduler = new TaskSchedulerImpl(sc)
          val masterUrls = sparkUrl.split(",").map("spark://" + _)
          val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
          scheduler.initialize(backend)
          (backend, scheduler)
          
          // ...
      }
        
    }
    

    我们先看下 TaskScheduler.initialize() 方法:

    def initialize(backend: SchedulerBackend) {
      this.backend = backend
      // 根据 conf 中的设置,选择不同模式的任务调度器
      // 通过设定 SCHEDULER_MODE_PROPERTY 这个值来更改
      // 默认为 FIFO
      schedulableBuilder = {
        schedulingMode match {
         // 先进先出
          case SchedulingMode.FIFO =>
            // RootPool => TaskSetManager 的调度池(一个队列)
            new FIFOSchedulableBuilder(rootPool)
          // 公平
          case SchedulingMode.FAIR =>
            new FairSchedulableBuilder(rootPool, conf)
          case _ =>
            throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
            s"$schedulingMode")
        }
      }
      // 创建树形节点
      schedulableBuilder.buildPools()
    }
    

    在 DAGScheduler 划分完 Stage 后,会将其封装成 TaskSet 并提交给 TaskScheduler.submitTasks() 来做进一步的工作,我们先看下它的实现细节:

    override def submitTasks(taskSet: TaskSet) {
      // 取出 TaskSet 中的 Task
      val tasks = taskSet.tasks
      this.synchronized {
        // 为每个 TaskSet 创建一个 TaskSetManager
        // TaskSetManager 负责任务失败时的重试工作
        val manager = createTaskSetManager(taskSet, maxTaskFailures)
        // 将 manager 添加到 schedulableBuilder 中
        // schedulableBuilder 负责 TaskSetManager 的调度
        schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    
        // 其它操作
    
        hasReceivedTask = true
      }
      // 资源的分配
      backend.reviveOffers()
    }
    

    我们先看下 SchedulableBuilder.addTaskSetManager() 的实现细节,以 FIFO 模式为例:

    override def addTaskSetManager(manager: Schedulable, properties: Properties) {
      rootPool.addSchedulable(manager)
    }
    

    rootPool.addSchedulable() 的实现细节:

    override def addSchedulable(schedulable: Schedulable) {
      require(schedulable != null)
      // 将 TaskSetManager 放入到 schedulable 队列中
      schedulableQueue.add(schedulable)
      schedulableNameToSchedulable.put(schedulable.name, schedulable)
      schedulable.parent = this
    }
    

    从代码中可以看出,当 DAGScheduler 提交完 TaskSet 后,就会为其创建一个 TaskSetManager,然后将 TaskSetManager 放入到 TaskSetManager 队列(池)中去等待执行。

    接下来我们看看 backend.reviveOffers() 的实现细节,看看 TaskSetManager 是如何被调用的:

    override def reviveOffers() {
      // 发送一个 ReviveOffers 消息
      // DriverEndpoint 上文提到过
      driverEndpoint.send(ReviveOffers)
    }
    

    DriverEndpoint.receive() 会对这个消息进行处理 (CoarseGrainedSchedulerBackend 的内部类):

    override def receive: PartialFunction[Any, Unit] = {
    
      case ReviveOffers =>
        makeOffers()
    
      // 略略略
        
    }
    

    makeOffers() 的实现细节:

    private def makeOffers() {
      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
        // executorDataMap 为 executor 字典
        // 上文提到过,将反注册过来的 Executor 都放到了这里
        // 找出活跃的 Executor
        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
        // 封装 Executor 可用的资源量和联系方式
        val workOffers = activeExecutors.map { case (id, executorData) =>
          new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
        }.toIndexedSeq
        // 将可用的 Executor 资源信息发送给 TaskScheduler.resourceOffers()
        // TaskScheduler 会按最优的条件将 Task 与 Executor 进行绑定并返回其集合
        scheduler.resourceOffers(workOffers)
      }
      // 将与 Executor 绑定完的 Task 交给 Eexecutor 去执行
      if (!taskDescs.isEmpty) {
        // 在每个 Executor 上启动分别启动其对应的 Task
        launchTasks(taskDescs)
      }
    

    TaskScheduler.resourceOffers() 的实现细节:

    def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    
      // 要执行的任务集
      // TaskDescription 中有 Task ID 和 Executor ID
      // 下面的代码会将 Task 与 Executor 进行绑定,确定任务要到哪个 Executor 上去执行
      val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
        
      // 获取排序后的等待执行的 TaskSetManager 
      val sortedTaskSets = rootPool.getSortedTaskSetQueue
      for (taskSet <- sortedTaskSets) {
        if (newExecAvail) {
          // 计算每个 Task 执行的位置(多个)
          // 有兴趣的可以点进去看看
          taskSet.executorAdded()
        }
      }
    
      // 根据优先级(本地、机架...) 将每个 Task 与 Executor 进行绑定
      for (taskSet <- sortedTaskSets) {
        var launchedAnyTask = false
        var launchedTaskAtCurrentMaxLocality = false
        for (currentMaxLocality <- taskSet.myLocalityLevels) {
          do {
            launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
              taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
          } while (launchedTaskAtCurrentMaxLocality)
        }
        if (!launchedAnyTask) {
          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
        }
      }
    
      // 将与 Executor 绑定完的 Task 集合返回给 SchedulerBackend
      // SchedulerBackend 接下来会提交给 Executor 去执行
      return tasks
    }
    

    SchedulerBackend.launchTasks() 的实现细节:

    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = TaskDescription.encode(task)
        if (serializedTask.limit >= maxRpcMessageSize) {
          // 资源不够用
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            // 其它操作
          }
        }
        else {
          // 其它操作
    
          // 向 Executor 发送了一个启动任务的请求
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }
    
    // Executor 收到请求后
    // 先简单看一下
    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        // 启动任务
        executor.launchTask(this, taskDesc)
      }
    

    总的来说,TaskScheduler 负责着任务的调度、唤醒与重启的工作。

    相关文章

      网友评论

          本文标题:TaskScheduler 源码浅析

          本文链接:https://www.haomeiwen.com/subject/gsnvsqtx.html