参考的 spark 版本为 2.3.1
.
文章主要先依据作业的执行生命流程介绍实现大致思路, 再到结合源码分析不同模块承担的责任与实现细节。
调度流程
我们知道在 spark 中算子分为两类, Transformation算子与 Action 算子。
我们通过 Transformantion 将我们的原始数据集(RDD)进行一系列的转换,过滤,聚合等的操作转变成我们想要的结果,然后通过 Action来将最终结果集放到合适的位置或者只是简单的展示等。
Action 同时具有触发整个Job的作用,当程序执行到 Action 操作时,整个job才正式开始触发运行。
当一个作业被提交执行后,每当遇到一个 Action, 程序会创建一个 Job 去执行。如下:
val sc = new SparkContext(new SparkConf())
val rdd = sc.textFile(path)
val flatmapRdd = rdd.flatMap(_.split(" "))
val mapRdd = flatmapRdd.map(str =>(str, 1))
val resultRdd = mapRdd.reduceByKey(_+_)
val result = resultRdd.count()
每当调用一次 Transformation 就会产生一个 RDD, Job 会根据这些 Transformation 的依赖关系去构建 RDD 的依赖关系。
然后Job通过RDD的依赖创建 Stage
, ShuffledRDD
是用来划分Stage
的边界。如上只产生了两个Stage
.
Stage
提交是自下而上进行的, 提交Stage
时,如果发现这个Stage
有其他的依赖, 那么就遍历依赖的Stage
并校验有无依赖Stage
,直到找到没有依赖的Stage
去提交执行。
Stage
执行先根据其内部的RDD partition 的数量创建指定的Task
, 并将这些Task
使用TaskSetManager
维护并放到调度池Pool
中。
后面再去资源管理中心SchedulerBackend
去申请资源, 拿到当前剩余 executor 的 core数量后将Pool
中的所有TaskSetManager
按序取出, 将每个TaskSetManager
中的Task放到相应executor的core上去执行,直到资源不足或者是task都已经提交完。
说明 :
-
如果
SchedulerBackend
发现有 Task 执行完,会告知Pool
提交新的Task
进来。 -
如果
TaskSetManager
提交task时, 如果发现所有的task都已经提交完, 则告知上层, 提交其他stage.
调度实现
在实现方面, 整个调度部分主要分为三部分:
-
DAGScheduler
: 主要管理 Job 中 Stage 的分割与执行 -
TaskScheduler
: 主要维护 Task 调度执行信息 -
SchedulerBackend
: 主要负责资源维护, 与作业的执行
DAGScheduler
DAGScheduler
主要负责 Stage
层面的调度任务, 主要工作是将 Job 解析成 Stage
, 并根据依赖关系提交 Stage
去执行。
其内部维护一个 DAGSchedulerEventProcessLoop
的事件处理单线程类, 主要用于响应 DAGScheduler 提交的任务请求。
作业的提交
Action 算子是提交Job的入口,最终将会调用 DAGScheduler.handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties)
比如 RDD.count()
定义为 def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
, 如上所示, 最终所有的 Action 都会转到 SparkContext.runJob
中
def runJob[T, U: ClassTag](
rdd: RDD[T],
// 这个 RDD 为action前一个 transfoarmation 算子产生的RDD
// 是整个 JOB 拓扑中的最后一个RDD
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
最终在DAGScheduler中通过JobSubmitted
事件,将作业提交给 DAGSchedulerEventProcessLoop
去处理
Stage 划分
Job 创建以后首先要解析构建 stage.
private def createResultStage(
rdd: RDD[_], // job 最后一个 RDD
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
DAGScheduler
构建 Stage
的过程是通过递归调用的, 它会检查 RDD 的依赖性, RDD 的 ShuffleDependency 是切分stage的边界。如果被切分的stage 有依赖性,则先创建被依赖的父stage。
stage 的结构如下所示。
private[scheduler] abstract class Stage(
val id: Int, // stage Id
val rdd: RDD[_], // 这个stage 中最后一个rdd
val numTasks: Int,
val parents: List[Stage], // 依赖的父 stage
val firstJobId: Int,
val callSite: CallSite)
extends Logging {
....
}
递归构建过程的入口在getOrCreateParentStages
中实现, 这是一个构建父stage的过程, 它首先检查这个 stage 的RDD的dependencies 有没有 ShuffleDependency
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
getShuffleDependencies
深度优先遍历, 找到这个rdd的所有依赖中的上一层 ShuffleDependency
.
private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
如果能找到依赖的RDD,则构建对应RDD的 ShuffleMapStage
, 这个过程又会回到上面,直到没有依赖的stage.
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
...
val parents = getOrCreateParentStages(rdd, jobId)
val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
...
stage
}
构建中,除了最后一个RDD构建出的是 ResultStage
, 剩下都是ShuffleMapStage
.
Job 提交后首先触发submitStage的是ResultStage
.
private def submitStage(stage: Stage) {
...
val missing = getMissingParentStages(stage).sortBy(_.id) // 获取这个 stage 依赖的 父 stage
if (missing.isEmpty) {
// 如果没有依赖的父 stage, 则正式提交
submitMissingTasks(stage, jobId.get)
} else {
// 如果存在父 stage , 则尝试提交父stage, 并把自己添加到等待队列里
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
举例 :
这样递归就把没有依赖的 stage 通过 submitMissingTasks
正式提交出去。
stage 的提交
stage 提交执行之前, 首先需要将 stage 转换成 task。因为在 spark 中,task 才是任务计算的单元, task 中封装这个 stage 的执行过程, 每一个 partition 对应一个task. 根据stage 类型的不同, task 可以分成 ShuffleMapTask
与 ResultTask
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
// 根据 partition 计算 task 的主机偏好
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
。。。
return
}
....
// 根据 partition 创建 task
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
} catch {
...
}
if (tasks.size > 0) {
// task 提交给 taskScheduler 去处理
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {
// 提交依赖 stage 的子 stage 集合
markStageAsFinished(stage, None)
submitWaitingChildStages(stage)
}
}
TaskScheduler
TaskScheduler
主要负责Task
的提交,管理, 以及 executor 的信息维护等。在 Spark 2.3.1 里面, 实现 TaskScheduler
的是 TaskSchedulerImpl
.
TaskSchedulerImpl
调度 Task
主要依靠作业调度池 Pool
.
val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
...
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
schedulableBuilder.buildPools()
Pool
Pool 是一个Task
调度池, 它决定了需要调度的Task
的资源获取顺序。它的调度单元是TaskSetManager
.
getSortedTaskSetQueue
方法用于按优先级对TaskSetManager
进行排序。只有前面的TaskSetManager
完成了资源获取, 后面的作业才有机会进行调度。
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
图片
TaskSetManager
主要用于维护一个 stage 中的所有 Task
,并根据提供的机器信息决定提交哪个task上去。
初始化时, 将所有的task放到等待提交task集合里
for (i <- (0 until numTasks).reverse) {
addPendingTask(i)
}
...
private[spark] def addPendingTask(index: Int) {
for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
case e: HDFSCacheTaskLocation =>
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) =>
for (e <- set) {
pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
}
case _ =>
}
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
for (rack <- sched.getRackForHost(loc.host)) {
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}
if (tasks(index).preferredLocations == Nil) {
pendingTasksWithNoPrefs += index
}
allPendingTasks += index // No point scanning this whole list to find the old task there
}
Pool 的构建依赖于 SchedulableBuild
.
-
FIFOSchedulableBuilder
:
FIFOSchedulableBuilder
构建的调度池调度策略较为单一, 不同TaskSetManger
间使用priority
作为优先级顺序,如果priority
相同,则使用stageId
作为优先级顺序。
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority
val priority2 = s2.priority
var res = math.signum(priority1 - priority2)
if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
res < 0
}
}
-
FairSchedulableBuilder
:
FairSchedulerBuilder
构造的调度池嵌套多个子调度池, 每个调度池的具有自己的权重信息,高优先级的调度池内的TaskSetManager 要优先于低优先级的 TaskSetManager. 同一个调度池内的TaskSetManager
使用 priority
与 stageId
比较。
当 task 通过 DAGScheduler
提交过来时,TaskScheduler 首先会把他封装成 TaskSetManager
, 放到调度池Pool中。然后去 SchedulerBackend
申请资源。
val manager = createTaskSetManager(taskSet, maxTaskFailures)
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
backend.reviveOffers()
SchedulerBackend 如果有资源, 会把资源转交给 TaskScheduler 让它去安排这些资源上要执行哪些作业def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]]
。
每个 WorkerOffer
中记录所在的executor以及这上面有多少空闲的 core
.
接下来 TaskScheduler 首先对这些 WorkOffer 进行过滤, 排除掉一些黑名单上的 executor, 然后对这些 executor 混洗,使得作业安排更均匀一些。
然后从调度池中按序取出所有的 TaskSetManager.
接下来需要尝试按序为每个 TaskSetManager 中的所有 Task 分配资源。最终直到没有空闲资源或者所有的Task分配完成。
说明
-
资源分配是以 TaskSetManager 为序的, 当前面的 TaskSetManager 分配完所有的Task,后面的TaskSetManager才有机会分配。
-
Task 的分配是以作业本地性的优先级依次来进行分配的, 尽量将数据与计算本地化
-
尽量将Task均匀的分散到所有的executor上。
/**
* Called by cluster manager to offer resources on slaves. We respond by asking our active task
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
* that tasks are balanced across the cluster.
*
* 这个方法由调度后端调用,调度后端会将可用的 executor 资源告诉TaskSchedulerImpl,
* TaskSchedulerImpl 根据 TaskSet 优先级(调度池),黑名单,本地性等因素给出要实际运行的任务。
* 我们使用 round-robin 的方式将任务分配到各个executor上,以使得计算资源的 使用更均衡。
*/
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
// 标记是否有新的可用executor加入
var newExecAvail = false
// 这个循环主要目的是两个:
// 1. 更新一些簿记量,如物理节点和executor的相互映射关系,机架和host的映射关系,host和executor上运行的任务信息等等
// 2. 检查是否有新的可用executor加入
for (o <- offers) {
if (!hostToExecutors.contains(o.host)) {
hostToExecutors(o.host) = new HashSet[String]()
}
if (!executorIdToRunningTaskIds.contains(o.executorId)) {
hostToExecutors(o.host) += o.executorId
executorAdded(o.executorId, o.host)
executorIdToHost(o.executorId) = o.host
executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
newExecAvail = true
}
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
// this here to avoid a separate thread and added synchronization overhead, and also because
// updating the blacklist is only relevant when task offers are being made.
// 触发黑名单的超时检查,被加入黑名单的节点或executor是有一定超时时间的,
// 在超时时间内不能像他们提交任务,而过了超时时间,这些资源将被重新投入使用.
blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
// 根据最新的黑名单过滤掉在黑名单中的计算资源,包括host和executor
val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
offers.filter { offer =>
!blacklistTracker.isNodeBlacklisted(offer.host) &&
!blacklistTracker.isExecutorBlacklisted(offer.executorId)
}
}.getOrElse(offers)
// 对资源进行混洗,使得分配更加均匀,使用scala库的Random进行混洗
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
// 每个executor能分配多少个任务,cores / CPUS_PER_TASK
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
// 每个executor提供的cpu核数
val availableCpus = shuffledOffers.map(o => o.cores).toArray
// 通过调度池对所有的任务集按优先级进行排序,获取排序后的任务集
val sortedTaskSets = rootPool.getSortedTaskSetQueue // 第二部
// 如果有新的executor加入,需要通知每个TaskSetManager
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// 按照我们的调度顺序获取每个TaskSet,然后按局部性级别的升序将其提供给每个节点,以便它有机会在所有节点上启动本地任务。
// NOTE: 分配顺序 : PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) {
var launchedAnyTask = false
var launchedTaskAtCurrentMaxLocality = false
// 本地性从低到高的顺序
for (currentMaxLocality <- taskSet.myLocalityLevels) {
// 每个本地性级别会进行多轮分配,
// 每一轮依次轮询每个executor,每个executor分配一个任务,
// 这样一轮下来每个executor都会分配到一个任务,显然大多数情况下,executor的资源是不会被占满的
// 没关系,我们会接着进行第二轮分配,知道没有资源或者在当前的本地性级别下任务被分配完了,就跳出循环
do {
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
当所有的 task 分配好资源后, SchedulerBackend 将拿到分配好资源的作业信息 TaskDescription
去提交执行。当有Task执行完成后会告知 TaskScheduler 调度新的Task进来。并且需要检查是否要调度新的 stage 进来。
网友评论