美文网首页
Spark Scheduler

Spark Scheduler

作者: 踏雪寻梅4149 | 来源:发表于2017-09-06 20:30 被阅读0次

Spark Job Scheduling

老的分享。Spark JobScheduling在官方文档中讲的不是特别清楚,所以特意做了实验并阅读源码了解了自己疑惑的几个地方,特记录在此。由于之前的公司是外企,所以用的是英文,请读者见谅我蹩脚的英文。

The official document link : http://spark.apache.org/docs/latest/job-scheduling.html

  • There are two different schedulers in current spark implementation, FIFO is the default setting and the initial way that spark implement.

  • Both FIFO and FAIR schedulers can support the basic functionality that multiple parallel jobs run simultaneously, the prerequisite is that they are submitted from separate threads. (i.e., in single thread, the jobs are executed in order)

  • In FIFO Scheduler, the jobs which are submitted earlier has higher priority and possibility than those later jobs. But it doesn't mean that the first job will execute first, it is also possible that later jobs run before the earlier ones if the resources of the whole cluster are not occupied. However, the FIFO scheduler will cause the worst case: if the first jobs are large, the later jobs maybe suffer significant delay.

  • The FAIR Scheduler is the way corresponding to Hadoop FAIR scheduler and enhancement for FIFO. In FIFO fashion, there is only one factor Priority will be considered in SchedulableQueue; While in FAIR fashion, more factors will be considered including minshare, runningtasks, weight (You can reference the code below if interest).Similarly, the jobs don't always run by following the rules by FairSchedulingAlgorithm strictly, while as a whole, the FAIR scheduler really alleviate largely the delay time for small jobs by adjusting the parameters which were delayed significantly in FIFO fashion in my observation through the concurrent JMeter tests for (项目名隐去、见谅).

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0

    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }

The pools in FIFO and FAIR schedulers

Picture1.png

To avoid interference, I replace TaskSetMagger with Job to help understanding in schedulableQuene.

相关文章

网友评论

      本文标题:Spark Scheduler

      本文链接:https://www.haomeiwen.com/subject/lrivjxtx.html