- A task needs resources before it can run
- `TaskScheduler` schedules tasks onto the resources obtained in step 1
- Is task locality related to the locality of HDFS files?
- How exactly are task locality and HDFS file locality connected?
1. Where task scheduling is triggered

A task can be scheduled because executor resources have been allocated to it. In the code this happens in the following cases (`CoarseGrainedSchedulerBackend` receives one of these events):
event | source | method |
---|---|---|
RegisterExecutor | CoarseGrainedExecutorBackend | makeOffers |
ReviveOffers | TaskScheduler (on task-set submission, when an executor is lost or dies, when a task fails, when speculative tasks become eligible), DriverEndpoint | makeOffers |
StatusUpdate | ExecutorBackend (when a task finishes) | makeOffers(executorId) |
These events update the resource state, and `TaskScheduler` then schedules tasks onto the newly available resources:

```scala
launchTasks(scheduler.resourceOffers(workOffers))
```

The implementation of `resourceOffers` breaks down into the following steps:
- Update four data structures: `hostToExecutors`, `executorIdToHost`, `executorIdToRunningTaskIds`, `hostsByRack`
- If there are new executors, recompute the locality levels of every `TaskSetManager`
- Based on locality, pick the tasks that can be launched (a simplified sketch of the whole offer/launch loop follows below)
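To tie these pieces together, here is a minimal, self-contained sketch of the offer/launch loop. It is not the actual Spark source: `OfferLoopSketch`, the stand-in `WorkerOffer`/`TaskDescription` case classes, and the dummy executor data are made up for illustration; the real flow lives in `CoarseGrainedSchedulerBackend`'s `DriverEndpoint.makeOffers` and `TaskSchedulerImpl.resourceOffers`.

```scala
import scala.collection.mutable

// Stand-ins for Spark's internal types; names and fields are simplified for illustration.
case class WorkerOffer(executorId: String, host: String, cores: Int)
case class TaskDescription(taskId: Long, executorId: String, name: String)

object OfferLoopSketch {
  // executorId -> (host, free cores); in Spark this state lives in the scheduler backend.
  val executorData = mutable.Map(
    "exec-1" -> ("host-a", 2),
    "exec-2" -> ("host-b", 4)
  )

  // Stand-in for TaskSchedulerImpl.resourceOffers: hand out one dummy task per free core.
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[TaskDescription] = {
    var nextTaskId = 0L
    offers.flatMap { o =>
      (0 until o.cores).map { _ =>
        nextTaskId += 1
        TaskDescription(nextTaskId, o.executorId, s"dummy task on ${o.host}")
      }
    }
  }

  def launchTasks(tasks: Seq[TaskDescription]): Unit =
    tasks.foreach(t => println(s"launching ${t.name} (tid ${t.taskId}) on ${t.executorId}"))

  // Rough equivalent of DriverEndpoint.makeOffers(): turn free resources into offers,
  // let the scheduler pick tasks for them, then launch whatever came back.
  def makeOffers(): Unit = {
    val workOffers = executorData.map { case (id, (host, cores)) => WorkerOffer(id, host, cores) }.toSeq
    launchTasks(resourceOffers(workOffers))
  }

  def main(args: Array[String]): Unit = makeOffers()
}
```

Every event in the table above eventually funnels into such a `makeOffers` call, either for all executors or for a single `executorId`.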
2. How TaskSetManager's so-called locality is implemented

The locality of a `TaskSetManager` is determined by four variables (the `ArrayBuffer[Int]` stores task indices, and the key is the corresponding executor/host/rack):
name | type | when it is updated |
---|---|---|
pendingTasksForExecutor | HashMap[String, ArrayBuffer[Int]] | addPendingTask |
pendingTasksForHost | HashMap[String, ArrayBuffer[Int]] | addPendingTask |
pendingTasksWithNoPrefs | ArrayBuffer[Int] | addPendingTask |
pendingTasksForRack | HashMap[String, ArrayBuffer[Int]] | addPendingTask |
All four variables are updated in `addPendingTask`:
```scala
/** Add a task to all the pending-task lists that it should be on. */
private def addPendingTask(index: Int) {
  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
      case e: HDFSCacheTaskLocation =>
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        exe match {
          case Some(set) =>
            for (e <- set) {
              pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
            ", but there are no executors alive there.")
        }
      case _ =>
    }
    pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    for (rack <- sched.getRackForHost(loc.host)) {
      pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
    }
  }

  if (tasks(index).preferredLocations == Nil) {
    pendingTasksWithNoPrefs += index
  }

  allPendingTasks += index  // No point scanning this whole list to find the old task there
}
```
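As a concrete illustration (with hypothetical executor/host/rack names): suppose task index 0 has an `ExecutorCacheTaskLocation` on `exec-1`/`host-a` (rack `rack-1`) and task index 1 has no preferred locations. After `addPendingTask(0)` and `addPendingTask(1)` the structures would look roughly like this:

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Hypothetical contents only; names are made up for illustration.
val pendingTasksForExecutor = HashMap("exec-1" -> ArrayBuffer(0))
val pendingTasksForHost     = HashMap("host-a" -> ArrayBuffer(0))
val pendingTasksForRack     = HashMap("rack-1" -> ArrayBuffer(0)) // only if a rack can be resolved for host-a
val pendingTasksWithNoPrefs = ArrayBuffer(1)
val allPendingTasks         = ArrayBuffer(0, 1)                   // every task always lands here
```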
From the above we can see that updating these variables depends on `Task#preferredLocations`, so we need to look at how that method works. Two things are involved:

- `TaskLocation`
- the implementation of `preferredLocations`

These four variables are then used to compute the valid locality levels of the `TaskSetManager`:
```scala
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
  import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
  val levels = new ArrayBuffer[TaskLocality.TaskLocality]
  if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
      pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
    levels += PROCESS_LOCAL
  }
  if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
      pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
    levels += NODE_LOCAL
  }
  if (!pendingTasksWithNoPrefs.isEmpty) {
    levels += NO_PREF
  }
  if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
      pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
    levels += RACK_LOCAL
  }
  levels += ANY
  logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
  levels.toArray
}
```
2.1 TaskLocation

A task's location is described by an executorId and a host, with the executorId taking precedence over the bare host. The concrete types are:
type | desc |
---|---|
ExecutorCacheTaskLocation(host, executorId) | the partition is cached by a specific executor on this host |
HostTaskLocation(host) | the data lives on this host |
HDFSCacheTaskLocation(host) | the block is cached by HDFS on this host |
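For reference, a simplified sketch of the hierarchy behind this table (condensed for illustration; the real classes are `private[spark]` and live in `org.apache.spark.scheduler`):

```scala
// Simplified sketch, not Spark's exact source.
sealed trait TaskLocation {
  def host: String
}

// The RDD partition is cached by a specific executor's block manager on this host.
case class ExecutorCacheTaskLocation(host: String, executorId: String) extends TaskLocation

// The data lives on this host (e.g. an HDFS block replica), with no particular executor.
case class HostTaskLocation(host: String) extends TaskLocation

// The HDFS block is held in HDFS's in-memory cache on this host.
case class HDFSCacheTaskLocation(host: String) extends TaskLocation
```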
2.2 preferredLocations

The preferred locations are computed differently for different task (stage) types:
```scala
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
    case s: ResultStage =>
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
} // ... (catch elided in this excerpt)

def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
  getPreferredLocsInternal(rdd, partition, new HashSet)
}

private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  // If the partition has already been visited, no need to re-visit.
  // This avoids exponential path exploration.  SPARK-695
  if (!visited.add((rdd, partition))) {
    // Nil has already been returned for previously visited partitions.
    return Nil
  }
  // If the partition is cached, return the cache locations
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) {
    return cached
  }
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }

  // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }
    case _ =>
  }

  Nil
}
```
The code above considers three cases when looking for the best locality for a task:

- whether the partition's data is already cached
- whether the partition's data has been checkpointed, or whether the RDD itself reports preferred locations
- otherwise, recurse through narrow dependencies into the parent partitions to find preferred locations

Most of the time this ends up calling `RDD#getPreferredLocations`, whose implementation differs from one RDD type to another. Let's take `NewHadoopRDD`'s implementation as an example:
```scala
override def getPreferredLocations(hsplit: Partition): Seq[String] = {
  val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value
  val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
    case Some(c) =>
      try {
        val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
        HadoopRDD.convertSplitLocationInfo(infos)
      } catch {
        case e: Exception =>
          logDebug("Failed to use InputSplit#getLocationInfo.", e)
          None
      }
    case None => None
  }
  locs.getOrElse(split.getLocations.filter(_ != "localhost"))
}
```

`HadoopRDD.convertSplitLocationInfo` is where a block cached by HDFS becomes an `HDFSCacheTaskLocation`:

```scala
private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = {
  Option(infos).map(_.flatMap { loc =>
    val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
    val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
    if (locationStr != "localhost") {
      if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
        logDebug(s"Partition $locationStr is cached by Hadoop.")
        Some(HDFSCacheTaskLocation(locationStr).toString)
      } else {
        Some(HostTaskLocation(locationStr).toString)
      }
    } else {
      None
    }
  })
}
```
This is where we finally see how task locality and HDFS file locality are connected: the split's HDFS block locations (and HDFS cache information) are converted into `HostTaskLocation`/`HDFSCacheTaskLocation` entries, which then feed the pending-task lists from section 2.
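To see these locations from user code, the public `RDD#preferredLocations` API is enough. A small hypothetical example (the HDFS path is a placeholder; run it with `spark-submit` so a master is configured):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PreferredLocationsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("preferred-locations-demo"))
    // Placeholder path: any HDFS file will do.
    val rdd = sc.textFile("hdfs:///tmp/some-file")
    // For each partition, print the locations the scheduler will prefer
    // (hosts holding the HDFS block replicas; HDFS-cached blocks carry a cache marker).
    rdd.partitions.foreach { p =>
      println(s"partition ${p.index}: ${rdd.preferredLocations(p).mkString(", ")}")
    }
    sc.stop()
  }
}
```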
3. Picking the tasks to schedule

The call chain is:

- `resourceOfferSingleTaskSet`
  - `taskSet.resourceOffer`
    - `dequeueTask` (this is the step that takes locality into account)
```scala
for (taskSet <- sortedTaskSets) {
  var launchedAnyTask = false
  var launchedTaskAtCurrentMaxLocality = false
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
      launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
  }
  if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
  }
}
```
```scala
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]): Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      }
      // ...
    }
  }
  return launchedTask
}
```
```scala
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality): Option[TaskDescription] = {
  val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
    blacklist.isNodeBlacklistedForTaskSet(host) ||
      blacklist.isExecutorBlacklistedForTaskSet(execId)
  }
  if (!isZombie && !offerBlacklisted) {
    val curTime = clock.getTimeMillis()

    var allowedLocality = maxLocality
    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }

    dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
      // ...
      addRunningTask(taskId)
      // ...
    }
  } else {
    None
  }
}
```
```scala
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value, Boolean)] = {
  for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL, false))
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
    for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL, false))
    }
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
    // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
    for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
    for {
      rack <- sched.getRackForHost(host)
      index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
    } {
      return Some((index, TaskLocality.RACK_LOCAL, false))
    }
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
    for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
      return Some((index, TaskLocality.ANY, false))
    }
  }

  // find a speculative task if all others tasks have been scheduled
  dequeueSpeculativeTask(execId, host, maxLocality).map {
    case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)
  }
}
```