1. Overview
TODO: the call path between TaskSchedulerImpl and the Backend, and how the backend receives the various pieces of information reported back by Executors through the heartbeat RPC interface.
TaskSchedulerImpl is the implementation of the TaskScheduler interface used in Standalone mode. When the SparkContext starts up, it is wired together with the DAGScheduler and the SchedulerBackend.
![](https://img.haomeiwen.com/i13676247/6bbdbd549b674d31.png)
We just saw that TaskSetManager has a very important method, resourceOffer; it is here that it actually gets called.
We also saw Pool; a very important internal setting, the scheduling mode (SchedulingMode), is initialized here as well, choosing between FIFO and FAIR.
As the concrete implementation, TaskSchedulerImpl ties together the layers of abstraction covered earlier, Task -> TaskSet -> TaskSetManager -> Pool, and calls their methods, acting as the driver of the whole machinery.
2. Key structures
```scala
/**
 * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
 * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
 * It handles common logic, like determining a scheduling order across jobs, waking up to launch
 * speculative tasks, etc.
 *
 * Clients should first call initialize() and start(), then submit task sets through the
 * runTasks method.
 *
 * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
 * threads, so it needs locks in public API methods to maintain its state. In addition, some
 * SchedulerBackends synchronize on themselves when they want to send events here, and then
 * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
 * we are holding a lock on ourselves.
 */
private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging
```
- SPECULATION_INTERVAL_MS
How often to check for suspiciously slow (speculatable) tasks.
```scala
val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
private val speculationScheduler =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
```
- STARVATION_TIMEOUT_MS
How long to wait before warning that the job cannot obtain any resources and is being starved.
```scala
// Threshold above which we warn user initial TaskSet may be starved
val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
```
- Various lookup tables that store the mappings between tasks, executors, and hosts.
```scala
// IDs of the tasks running on each executor
private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
protected val executorsByHost = new HashMap[String, HashSet[String]]
protected val hostsByRack = new HashMap[String, HashSet[String]]
protected val executorIdToHost = new HashMap[String, String]
// TaskSetManagers are not thread safe, so any access to one should be synchronized
// on this class.
private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager]
val taskIdToExecutorId = new HashMap[Long, String]
```
- The services it depends on.
```scala
// Listener object to pass upcalls into
var dagScheduler: DAGScheduler = null
var backend: SchedulerBackend = null
val mapOutputTracker = SparkEnv.get.mapOutputTracker
```
3. Key internal methods
3.1 Saving SchedulerBackend and Building Schedulable Pools — initialize Method
Internally it chooses which scheduler to use: FIFO or FAIR.
![](https://img.haomeiwen.com/i13676247/0b1fec040b788b8f.png)
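The screenshot above shows the selection logic. Paraphrased from the Spark 1.x/2.x source (details vary slightly between versions), it looks roughly like this:

```scala
// Sketch of TaskSchedulerImpl.initialize, paraphrased from the Spark 1.x/2.x
// source; exact signatures differ slightly between versions.
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // The root of the schedulable tree; every TaskSetManager hangs off this Pool.
  rootPool = new Pool("", schedulingMode, 0, 0)
  // schedulingMode is parsed from the "spark.scheduler.mode" configuration.
  schedulableBuilder = schedulingMode match {
    case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
    case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
  }
  // A no-op for FIFO; for FAIR this parses fairscheduler.xml and builds the pools.
  schedulableBuilder.buildPools()
}
```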
3.2 Starting TaskSchedulerImpl — start Method
The startup logic: internally it starts the backend and the status-checking threads; the backend then requests resources from the Spark driver side and goes on to reach the Executors.
![](https://img.haomeiwen.com/i13676247/cae26c58d0478e75.png)
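For reference, the start logic paraphrased from the Spark source; note the speculation-check thread only runs when spark.speculation is enabled:

```scala
// Sketch of TaskSchedulerImpl.start, paraphrased from the Spark source.
override def start() {
  backend.start()  // in Standalone mode this is where executors get requested
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()  // re-submits suspiciously slow tasks
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
```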
3.3 Handling Task Status Update — statusUpdate Method
When a task is flagged as FINISHED, FAILED, KILLED, or LOST, this method finds the TaskSetManager that owns the task and invokes its internal methods to remove the task from the running-task list, notify the DAGScheduler, and perform the rest of the follow-up work.
If the TaskSetManager cannot be found, you will see a log entry like this:
ERROR Ignoring update with state [state] for TID [tid] because its task set is gone (this is likely the result of receiving duplicate task finished status updates)
The updates this method receives come in through the DriverEndpoint; in other words, the Executor sends an RPC request directly to the Driver to report the execution status of its tasks. DriverEndpoint itself is covered in the Driver-Executor article.
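A simplified sketch of the core of statusUpdate, paraphrased from the Spark source; error handling and the lost-executor path are omitted:

```scala
// Simplified sketch of TaskSchedulerImpl.statusUpdate; the LOST-executor path
// and error handling are omitted.
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  synchronized {
    taskIdToTaskSetManager.get(tid) match {
      case Some(taskSet) =>
        if (TaskState.isFinished(state)) {
          // Drop the task from the lookup tables and the running-task set.
          taskIdToTaskSetManager.remove(tid)
          taskIdToExecutorId.remove(tid)
          taskSet.removeRunningTask(tid)
          // Result (or failure) deserialization happens on a separate thread
          // pool, which in turn notifies the DAGScheduler.
          if (state == TaskState.FINISHED) {
            taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
          } else {
            taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
          }
        }
      case None =>
        logError(s"Ignoring update with state $state for TID $tid because its " +
          "task set is gone (this is likely the result of receiving duplicate " +
          "task finished status updates)")
    }
  }
}
```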
3.4 Submitting Tasks for Execution (from TaskSet for Stage) — submitTasks Method
One of the core methods: it creates the TaskSetManager for the TaskSet and adds it to the scheduling queue. From then on the Pool is responsible for getting this TaskSet launched, and the tasks inside it, together with their serialized payloads, are eventually handed out for execution.
![](https://img.haomeiwen.com/i13676247/5c21107dff0d99eb.png)
```scala
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
```
3.5 taskSetFinished Method
![](https://img.haomeiwen.com/i13676247/0f7c45b3a1e6049b.png)
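The screenshot shows taskSetFinished. Paraphrased from the Spark source: a TaskSetManager calls this once all of its tasks have completed, and the method unhooks it from the stage bookkeeping and from its parent Pool:

```scala
// Sketch of TaskSchedulerImpl.taskSetFinished, paraphrased from the Spark source.
// Called by a TaskSetManager once all of its tasks have run to completion.
def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
  taskSetsByStageIdAndAttempt.get(manager.taskSet.stageId).foreach { taskSetsForStage =>
    taskSetsForStage -= manager.taskSet.stageAttemptId
    if (taskSetsForStage.isEmpty) {
      taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
    }
  }
  // Detach the manager from the scheduling tree so the Pool stops offering it resources.
  manager.parent.removeSchedulable(manager)
  logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed," +
    s" from pool ${manager.parent.name}")
}
```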
3.6 Creating TaskDescriptions For Available Executor Resource Offers (with CPU Cores) — resourceOffers Method
![](https://img.haomeiwen.com/i13676247/be06921f2352383b.png)
resourceOffers distributes tasks to executors randomly rather than by matching ids, which avoids the awkward situation where executor-1 always ends up running TaskSet-1. It also initializes the per-Executor bookkeeping and records the number of available CPUs in each offer.
![](https://img.haomeiwen.com/i13676247/a4031278acf876c6.png)
As mentioned earlier, the order in which these TaskSets are executed is the responsibility of the Pool.
![](https://img.haomeiwen.com/i13676247/f3f4f1a2a1134bb9.png)
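Putting the pieces together, a heavily simplified sketch of resourceOffers, paraphrased from the Spark source; the host/rack bookkeeping and delay-scheduling details are omitted:

```scala
// Heavily simplified sketch of TaskSchedulerImpl.resourceOffers; host/rack
// bookkeeping and delay scheduling are omitted.
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Shuffle the offers so tasks are not always assigned to the same executors.
  val shuffledOffers = Random.shuffle(offers)
  // One task buffer per offer, plus a running count of free cores on each executor.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // The Pool decides the order in which TaskSets get to pick resources (FIFO or FAIR).
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    // Delegates to TaskSetManager.resourceOffer for every slot that still has CPUs.
    resourceOfferSingleTaskSet(taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  }
  if (tasks.nonEmpty) {
    hasLaunchedTask = true
  }
  tasks
}
```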