
Spark Delay Scheduling: A Worked Example

Author: sf705 | Published 2018-01-25 17:07

    I have recently been reading about Spark task scheduling, in particular the delay scheduling part. Most of the materials and blog posts I went through describe delay scheduling roughly like this:
    "When assigning a node (executor) to a task, first check whether the task's best node, i.e. the node with the highest data locality for this task, is idle. If it is, assign the task there directly; if the best node does not have enough resources, wait for a certain amount of time. If resources become available within the wait, the task runs on that node; otherwise pick the second-best node, and so on."
    Even after reading the source code I still could not quite follow it, so I contacted BIGUFO, the author of a blog post on this topic, who very kindly explained a great deal to me. I benefited a lot; my original understanding had been rather narrow.
    Here is my understanding, starting from resourceOffers in TaskSchedulerImpl:

    def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
        ......
        // collect the available executors and record them in the bookkeeping collections
        .....
    
        val shuffledOffers = shuffleOffers(filteredOffers)
        // Build a list of tasks to assign to each worker.
        val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
        val availableCpus = shuffledOffers.map(o => o.cores).toArray
        val sortedTaskSets = rootPool.getSortedTaskSetQueue  // sort the TaskSets according to the scheduling policy
        for (taskSet <- sortedTaskSets) {
          logDebug("parentName: %s, name: %s, runningTasks: %s".format(
            taskSet.parent.name, taskSet.name, taskSet.runningTasks))
          if (newExecAvail) {
            taskSet.executorAdded()
          }
        }
        // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
        // of locality levels so that it gets a chance to launch local tasks on all of them.
        // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
        for (taskSet <- sortedTaskSets) {  // take each TaskSet in turn
          var launchedAnyTask = false
          var launchedTaskAtCurrentMaxLocality = false
          for (currentMaxLocality <- taskSet.myLocalityLevels) {  // walk the TaskSet's locality levels, highest first
            do { // keep offering this single TaskSet at the current locality level
              launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
                taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
              launchedAnyTask |= launchedTaskAtCurrentMaxLocality
            } while (launchedTaskAtCurrentMaxLocality)
          }
          if (!launchedAnyTask) {
            taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
          }
        }
    
        if (tasks.size > 0) {
          hasLaunchedTask = true
        }
        return tasks
      }
    

    The main strategy is:

    • 1. Randomly shuffle the executors, so that tasks are not always assigned to the same worker.
    • 2. Sort the TaskSets.
    • 3. Allocate resources to each TaskSet, starting from its highest locality level (the locality ordering is sketched right after this list).
    • 4. Take the first TaskSet, take its current highest locality level, and call resourceOfferSingleTaskSet.
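
    The locality comparisons used throughout (the NOTE in the code above, allowedLocality > maxLocality in resourceOffer, and TaskLocality.isAllowed(...) in dequeueTask) all rely on the locality levels being an ordered enumeration, with PROCESS_LOCAL as the strictest ("highest") level and ANY as the loosest. Below is a minimal, self-contained sketch of that ordering, modeled on Spark's TaskLocality but not copied from the Spark source:

    object LocalityOrderSketch extends Enumeration {
      // Declaration order defines the ordering: PROCESS_LOCAL has the smallest id
      // ("highest" locality), ANY has the largest ("lowest" locality).
      val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

      // A level `condition` is allowed under constraint `constraint` only if it is
      // at least as strict as the constraint, i.e. condition <= constraint.
      def isAllowed(constraint: Value, condition: Value): Boolean = condition <= constraint
    }

    object LocalityOrderDemo extends App {
      import LocalityOrderSketch._
      println(NODE_LOCAL > PROCESS_LOCAL)           // true: "greater" means worse locality
      println(isAllowed(PROCESS_LOCAL, NODE_LOCAL)) // false: NODE_LOCAL not allowed when only PROCESS_LOCAL is
      println(isAllowed(RACK_LOCAL, NODE_LOCAL))    // true: stricter levels are always allowed
    }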

    Assume there are three worker nodes, each with a single CPU core (so each can run only one task at a time) and exactly one executor, and that the current TaskSet has four tasks, as in the figure below:


    (Figure: three workers, one executor each; task1 and task2's data sits in executor1, task3's data is on worker2, task4's data is on worker3.)

    task1 and task2's data lives in executor1, so their current locality is PROCESS_LOCAL; task3 and task4's data lives on worker2 and worker3, so with respect to those workers they are NODE_LOCAL.
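
    To keep the walkthrough concrete, the offers for this example can be written out by hand. A minimal sketch, modeled on the Spark 2.x WorkerOffer shape (executorId, host, cores); the executor and worker names are simply the ones from the figure:

    object DelaySchedulingExample {
      // Illustrative stand-in for Spark's WorkerOffer(executorId, host, cores).
      case class OfferSketch(executorId: String, host: String, cores: Int)

      // Three workers, one executor each, one CPU core each,
      // so every executor can run only a single task at a time.
      val offers = IndexedSeq(
        OfferSketch("executor1", "worker1", 1),
        OfferSketch("executor2", "worker2", 1),
        OfferSketch("executor3", "worker3", 1))

      // Preferred locations from the figure: task1/task2 are PROCESS_LOCAL on executor1,
      // task3 and task4 are NODE_LOCAL on worker2 and worker3 respectively.
      val processLocalTasks = Map("executor1" -> Seq("task1", "task2"))
      val nodeLocalTasks    = Map("worker2" -> Seq("task3"), "worker3" -> Seq("task4"))
    }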

    Now scheduling begins. The current TaskSet's currentMaxLocality is PROCESS_LOCAL, so resourceOfferSingleTaskSet(taskSet, PROCESS_LOCAL, ...) is executed to schedule this single TaskSet.
    The function is TaskSchedulerImpl#resourceOfferSingleTaskSet:

      private def resourceOfferSingleTaskSet(
          taskSet: TaskSetManager,
          maxLocality: TaskLocality,
          shuffledOffers: Seq[WorkerOffer],
          availableCpus: Array[Int],
          tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
        var launchedTask = false
        // nodes and executors that are blacklisted for the entire application have already been
        // filtered out by this point
        for (i <- 0 until shuffledOffers.size) {  // iterate over the offered executors
          val execId = shuffledOffers(i).executorId
          val host = shuffledOffers(i).host
          if (availableCpus(i) >= CPUS_PER_TASK) { // does this executor have enough free CPU cores?
            try { // if so, ask the TaskSet for a task to run on this executor at the given locality
              for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
                tasks(i) += task
                val tid = task.taskId
                taskIdToTaskSetManager(tid) = taskSet
                taskIdToExecutorId(tid) = execId
                executorIdToRunningTaskIds(execId).add(tid)
                availableCpus(i) -= CPUS_PER_TASK
                assert(availableCpus(i) >= 0)
                launchedTask = true
              }
            } catch {
              case e: TaskNotSerializableException =>
                logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
                // Do not offer resources for this task, but don't throw an error to allow other
                // task sets to be submitted.
                return launchedTask
            }
          }
        }
        return launchedTask
      }
    

    Given a set of executors and a locality level, the scheduling of a single TaskSet works as follows:

    • 1. A for loop iterates over the executor set. Say we pick executor1 from the figure above; its CPU count is checked and found sufficient, so resourceOffer is executed.
    • 2. resourceOffer(executor1, worker1, PROCESS_LOCAL) is called (the locality is the one passed down from TaskSchedulerImpl#resourceOffers).
      This step is essentially: given an executor and a locality level, find the most suitable task. Let's step into the function:
     def resourceOffer(
          execId: String,
          host: String,
          maxLocality: TaskLocality.TaskLocality)
        : Option[TaskDescription] =
      {
       ......
        if (!isZombie && !offerBlacklisted) {
          val curTime = clock.getTimeMillis()
    
      var allowedLocality = maxLocality  // the locality passed in, PROCESS_LOCAL in our example
    
          if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime)   // find the locality level currently allowed by the wait timers
        if (allowedLocality > maxLocality) {
          // If the allowed level is lower than the one asked for (e.g. getAllowedLocalityLevel
          // returned NODE_LOCAL, which is lower than PROCESS_LOCAL), keep the original, stricter level.
              // We're not allowed to search for farther-away tasks
              allowedLocality = maxLocality
            }
          }
    
          dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
           ......
    

    So resourceOffer, given an executor and a locality level, finds a suitable task; here we passed in executor1 and PROCESS_LOCAL. Execution reaches getAllowedLocalityLevel, which returns the locality level at which tasks are currently allowed to run. Let's step into it:

    private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
       ......
        while (currentLocalityIndex < myLocalityLevels.length - 1) {
          val moreTasks = myLocalityLevels(currentLocalityIndex) match {
            case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
            case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
            case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
            case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
          }
          if (!moreTasks) {
            // This is a performance optimization: if there are no more tasks that can
            // be scheduled at a particular locality level, there is no point in waiting
            // for the locality wait timeout (SPARK-4939).
            lastLaunchTime = curTime
            logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
              s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
            currentLocalityIndex += 1
          } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
            // Jump to the next locality level, and reset lastLaunchTime so that the next locality
            // wait timer doesn't immediately expire
            lastLaunchTime += localityWaits(currentLocalityIndex)
            logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
              s"${localityWaits(currentLocalityIndex)}ms")
            currentLocalityIndex += 1
          } else {
            return myLocalityLevels(currentLocalityIndex)
          }
        }
        myLocalityLevels(currentLocalityIndex)
      }
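
    The helper moreTasksToRunIn is not shown above. Conceptually it just answers: does the given pending-task index (per executor, per host, or per rack) still contain a task that has neither a running copy nor a successful completion? A simplified sketch of that check, not the actual Spark implementation (which also prunes stale entries lazily):

    import scala.collection.mutable.{ArrayBuffer, HashMap}

    // Simplified sketch: is there at least one pending task index with no running
    // copy and no successful completion? (copiesRunning/successful mirror the
    // per-task bookkeeping arrays kept by TaskSetManager.)
    def moreTasksToRunInSketch(
        pendingTasks: HashMap[String, ArrayBuffer[Int]],
        copiesRunning: Array[Int],
        successful: Array[Boolean]): Boolean = {
      pendingTasks.values.exists { taskIndices =>
        taskIndices.exists(i => copiesRunning(i) == 0 && !successful(i))
      }
    }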
    

    Among our four tasks, task1 and task2 are PROCESS_LOCAL with respect to executor1, i.e. there are two tasks at the PROCESS_LOCAL level, so the branch case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor) finds more tasks and the returned locality is still PROCESS_LOCAL. resourceOffer then calls dequeueTask(executor1, worker1, PROCESS_LOCAL), which looks like this:

     private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
        : Option[(Int, TaskLocality.Value, Boolean)] =
      {
        for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
          return Some((index, TaskLocality.PROCESS_LOCAL, false))
        }
    // a level is allowed only when it is <= maxLocality, i.e. at least as strict as the constraint (author's note)
        if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
          for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
            return Some((index, TaskLocality.NODE_LOCAL, false))
          }
        }
    
        if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
          // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
          for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
            return Some((index, TaskLocality.PROCESS_LOCAL, false))
          }
        }
    
        if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
          for {
            rack <- sched.getRackForHost(host)
            index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
          } {
            return Some((index, TaskLocality.RACK_LOCAL, false))
          }
        }
    
        if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
          for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
            return Some((index, TaskLocality.ANY, false))
          }
        }
    
        // find a speculative task if all others tasks have been scheduled
        dequeueSpeculativeTask(execId, host, maxLocality).map {
          case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
      }
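
    dequeueTaskFromList, used at every level above, is also not shown. Roughly, it scans the given pending list from the back, lazily drops indices that are already running or finished, and returns the first index that can still be launched. A simplified sketch, ignoring blacklisting (not the actual Spark implementation):

    import scala.collection.mutable.ArrayBuffer

    // Simplified sketch of the dequeue behaviour: walk the pending list from the
    // end, remove entries as we go, and hand back the first task index that has
    // no running copy and has not yet succeeded.
    def dequeueTaskFromListSketch(
        list: ArrayBuffer[Int],
        copiesRunning: Array[Int],
        successful: Array[Boolean]): Option[Int] = {
      var indexOffset = list.size
      while (indexOffset > 0) {
        indexOffset -= 1
        val index = list(indexOffset)
        list.remove(indexOffset)  // the pending lists are pruned lazily
        if (copiesRunning(index) == 0 && !successful(index)) {
          return Some(index)
        }
      }
      None
    }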
    

    Executing the first for loop (the PROCESS_LOCAL one), both task1 and task2 qualify; suppose task1 is picked. It is returned immediately, and task1 runs on executor1.
    Note where we return to after one successful dequeueTask: back to TaskSchedulerImpl#resourceOfferSingleTaskSet, because so far a task has only been assigned to executor1, and that function's for loop still iterates over all the other executors, looking for a suitable task for each.
    Back there, the second executor is picked, say executor2. The locality is still PROCESS_LOCAL (it only changes once we return all the way to resourceOffers). The flow is then:

      1. Given executor2 and PROCESS_LOCAL, call resourceOffer.
      2. Call getAllowedLocalityLevel. Since task2 is still PROCESS_LOCAL with respect to executor1, it again returns PROCESS_LOCAL. Now suppose the wait has exceeded the default 3 s: the allowed locality is stepped down one level.
      3. Call dequeueTask(executor2, worker2, PROCESS_LOCAL). The first for loop finds no PROCESS_LOCAL task for executor2, and every later branch is guarded by a TaskLocality.isAllowed() check that cannot pass because PROCESS_LOCAL is the strictest level, so None is returned.
      4. Then the next executor, executor3, is tried, repeating the first three steps above.

    Note that in step 2, when the wait has expired, getAllowedLocalityLevel downgrades the locality from PROCESS_LOCAL to NODE_LOCAL, and yet back in resourceOffer the following if resets it to PROCESS_LOCAL. One might ask: why bother downgrading at all if it is overwritten anyway?
    The point is that once the allowed level has dropped from PROCESS_LOCAL to NODE_LOCAL, even though this particular offer still runs at PROCESS_LOCAL, the next time we return to the locality for loop in TaskSchedulerImpl#resourceOffers (now at NODE_LOCAL) and find no PROCESS_LOCAL tasks left, NODE_LOCAL tasks can finally be launched. Without the downgrade, no matter which level the loop over the TaskSet reached, getAllowedLocalityLevel would keep seeing a pending PROCESS_LOCAL task and clamp the level back to PROCESS_LOCAL.
    So after all three executors have been tried, only task1 has been launched, and we return again, this time to the do-while inside the two nested for loops of TaskSchedulerImpl#resourceOffers. Because resourceOfferSingleTaskSet returned true, the do-while keeps running; it only stops when one of the following holds (the toy simulation after this list illustrates both):

      1. No executor has enough free CPUs, so nothing inside resourceOfferSingleTaskSet can flip launchedTask from its initial false.
      2. There are no tasks left to launch, so the for over taskSet.resourceOffer inside resourceOfferSingleTaskSet never fires, and launchedTask likewise stays false.
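
    As promised, here is a toy, self-contained simulation of that do-while, ignoring locality entirely; it only demonstrates the two exit conditions above (all cores used up, or no pending tasks left). Nothing here is Spark API, just plain Scala:

    object DoWhileSketch extends App {
      val CPUS_PER_TASK = 1
      val availableCpus = Array(1, 1, 1)  // three executors, one core each
      var pendingTasks  = 4               // the example TaskSet has four tasks

      // One pass over all executors; returns true if at least one task was launched,
      // mirroring the return value of resourceOfferSingleTaskSet.
      def offerSingleTaskSet(): Boolean = {
        var launchedTask = false
        for (i <- availableCpus.indices) {
          if (availableCpus(i) >= CPUS_PER_TASK && pendingTasks > 0) {
            availableCpus(i) -= CPUS_PER_TASK
            pendingTasks -= 1
            launchedTask = true
          }
        }
        launchedTask
      }

      var launched = false
      do { launched = offerSingleTaskSet() } while (launched)
      // With three cores and four tasks, one task is inevitably left pending.
      println(s"pending tasks left: $pendingTasks")
    }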

    The timeout case

    The first call to TaskSchedulerImpl#resourceOffers launched task1, task3 and task4. Since all three workers have a single core, each worker can run only one task at a time, so task2, which is PROCESS_LOCAL for worker1, is left unlaunched while every worker is busy; this call to resourceOffers thus finishes with no resources left. (Once a task has been given resources it is submitted to CoarseGrainedSchedulerBackend's launchTasks method, which sends it to the Coarse...Backend on the worker node for execution.) Does that mean that, with no resources left, the remaining task2 never gets executed?
    No: the scheduler keeps a pool that holds all the TaskSets. A TaskSet that is not finished this round is offered again the next time resourceOffers runs, and if there are still no resources it simply goes back into the pool. So let's analyse what happens when resources do free up and the TaskSet is offered again.
    Suppose that on the next round there are free resources, task2 is still pending, and it is a PROCESS_LOCAL task for executor1, but the free resource turns out to be executor2 on worker2. Scheduling then proceeds:

      1. resourceOffers is called and, starting at PROCESS_LOCAL, calls TaskSchedulerImpl#resourceOfferSingleTaskSet, which finds executor2 available.
      2. TaskSetManager#resourceOffer is then called. There are two cases, depending on whether less or more than 3 s (the default locality wait; see the configuration sketch after this list) has passed since the last task launch:
    • 2.1. If less than 3 s has passed, dequeueTask finds no executor for which task2 is PROCESS_LOCAL, so we return to TaskSchedulerImpl#resourceOffers and carry on down the locality levels.
    • 2.2. If more than 3 s has passed, TaskSetManager#getAllowedLocalityLevel lowers the allowed locality to NODE_LOCAL; but since that is lower than PROCESS_LOCAL, the allowedLocality passed to dequeueTask is still PROCESS_LOCAL, so this offer still tries to schedule task2 at PROCESS_LOCAL.
      3. In case 2.2 the task is still not scheduled. Back in resourceOffers the level drops from PROCESS_LOCAL to NODE_LOCAL and the TaskSet is submitted to resourceOfferSingleTaskSet again; if task2 still cannot be launched, the whole sequence starts over from step 1.
      4. Eventually, suppose task2's allowed locality has dropped all the way to ANY and executor2 still has free resources: on the next offer the task is scheduled at ANY and assigned directly to executor2.
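
    The 3 s mentioned above is the default of spark.locality.wait, the wait that getAllowedLocalityLevel applies before stepping down a level; it can also be tuned per level, with the per-level settings falling back to the global value when unset. A minimal configuration sketch (values shown only for illustration, not a recommendation):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.locality.wait", "3s")          // global delay-scheduling wait
      .set("spark.locality.wait.process", "3s")  // wait before giving up PROCESS_LOCAL
      .set("spark.locality.wait.node", "3s")     // wait before giving up NODE_LOCAL
      .set("spark.locality.wait.rack", "3s")     // wait before giving up RACK_LOCAL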

    Adding some log statements along the way also helped me understand quite a bit. My memory is poor, so consider this a note to myself. If anything here is wrong, I would appreciate corrections.
