美文网首页spark
任务调度-源码分析

任务调度-源码分析

作者: 专职掏大粪 | 来源:发表于2021-09-21 07:55 被阅读0次
    // Wrap `tasks` into a single TaskSet (tagged with the stage id, the stage's latest attempt
    // number, the job id, the job properties and the stage's resource profile id) and submit it
    // to the task scheduler.
    taskScheduler.submitTasks(new TaskSet(
           tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
           stage.resourceProfileId))
    

    TaskSchedulerImpl.submitTasks

    //任务集管理器
    /**
     * Builds the TaskSetManager that tracks and schedules the tasks of one TaskSet.
     *
     * The manager is bound to this scheduler and also receives the scheduler's
     * `healthTrackerOpt` and `clock`.
     *
     * @param taskSet         the tasks to be managed
     * @param maxTaskFailures passed through unchanged to the TaskSetManager constructor
     * @return a freshly constructed TaskSetManager
     */
    private[scheduler] def createTaskSetManager(
        taskSet: TaskSet,
        maxTaskFailures: Int): TaskSetManager =
      new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
    // Hand the new TaskSetManager (together with its TaskSet's properties) to the schedulable
    // builder, which places it into a scheduling pool (in FIFO mode: directly into the root pool).
          schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    
    

    调度器初始化（initialize）：根据调度模式（FIFO / FAIR）创建对应的 SchedulableBuilder，并构建调度池

     /**
      * Attaches the scheduler backend and builds the scheduling pools.
      *
      * The SchedulableBuilder is picked from `schedulingMode`: FIFO uses a
      * FIFOSchedulableBuilder over `rootPool`; FAIR uses a FairSchedulableBuilder over
      * `rootPool` that also receives `conf`. Once chosen, the builder's `buildPools()` is run.
      *
      * @param backend the SchedulerBackend this scheduler will use
      * @throws IllegalArgumentException if `schedulingMode` is neither FIFO nor FAIR
      */
     def initialize(backend: SchedulerBackend): Unit = {
       this.backend = backend
       schedulableBuilder = schedulingMode match {
         case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
         case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
         case _ =>
           val unsupportedMsg = s"Unsupported $SCHEDULER_MODE_PROPERTY: " + s"$schedulingMode"
           throw new IllegalArgumentException(unsupportedMsg)
       }
       schedulableBuilder.buildPools()
     }
    

    FIFOSchedulableBuilder.addTaskSetManager

      /**
       * FIFO variant: adds the manager directly to the root pool.
       *
       * @param manager    the schedulable (typically a TaskSetManager) to register
       * @param properties not consulted by this builder
       */
      override def addTaskSetManager(manager: Schedulable, properties: Properties): Unit =
        rootPool.addSchedulable(manager)
    

    // Ask the scheduler backend to make a new round of resource offers, so that the task set
    // just registered can get tasks scheduled.
    backend.reviveOffers()
    CoarseGrainedSchedulerBackend.reviveOffers

     /**
      * Triggers a new round of resource offers by posting a `ReviveOffers` message to the
      * driver endpoint (handled by the `case ReviveOffers` branch of its `receive`).
      * The send is wrapped in `Utils.tryLogNonFatalError` (presumably logging and swallowing
      * non-fatal errors — confirm against Utils).
      */
     override def reviveOffers(): Unit =
       Utils.tryLogNonFatalError(driverEndpoint.send(ReviveOffers))
    

    driverEndpoint 接收并处理自己发送的 ReviveOffers 消息（receive 方法）

     // Message handler of the driver endpoint (excerpt; parts are elided with "... ...").
     // - StatusUpdate: forward the task's new state to the TaskScheduler; for finished states it
     //   then looks the executor up in executorDataMap (remaining handling elided).
     // - ReviveOffers: the message sent by reviveOffers(); triggers makeOffers().
     override def receive: PartialFunction[Any, Unit] = {
          case StatusUpdate(executorId, taskId, state, data, resources) =>
            scheduler.statusUpdate(taskId, state, data.value)
            if (TaskState.isFinished(state)) {
              executorDataMap.get(executorId) match {
                case Some(executorInfo) =>
              ... ...
    
          case ReviveOffers =>
          // ReviveOffers received: compute resource offers and launch whatever tasks fit.
            makeOffers()
    

    CoarseGrainedSchedulerBackend.makeOffers

    // Builds resource offers from the executors (construction elided below), lets the
    // TaskScheduler pick tasks for them, and launches any resulting task descriptions.
    private def makeOffers(): Unit = {
          // Make sure no executor is killed while some task is launching on it
         // Compute the task descriptions to launch (done while holding the lock).
          val taskDescs = withLock {
               ... ...
                    (rName, rInfo.availableAddrs.toBuffer)
                  }, executorData.resourceProfileId)
            }.toIndexedSeq
            // Let the TaskScheduler match pending tasks from its pools against these offers.
            scheduler.resourceOffers(workOffers, true)
          }
          if (taskDescs.nonEmpty) {
            // Send the selected tasks to their executors.
            launchTasks(taskDescs)
          }
        }
    

    TaskSchedulerImpl.resourceOffers（节选）

        // Excerpt from resourceOffers: task set managers in scheduling order, as produced by the
        // root pool's getSortedTaskSetQueue.
        val sortedTaskSets = rootPool.getSortedTaskSetQueue
    // For one task set (`taskSet` is a loop variable over sortedTaskSets whose loop is elided),
    // try each of its locality levels in turn (presumably most to least local — confirm).
     for (currentMaxLocality <- taskSet.myLocalityLevels) {
              var launchedTaskAtCurrentMaxLocality = false
              // Keep offering at this locality level until a pass launches no task.
              do {
                val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(
                  taskSet, currentMaxLocality, shuffledOffers, availableCpus,
                  availableResources, tasks, addressesWithDescs)
                // A defined minLocality means at least one task was launched in this pass.
                launchedTaskAtCurrentMaxLocality = minLocality.isDefined
                launchedAnyTask |= launchedTaskAtCurrentMaxLocality
                noDelaySchedulingRejects &= noDelayScheduleReject
                globalMinLocality = minTaskLocality(globalMinLocality, minLocality)
              } while (launchedTaskAtCurrentMaxLocality)
            }
    
     /**
      * Returns this pool's TaskSetManagers in scheduling order.
      *
      * The direct children in `schedulableQueue` are sorted with
      * `taskSetSchedulingAlgorithm.comparator`. Each child's own sorted queue is then appended
      * in that order, keeping only managers whose `isSchedulable` is true.
      *
      * @return a new buffer holding the schedulable TaskSetManagers in comparator order
      */
     override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
       val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
       // Order the direct children with the algorithm selected for this pool's scheduling mode.
       val sortedSchedulableQueue =
         schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
       for (schedulable <- sortedSchedulableQueue) {
         // Collect the child's own sorted managers, dropping any that cannot be scheduled now.
         sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue.filter(_.isSchedulable)
       }
       sortedTaskSetQueue
     }
    
    
     /**
      * Ordering strategy for this pool's children, chosen from `schedulingMode` when the val is
      * initialized: FIFO -> FIFOSchedulingAlgorithm, FAIR -> FairSchedulingAlgorithm.
      * Any other mode makes initialization fail with IllegalArgumentException.
      */
     private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = schedulingMode match {
       case SchedulingMode.FIFO => new FIFOSchedulingAlgorithm()
       case SchedulingMode.FAIR => new FairSchedulingAlgorithm()
       case _ =>
         throw new IllegalArgumentException(
           s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead.")
     }
    

    拿到任务描述后，最终通过 launchTasks 把任务发送到对应的 Executor 上执行
    CoarseGrainedSchedulerBackend.launchTasks

     /**
      * Sends each task description to the executor it was assigned to.
      *
      * Every task is serialized first.
      * - If the serialized size is at least `maxRpcMessageSize`, the task is not sent. Instead,
      *   its TaskSetManager (when one is found in `scheduler.taskIdToTaskSetManager`) is aborted
      *   with a message suggesting a larger RPC message limit or broadcast variables.
      * - Otherwise, the task's CPUs and custom resources are debited from the executor's
      *   bookkeeping (released again when the task finishes), and a `LaunchTask` message is sent
      *   to that executor's endpoint.
      *
      * @param tasks nested sequence of task descriptions; it is flattened and processed in order
      */
     private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
       // Iterate over every task description.
       for (task <- tasks.flatten) {
         val serializedTask = TaskDescription.encode(task)
         // A task whose serialized form does not fit in one RPC message cannot be shipped.
         if (serializedTask.limit() >= maxRpcMessageSize) {
           Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
             try {
               val msg = ("Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                 s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
                 s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values.")
                 .format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
               taskSetMgr.abort(msg)
             } catch {
               case e: Exception => logError("Exception in error callback", e)
             }
           }
         } else {
           val executorData = executorDataMap(task.executorId)
           // Do resources allocation here. The allocated resources will get released after the task
           // finishes.
           val rpId = executorData.resourceProfileId
           val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
           val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
           executorData.freeCores -= taskCpus
           task.resources.foreach { case (rName, rInfo) =>
             assert(executorData.resourcesInfo.contains(rName))
             executorData.resourcesInfo(rName).acquire(rInfo.addresses)
           }

           logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
             s"${executorData.executorHost}.")
           // Send the LaunchTask message to the endpoint of the executor that owns this task.
           executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
         }
       }
     }
    
    

    相关文章

      网友评论

        本文标题:任务调度-源码分析

        本文链接:https://www.haomeiwen.com/subject/rmimgltx.html