1. Overview
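As we saw in the previous article, the Driver uses the DAGScheduler and the RDD dependency graph to work out an execution blueprint that splits a Job into multiple Stages. These Stages are then actually executed through the TaskScheduler: it hands the tasks out to the Executors that hold the Partition data so they can do the work in a distributed fashion, and it records and tracks the results.
In the outsourcing-company analogy we have been using throughout, the TaskScheduler is the product manager who has finally settled the design and stopped changing it, so he opens a Sprint in JIRA and creates tasks in that Sprint to hand out the work.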
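To make the hand-off concrete, here is a minimal sketch under simplifying assumptions: a stage produces one task per partition, and each task goes to an executor that already holds its partition if that executor is alive, otherwise to an executor picked round-robin. The names SimpleTask, SimpleTaskSet, toTaskSet and assign are hypothetical; Spark's real task set handling also waits across locality levels (delay scheduling), which this sketch ignores.

```scala
// Hypothetical, simplified model: one task per partition of a stage.
final case class SimpleTask(stageId: Int, partition: Int)
final case class SimpleTaskSet(stageId: Int, tasks: Seq[SimpleTask])

// Build a task set covering every partition of a stage.
def toTaskSet(stageId: Int, numPartitions: Int): SimpleTaskSet =
  SimpleTaskSet(stageId, (0 until numPartitions).map(p => SimpleTask(stageId, p)))

// Prefer an alive executor that already holds the partition;
// otherwise fall back to round-robin over all alive executors.
def assign(taskSet: SimpleTaskSet,
           executors: Seq[String],
           partitionLocations: Map[Int, String]): Map[SimpleTask, String] =
  taskSet.tasks.zipWithIndex.map { case (task, i) =>
    val preferred = partitionLocations.get(task.partition).filter(executors.contains)
    task -> preferred.getOrElse(executors(i % executors.size))
  }.toMap
```

For example, with executors `exec-1` and `exec-2` and partition 0 cached on `exec-2`, task 0 lands on `exec-2` while the remaining tasks are spread round-robin.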

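The TaskScheduler uses the information coming from the heartbeat receiver (executorHeartbeatReceived) to know how many workers are currently at their desks, and it registers events such as executorLost and executorAdd on the liveListener to keep track of which workers have just joined and which have gone missing.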
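A minimal sketch of this liveness bookkeeping, assuming a simple timeout rule: the first heartbeat from an executor is recorded as an "added" event, and an executor that stays silent for longer than timeoutMs is reported as "lost". ExecutorRegistry and its methods are hypothetical and are not Spark's HeartbeatReceiver or listener bus.

```scala
import scala.collection.mutable

// Hypothetical registry that learns which executors are alive from heartbeats.
final class ExecutorRegistry(timeoutMs: Long) {
  private val lastSeen = mutable.Map.empty[String, Long]
  private val events = mutable.ArrayBuffer.empty[String]

  // Called on every heartbeat; the first one from an executor counts as "added".
  def heartbeat(execId: String, nowMs: Long): Unit = {
    if (!lastSeen.contains(execId)) events += s"executorAdded:$execId"
    lastSeen(execId) = nowMs
  }

  // Executors whose last heartbeat is older than the timeout are treated as lost.
  def expireDead(nowMs: Long): Seq[String] = {
    val dead = lastSeen.collect { case (id, t) if nowMs - t > timeoutMs => id }.toSeq.sorted
    dead.foreach { id =>
      lastSeen.remove(id)
      events += s"executorLost:$id"
    }
    dead
  }

  def alive: Set[String] = lastSeen.keySet.toSet
  def eventLog: Seq[String] = events.toSeq
}
```

The dead set is computed before any removal, so the map is never modified while it is being iterated.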
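Just as in real life, there are several kinds of product managers, i.e. several implementations:
- TaskSchedulerImpl: the default TaskScheduler
- YarnScheduler: used in yarn-client deploy mode, where the Driver runs on the client that submitted the application
- YarnClusterScheduler: used in yarn-cluster deploy mode, where the Driver runs inside the Yarn cluster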
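Which implementation applies depends on where the Driver runs. The sketch below only encodes the mapping from the list above; chooseScheduler and DeployMode are hypothetical names, the master strings are simplified assumptions, and the real selection happens inside Spark's own startup code.

```scala
// Hypothetical encoding of the mapping described in the list above.
sealed trait DeployMode
case object Client extends DeployMode
case object Cluster extends DeployMode

def chooseScheduler(master: String, mode: DeployMode): String = (master, mode) match {
  case ("yarn", Client)  => "YarnScheduler"        // Driver runs on the submitting client
  case ("yarn", Cluster) => "YarnClusterScheduler" // Driver runs inside the Yarn cluster
  case _                 => "TaskSchedulerImpl"    // default TaskScheduler
}
```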
2. Interface Code
/**
 * Low-level task scheduler interface, currently implemented exclusively by
 * [[org.apache.spark.scheduler.TaskSchedulerImpl]].
 * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
 * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
 * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
 * them, retrying if there are failures, and mitigating stragglers. They return events to the
 * DAGScheduler.
 */
private[spark] trait TaskScheduler {
  private val appId = "spark-application-" + System.currentTimeMillis
  def rootPool: Pool
  def schedulingMode: SchedulingMode
  def start(): Unit
  // Invoked after system has successfully initialized (typically in spark context).
  // Yarn uses this to bootstrap allocation of resources based on preferred locations,
  // wait for slave registrations, etc.
  def postStartHook() { }
  // Disconnect from the cluster.
  def stop(): Unit
  // Submit a sequence of tasks to run.
  def submitTasks(taskSet: TaskSet): Unit
  // Cancel a stage.
  def cancelTasks(stageId: Int, interruptThread: Boolean)
  // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit
  // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
  def defaultParallelism(): Int
  /**
   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
  def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
      blockManagerId: BlockManagerId): Boolean
  /**
   * Get an application ID associated with the job.
   *
   * @return An application ID
   */
  def applicationId(): String = appId
  /**
   * Process a lost executor
   */
  def executorLost(executorId: String, reason: ExecutorLossReason): Unit
  /**
   * Get an application's attempt ID associated with the job.
   *
   * @return An application's Attempt ID
   */
  def applicationAttemptId(): Option[String]
}
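To see how the main methods of this interface fit together, here is a toy, in-memory scheduler implementing a small subset of it. MiniTaskSet, MiniTaskScheduler and InMemoryTaskScheduler are hypothetical, nothing is actually sent to executors, and the floor of 2 in defaultParallelism is an assumption of this sketch.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified stand-ins; not Spark's classes.
final case class MiniTaskSet(stageId: Int, taskIds: Seq[Long])

trait MiniTaskScheduler {
  def start(): Unit
  def stop(): Unit
  def submitTasks(taskSet: MiniTaskSet): Unit
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
  def defaultParallelism(): Int
}

// Keeps pending task sets in submission order and sums cores across executors.
final class InMemoryTaskScheduler(coresPerExecutor: Map[String, Int]) extends MiniTaskScheduler {
  private val pending = mutable.LinkedHashMap.empty[Int, MiniTaskSet]
  private var running = false

  def start(): Unit = running = true
  def stop(): Unit = { running = false; pending.clear() }

  def submitTasks(taskSet: MiniTaskSet): Unit = {
    require(running, "scheduler not started")
    pending(taskSet.stageId) = taskSet
  }

  // interruptThread is ignored: nothing is really running in this toy.
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {
    pending.remove(stageId)
    ()
  }

  // Sizing hint: total cores across executors, never below 2 (an assumption here).
  def defaultParallelism(): Int = math.max(coresPerExecutor.values.sum, 2)

  def pendingStages: Seq[Int] = pending.keys.toSeq
}
```

Keying pending task sets by stageId is what makes cancelTasks(stageId, ...) a simple lookup in this sketch.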
3. Lifecycle

4. Maintained Data Structures
The main structure the TaskScheduler maintains is org.apache.spark.scheduler.Schedulable, which is an interface (a Scala trait).
It has two implementations: Pool and TaskSetManager.
/**
 * An interface for schedulable entities.
 * there are two type of Schedulable entities(Pools and TaskSetManagers)
 */
private[spark] trait Schedulable {
  var parent: Pool
  // child queues
  def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
  def schedulingMode: SchedulingMode
  def weight: Int
  def minShare: Int
  def runningTasks: Int
  def priority: Int
  def stageId: Int
  def name: String
  def addSchedulable(schedulable: Schedulable): Unit
  def removeSchedulable(schedulable: Schedulable): Unit
  def getSchedulableByName(name: String): Schedulable
  def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
  def checkSpeculatableTasks(): Boolean
  def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
In practice, a Pool acts as a non-leaf node of a tree, while a TaskSetManager is a leaf node of that tree.
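The tree shape can be sketched as follows, assuming the ordering rule is passed in as a comparator: a pool sorts its direct children and then concatenates each child's sorted leaves, while a leaf simply returns itself. PoolNode, TaskSetLeaf and sortedLeaves are hypothetical simplifications of the roles Pool, TaskSetManager and getSortedTaskSetQueue play.

```scala
// Hypothetical, simplified tree: PoolNode = inner node, TaskSetLeaf = leaf.
sealed trait Node {
  def name: String
  def priority: Int
  def stageId: Int
  def sortedLeaves(cmp: (Node, Node) => Boolean): Seq[TaskSetLeaf]
}

final case class TaskSetLeaf(name: String, priority: Int, stageId: Int) extends Node {
  // A leaf contributes only itself.
  def sortedLeaves(cmp: (Node, Node) => Boolean): Seq[TaskSetLeaf] = Seq(this)
}

final case class PoolNode(name: String, children: Seq[Node]) extends Node {
  // A pool has no stage of its own; these only matter when pools are compared.
  val priority = 0
  val stageId = -1
  // Sort the direct children, then flatten depth-first.
  def sortedLeaves(cmp: (Node, Node) => Boolean): Seq[TaskSetLeaf] =
    children.sortWith(cmp).flatMap(_.sortedLeaves(cmp))
}

// Example comparator (illustrative only): order siblings by name.
val byName: (Node, Node) => Boolean = _.name < _.name
```

The scheduler only ever launches from the flattened leaf list, so swapping the comparator is enough to change the scheduling policy in this sketch.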
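When the TaskScheduler is initialized, it has to specify which scheduling strategy (FIFO or FAIR) to use, as the initialize method below shows.
Later on we will briefly walk through the structure of TaskSetManager and Pool.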
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
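The two strategies differ in how sibling schedulables are ordered. Below is a simplified sketch of the ordering rules, modeled loosely on Spark's FIFOSchedulingAlgorithm and FairSchedulingAlgorithm; Sched, fifoLess and fairLess are hypothetical names, and the real classes should be checked for exact details. In practice the mode is chosen with the spark.scheduler.mode setting, which defaults to FIFO.

```scala
// Hypothetical stand-in for the fields a Schedulable is sorted by.
final case class Sched(name: String, priority: Int, stageId: Int,
                       minShare: Int = 0, weight: Int = 1, runningTasks: Int = 0)

// FIFO: lower priority value (earlier job) first, then lower stageId.
def fifoLess(a: Sched, b: Sched): Boolean =
  if (a.priority != b.priority) a.priority < b.priority else a.stageId < b.stageId

// FAIR: entities below their minShare ("needy") go first. Among needy ones compare
// runningTasks / minShare, otherwise runningTasks / weight; ties break on name.
def fairLess(a: Sched, b: Sched): Boolean = {
  val aNeedy = a.runningTasks < a.minShare
  val bNeedy = b.runningTasks < b.minShare
  if (aNeedy != bNeedy) aNeedy
  else {
    val (ra, rb) =
      if (aNeedy)
        (a.runningTasks.toDouble / math.max(a.minShare, 1),
         b.runningTasks.toDouble / math.max(b.minShare, 1))
      else
        (a.runningTasks.toDouble / a.weight, b.runningTasks.toDouble / b.weight)
    if (ra != rb) ra < rb else a.name < b.name
  }
}
```

Under FIFO the earlier job always wins; under FAIR an entity running fewer tasks than its minShare jumps the queue, and the rest share according to their weights.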