Spark Delay Scheduling: A Worked Example

Author: sf705 | Published 2018-01-25 17:07 | Read 217 times

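I have recently been reading about Spark task scheduling, delay scheduling in particular. Most of the material and blog posts I found describe delay scheduling like this:
"When assigning a task to a node (executor), the scheduler first checks whether the task's best node, i.e. the node with the highest data locality for that task, is free. If it is, the task is assigned right away. If the best node does not have enough resources, the scheduler waits for a certain time. If resources become available within that time, the task runs on that node; otherwise the next-best node is chosen..."
Even after reading the source I still could not quite follow it, so I contacted BIGUFO, the author of one of those blog posts, who kindly explained a great deal. I learned a lot; my earlier understanding had been too narrow.
Here is my understanding first, starting from resourceOffers in TaskSchedulerImpl: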

def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    ......
    // Collect the available executors and register them in the corresponding collections
    .....

    val shuffledOffers = shuffleOffers(filteredOffers)
    // Build a list of tasks to assign to each worker.
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val sortedTaskSets = rootPool.getSortedTaskSetQueue  // sort the TaskSets that were fetched
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }
    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    for (taskSet <- sortedTaskSets) {  // take each TaskSet in scheduling order
      var launchedAnyTask = false
      var launchedTaskAtCurrentMaxLocality = false
      for (currentMaxLocality <- taskSet.myLocalityLevels) {  // walk the TaskSet's locality levels in order
        do { // schedule this single TaskSet at the current level
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      if (!launchedAnyTask) {
        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
      }
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

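The main strategy is as follows:

  • 1. Shuffle the executors randomly so that tasks are not always assigned to the same worker
  • 2. Sort the TaskSets
  • 3. Allocate resources to each TaskSet, starting from the TaskSet's highest (most local) locality level
  • 4. Take the first TaskSet, then take its highest locality level, and call resourceOfferSingleTaskSet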

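Assume there are three worker nodes, each with a single CPU core, so each can run only one task at a time, and each worker has exactly one executor. The current TaskSet has just four tasks, as shown in the figure below: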


[Figure: image.png, the three workers with their executors and the four tasks of the TaskSet]

The data for task1 and task2 is in executor1, so the locality of task1 and task2 is PROCESS_LOCAL. The data for task3 and task4 is on worker2 and worker3, so each of them is NODE_LOCAL with respect to its worker.
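To make the example concrete, the task placement can be written down as data. This is a minimal sketch, not Spark code: `Pref`, `prefs` and `localityOn` are hypothetical names, racks and NO_PREF are ignored, and it assumes that task3's data is on worker2 and task4's data is on worker3.

```scala
object ExampleLocalitySketch {
  // Where a task's input lives: always a host, optionally a specific executor.
  final case class Pref(host: String, executor: Option[String] = None)

  // Assumed layout of the four tasks in the example (hypothetical data).
  val prefs: Map[String, Pref] = Map(
    "task1" -> Pref("worker1", Some("executor1")),
    "task2" -> Pref("worker1", Some("executor1")),
    "task3" -> Pref("worker2"),
    "task4" -> Pref("worker3")
  )

  // Best locality a task can get on a given executor/host pair.
  def localityOn(task: String, execId: String, host: String): String = {
    val p = prefs(task)
    if (p.executor.contains(execId)) "PROCESS_LOCAL"
    else if (p.host == host) "NODE_LOCAL"
    else "ANY"
  }

  def main(args: Array[String]): Unit = {
    for (task <- Seq("task1", "task2", "task3", "task4")) {
      println(s"$task on executor2@worker2: ${localityOn(task, "executor2", "worker2")}")
    }
  }
}
```

The same task therefore has a different locality depending on which executor is being offered, which is why the scheduler asks the question per offer rather than per task.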

Scheduling now begins. The TaskSet's currentMaxLocality is PROCESS_LOCAL, so resourceOfferSingleTaskSet(TaskSet, PROCESS_LOCAL, ....) is called to schedule this single task set.
The code of TaskSchedulerImpl#resourceOfferSingleTaskSet is as follows:

  private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
    // nodes and executors that are blacklisted for the entire application have already been
    // filtered out by this point
    for (i <- 0 until shuffledOffers.size) {  // iterate over the executors that were passed in
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
      if (availableCpus(i) >= CPUS_PER_TASK) { // does this executor have enough free cores?
        try { // if so, offer this executor at the given locality to the task set
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet
            taskIdToExecutorId(tid) = execId
            executorIdToRunningTaskIds(execId).add(tid)
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            launchedTask = true
          }
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
    }
    return launchedTask
  }

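Given a set of executors and a locality level, a single task set is scheduled as follows:

  • 1. A for loop iterates over the executors. Suppose it picks executor1 from the figure above: it checks whether enough CPU cores are available, finds that there are, and calls resourceOffer
  • 2. It calls resourceOffer(executor1, worker1, PROCESS_LOCAL) (the locality is passed down from TaskSchedulerImpl#resourceOffers)
    This step finds the most suitable task for the given executor and locality. Here is the function: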
 def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
   ......
    if (!isZombie && !offerBlacklisted) {
      val curTime = clock.getTimeMillis()

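      var allowedLocality = maxLocality  // start from the requested level, here PROCESS_LOCAL

      if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime)   // the level currently allowed
        if (allowedLocality > maxLocality) {
          // If the allowed level is less local than maxLocality, keep maxLocality.
          // E.g. if getAllowedLocalityLevel returns NODE_LOCAL, which is lower than
          // the requested PROCESS_LOCAL, PROCESS_LOCAL is still used.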
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }

      dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
       ......

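As shown, resourceOffer looks for a suitable task given an executor and a locality. Here we pass executor1 and PROCESS_LOCAL. It first calls getAllowedLocalityLevel, which returns the locality level at which tasks are currently allowed to run. Here is that function: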

private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
   ......
    while (currentLocalityIndex < myLocalityLevels.length - 1) {
      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
        case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
      }
      if (!moreTasks) {
        // This is a performance optimization: if there are no more tasks that can
        // be scheduled at a particular locality level, there is no point in waiting
        // for the locality wait timeout (SPARK-4939).
        lastLaunchTime = curTime
        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
        currentLocalityIndex += 1
      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
        // Jump to the next locality level, and reset lastLaunchTime so that the next locality
        // wait timer doesn't immediately expire
        lastLaunchTime += localityWaits(currentLocalityIndex)
        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
          s"${localityWaits(currentLocalityIndex)}ms")
        currentLocalityIndex += 1
      } else {
        return myLocalityLevels(currentLocalityIndex)
      }
    }
    myLocalityLevels(currentLocalityIndex)
  }
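The wait logic above can be exercised in isolation with a fake clock. The following is a toy model under stated assumptions, not Spark code: `Waiter` and its parameters are hypothetical names, levels are plain strings, and the `hasPending` function stands in for the `moreTasksToRunIn(...)` checks.

```scala
object LocalityWaitSketch {
  final class Waiter(
      levels: IndexedSeq[String],     // e.g. PROCESS_LOCAL, NODE_LOCAL, ANY
      waitsMs: IndexedSeq[Long],      // wait budget for each level
      hasPending: String => Boolean,  // are tasks still pending at this level?
      startMs: Long) {
    private var currentIndex = 0
    private var lastLaunchTime = startMs

    def allowedLevel(curTime: Long): String = {
      while (currentIndex < levels.length - 1) {
        if (!hasPending(levels(currentIndex))) {
          // Nothing to wait for at this level: skip it and restart the timer.
          lastLaunchTime = curTime
          currentIndex += 1
        } else if (curTime - lastLaunchTime >= waitsMs(currentIndex)) {
          // Wait expired: move down one level and charge the wait just spent.
          lastLaunchTime += waitsMs(currentIndex)
          currentIndex += 1
        } else {
          return levels(currentIndex)
        }
      }
      levels(currentIndex)
    }
  }

  def main(args: Array[String]): Unit = {
    val levels = Vector("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
    val waits = Vector(3000L, 3000L, 0L)
    val w = new Waiter(levels, waits, _ => true, startMs = 0L)
    Seq(1000L, 3000L, 5999L, 6000L).foreach { t =>
      println(s"t=$t ms -> ${w.allowedLevel(t)}")
    }
  }
}
```

Note that the level only ever moves downward inside this function. In the excerpt nothing resets it, so the reset on a successful launch must happen in the part of resourceOffer that is elided here.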

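Of our four tasks, task1 and task2 are PROCESS_LOCAL to executor1, so there are two pending tasks at the PROCESS_LOCAL level. The branch case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor) is therefore taken, moreTasks is true, the wait has not yet expired, and the returned locality is still PROCESS_LOCAL. resourceOffer then calls dequeueTask(executor1, worker1, PROCESS_LOCAL), which looks like this: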

 private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] =
  {
    for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }
    // isAllowed(maxLocality, X) holds when X <= maxLocality, i.e. X is at least as local as maxLocality
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
      // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
      for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
        return Some((index, TaskLocality.PROCESS_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
      for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
        return Some((index, TaskLocality.ANY, false))
      }
    }

    // find a speculative task if all others tasks have been scheduled
    dequeueSpeculativeTask(execId, host, maxLocality).map {
      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
  }
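The effect of the `isAllowed` guards in dequeueTask depends only on the ordering of the locality levels. Below is a small sketch that mirrors the order given in the NOTE comment of resourceOffers (PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY). The enumeration here is a simplified stand-in written for this example, and `allowedLevels` is a hypothetical helper, not part of Spark.

```scala
object LocalitySketch {
  object TaskLocality extends Enumeration {
    // Declared from most local to least local, so PROCESS_LOCAL has the smallest id.
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

    // A level is allowed when it is at least as local as the constraint.
    def isAllowed(constraint: Value, condition: Value): Boolean =
      condition <= constraint
  }

  import TaskLocality._

  // Levels that a dequeueTask-style lookup may consider under a given maxLocality.
  def allowedLevels(maxLocality: TaskLocality.Value): Seq[TaskLocality.Value] =
    TaskLocality.values.toSeq.filter(isAllowed(maxLocality, _))

  def main(args: Array[String]): Unit = {
    println(allowedLevels(PROCESS_LOCAL))
    println(allowedLevels(NO_PREF))
  }
}
```

With maxLocality set to PROCESS_LOCAL, only the first, unguarded lookup in dequeueTask can succeed, because every later branch requires a less local level than the constraint permits.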

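The first for loop finds that both task1 and task2 qualify. Suppose task1 is chosen as the task to run: the function returns immediately, and task1 runs on executor1.
Note where control goes once dequeueTask has succeeded: it returns to TaskSchedulerImpl#resourceOfferSingleTaskSet. So far only executor1 has been given a task, and that function contains a for loop that iterates over all executors and looks for a suitable task for each.
After returning, the second executor is picked, say executor2. The locality is still PROCESS_LOCAL (it only changes after control returns to resourceOffers). The flow is now: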

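    1. Given executor2 and PROCESS_LOCAL, resourceOffer is called
    2. getAllowedLocalityLevel is called. Because task2 is still PROCESS_LOCAL to executor1, it returns PROCESS_LOCAL. If, however, the wait has exceeded the default 3s, the level is lowered by one (currentLocalityIndex is incremented).
    3. dequeueTask(executor2, worker2, PROCESS_LOCAL) is called. The first for loop finds no PROCESS_LOCAL task for executor2, and the result is None, because every later branch is guarded by a TaskLocality.isAllowed() check that fails: PROCESS_LOCAL is the highest level
    4. Then another executor is picked, executor3, and the same three steps are repeated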

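Note that in step 2, if the wait has timed out, getAllowedLocalityLevel lowers the locality from PROCESS_LOCAL to NODE_LOCAL, but back in resourceOffer the following if sets it to PROCESS_LOCAL again. One might ask: why lower it at all if it is going to be changed back anyway? Isn't that wasted effort?
It is not. Once the locality has been lowered from PROCESS_LOCAL to NODE_LOCAL, this attempt is still made at the PROCESS_LOCAL level, but when control returns to the locality for loop in TaskSchedulerImpl#resourceOffers and the next iteration (NODE_LOCAL) tries again, getAllowedLocalityLevel no longer holds the level at PROCESS_LOCAL, so NODE_LOCAL tasks can be scheduled. Without the downgrade, whichever level of the TaskSet the loop had reached, getAllowedLocalityLevel would always see a pending PROCESS_LOCAL task and the level would be set back to PROCESS_LOCAL.
So after iterating over the three executors, only task1 has been launched. Control then returns to the do-while loop inside the two nested for loops of TaskSchedulerImpl#resourceOffers. Because resourceOfferSingleTaskSet returned true, the do-while keeps running, and it ends only when one of the following conditions holds: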

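    1. No executor has enough free CPU, so launchedTask in resourceOfferSingleTaskSet is never set to true and stays false
    2. There are no tasks left to run, so the for loop over taskSet.resourceOffer(...) in resourceOfferSingleTaskSet has nothing to iterate over, and launchedTask likewise stays false.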
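The two exit conditions can be seen in a stripped-down model of the loop. This is a sketch under a strong simplifying assumption: locality is ignored entirely, so any pending task may run on any executor with a free core. `offerOnce` and `offerUntilStuck` are hypothetical names, and a `while` loop with an initial `true` flag stands in for the do-while.

```scala
object OfferLoopSketch {
  import scala.collection.mutable

  // One pass over all executors; returns true if at least one task was launched.
  def offerOnce(
      freeCores: Array[Int],
      pending: mutable.Queue[String],
      launched: mutable.ArrayBuffer[(Int, String)],
      cpusPerTask: Int = 1): Boolean = {
    var launchedTask = false
    for (i <- freeCores.indices) {
      if (freeCores(i) >= cpusPerTask && pending.nonEmpty) {
        launched += ((i, pending.dequeue()))
        freeCores(i) -= cpusPerTask
        launchedTask = true
      }
    }
    launchedTask
  }

  // Repeat passes until one launches nothing. Mutates freeCores in place.
  // Returns the (executor index, task) pairs launched and the number of passes.
  def offerUntilStuck(freeCores: Array[Int], tasks: Seq[String]): (Seq[(Int, String)], Int) = {
    val pending = mutable.Queue.empty[String]
    pending ++= tasks
    val launched = mutable.ArrayBuffer.empty[(Int, String)]
    var passes = 0
    var progressed = true
    while (progressed) {
      progressed = offerOnce(freeCores, pending, launched)
      passes += 1
    }
    (launched.toSeq, passes)
  }

  def main(args: Array[String]): Unit = {
    // Condition 1: cores run out while a task is still pending.
    println(offerUntilStuck(Array(1, 1, 1), Seq("task1", "task2", "task3", "task4")))
    // Condition 2: tasks run out while cores are still free.
    println(offerUntilStuck(Array(2, 2, 2), Seq("a", "b")))
  }
}
```

In both cases the loop needs one extra pass that launches nothing before it can stop, which is the same reason the do-while in resourceOffers calls resourceOfferSingleTaskSet once more after the last successful launch.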

The task timeout case

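The first call to TaskSchedulerImpl#resourceOffers launched task1, task3 and task4. Since each of the three workers has a single core, each worker can run only one task at a time, so one task is left: task2, which is PROCESS_LOCAL to worker1 and has not run. All workers are now busy, so resourceOffers finishes with no resources left. Once a task has been assigned resources, it is handed to the launchTasks method of CoarseGrainedSchedulerBackend, which sends it to the Coarse...Backend on the worker node for execution. One might ask: with no resources left, does the remaining task2 never run?
In fact the scheduler keeps a pool that maintains all TaskSets. A TaskSet that was not finished this time is scheduled again next time, again through resourceOffers, and if there are still no resources it stays in the pool. Let us look at what happens when the TaskSet is scheduled again and resources are available.
Suppose that the next time the TaskSet is scheduled there are free resources, and task2 is still pending and is PROCESS_LOCAL to executor1. The free resource turns out to be executor2 on worker2, and scheduling begins.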

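    1. resourceOffers is called and, starting at PROCESS_LOCAL, calls TaskSchedulerImpl#resourceOfferSingleTaskSet, which finds that executor2 is available
    2. TaskSetManager#resourceOffer is then called. There are two cases: either less than 3s has passed since the last task launch, or more than 3s
  • 2.1. If less than 3s has passed, dequeueTask finds that executor2 is not the PROCESS_LOCAL executor for task2, so control returns to TaskSchedulerImpl#resourceOffers, which continues with the next locality level
  • 2.2. If more than 3s has passed, TaskSetManager#getAllowedLocalityLevel lowers the locality to NODE_LOCAL. Since that is lower than PROCESS_LOCAL, the allowedLocality passed to dequeueTask is still PROCESS_LOCAL, and this attempt still tries to schedule task2 at PROCESS_LOCAL.
    3. The attempt in 2.2 still fails, so control returns to resourceOffers, which now moves from PROCESS_LOCAL down to NODE_LOCAL and calls resourceOfferSingleTaskSet again. If task2 still cannot be scheduled, the process starts over from step 1.
    4. Suppose the locality for task2 eventually drops to ANY and executor2 still has free resources: the next scheduling attempt uses ANY and assigns the task directly to executor2.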
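The steps above can be sketched as a small simulation of task2 being offered executor2 again and again. This is a simplified model, not Spark code. It assumes the TaskSet's levels are PROCESS_LOCAL, NODE_LOCAL and ANY, that each wait is 3s, that nothing is launched in between, and that task2 stays pending, so the timer-driven level is just the elapsed time divided by the wait. All names (`timerLevel`, `canRunTask2`, `offerRound`) are hypothetical.

```scala
object Task2RetrySketch {
  val levels = Vector("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
  val waitMs = 3000L

  // Index of the level that the wait timers allow after elapsedMs without a launch.
  def timerLevel(elapsedMs: Long): Int =
    math.min(elapsedMs / waitMs, (levels.length - 1).toLong).toInt

  // Can task2 (data cached in executor1 on worker1) run on this offer at this level?
  def canRunTask2(execId: String, host: String, level: Int): Boolean =
    (execId == "executor1") ||             // PROCESS_LOCAL match, always allowed
      (level >= 1 && host == "worker1") || // NODE_LOCAL match
      (level >= 2)                         // ANY: anywhere

  // One resourceOffers-style round for a single free executor.
  // Returns the level at which task2 was launched, if it was.
  def offerRound(execId: String, host: String, elapsedMs: Long): Option[String] = {
    val hits = for {
      maxLocality <- levels.indices
      // The clamp from resourceOffer: never search beyond maxLocality.
      allowed = math.min(timerLevel(elapsedMs), maxLocality)
      if canRunTask2(execId, host, allowed)
    } yield levels(allowed)
    hits.headOption
  }

  def main(args: Array[String]): Unit = {
    for (t <- Seq(1000L, 3000L, 6000L)) {
      println(s"t=$t ms, executor2@worker2 -> ${offerRound("executor2", "worker2", t)}")
    }
  }
}
```

Under these assumptions task2 is refused on executor2 until both waits have been used up, and only then is it placed at ANY. An offer from executor1 would be accepted at any time.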

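Adding some log statements along the way also helped me understand a lot. My memory is not great, so consider this a set of notes. If anything here is wrong, corrections are welcome.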
