美文网首页
5.5 TaskScheduler 之Pool

5.5 TaskScheduler 之Pool

作者: GongMeng | 来源:发表于2018-11-21 14:51 被阅读0次

1. 概述

相当于一个多叉树的非叶子节点, 在内部用ConcurrentLinkedQueue维护多个child node.
进一步实现一种自下而上的对整个树的调度和管理.
前面介绍过, Pool是对Trait Scheduler的另外一种实现.
Pool会根据FIFO原则或者FAIR原则来管理它维护的所有child的先后执行顺序, 相对于TaskSetManager关注一个TaskSet内部的Task的运行状态和顺序, Pool关注的是TaskSet之间的向后运行顺序.

2. 重要内部结构

   // 这个两个结构联合起来管理这个Pool里所有的child node 
   // 可以是pool或者tasksetmanager
  val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
  val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]

  // 
  var weight = initWeight
  var minShare = initMinShare
 
 /* 前面我们看到过, 当这个Pool下面的叶子节点里有task在运行
这里就会+1, 它反映的是这个Pool管理的所有的TaskSet共有多少个task在运行*/
  var runningTasks = 0

  // 优先级用于FAIR调度
  var priority = 0

  // A pool's stage id is used to break the tie in scheduling.
  var stageId = -1
  var name = poolName

3. 重要的内部方法

几个简单的override就不介绍了, 就是对维护的各种tasksetmnager进行状态标注和计数
有一个非常重要的是

  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue =
      schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    sortedTaskSetQueue
  }

这个方法实现了对TaskSet的排序, 决定了哪个TaskSet先运行, 哪个后运行. 可以看到具体的实现依赖taskSetSchedulingAlgorithm

  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
    }
  }

3.1 FIFO的实现

非常简单, 先进先出嘛, 默认的Queue就有这个功能.
具体实现的时候, 就是先看谁的priority高, 高的先运行.
如果priority一样, 就看似会的stageId大, 大的后运行, 小的先运行.

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}

3.2 FAIR的实现

FAIR
这个就复杂了, FAIR是根据minShare runningTasks weight来决定哪个TaskSet先运行.

相关文章

网友评论

      本文标题:5.5 TaskScheduler 之Pool

      本文链接:https://www.haomeiwen.com/subject/zlkcqqtx.html