Spark Task Locality

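Author: 天之見證 | Published 2019-08-13 22:05

  1. A task needs resources before it can run
  2. TaskScheduler schedules tasks using the resources obtained in step 1
  3. Is task locality related to the locality of HDFS files?
  4. How are task locality and HDFS file locality tied together?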

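1. Entry point of task scheduling

Tasks can be scheduled only once executor resources have been offered. In the code, this happens when CoarseGrainedSchedulerBackend receives one of the following events: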

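| event | source | method |
|---|---|---|
| RegisterExecutor | CoarseGrainedExecutorBackend | makeOffers |
| ReviveOffers | TaskScheduler (on task submission, on executor loss or failure, on task failure, when speculative execution becomes possible), DriverEndpoint | makeOffers |
| StatusUpdate | ExecutorBackend (when a task finishes) | makeOffers(executorId) |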

These events update the resource state, and TaskScheduler then schedules tasks against the newly available resources:

launchTasks(scheduler.resourceOffers(workOffers))

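resourceOffers itself does three things:

  1. Updates four data structures: hostToExecutors, executorIdToHost, executorIdToRunningTaskIds, hostsByRack
  2. If a new executor has appeared, recomputes the locality levels of every TaskSetManager
  3. Picks the tasks that can be launched, according to locality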
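
Step 1 can be pictured with a small standalone model. This is only a sketch: `Offer`, `State`, `register` and the `rackOf` function are hypothetical names made up for illustration, not Spark classes. Only the four map names come from the list above.

```scala
import scala.collection.mutable

object OfferBookkeeping {
  // Hypothetical stand-in for Spark's WorkerOffer, reduced to the fields used here.
  final case class Offer(executorId: String, host: String, cores: Int)

  // rackOf is an assumed rack resolver supplied by the caller.
  final class State(rackOf: String => Option[String]) {
    val hostToExecutors = mutable.HashMap.empty[String, mutable.HashSet[String]]
    val executorIdToHost = mutable.HashMap.empty[String, String]
    val executorIdToRunningTaskIds = mutable.HashMap.empty[String, mutable.HashSet[Long]]
    val hostsByRack = mutable.HashMap.empty[String, mutable.HashSet[String]]

    /** Records the offers and returns true if any executor was seen for the first time. */
    def register(offers: Seq[Offer]): Boolean = {
      var newExecutorSeen = false
      for (o <- offers) {
        val executorsOnHost =
          hostToExecutors.getOrElseUpdate(o.host, mutable.HashSet.empty[String])
        if (!executorIdToRunningTaskIds.contains(o.executorId)) {
          executorsOnHost += o.executorId
          executorIdToHost(o.executorId) = o.host
          executorIdToRunningTaskIds(o.executorId) = mutable.HashSet.empty[Long]
          newExecutorSeen = true
        }
        for (rack <- rackOf(o.host)) {
          hostsByRack.getOrElseUpdate(rack, mutable.HashSet.empty[String]) += o.host
        }
      }
      newExecutorSeen
    }
  }
}
```

In this model, the boolean returned by `register` plays the role of the "new executor" signal that triggers step 2.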

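2. How TaskSetManager implements locality

TaskSetManager's locality is determined by four variables. Each ArrayBuffer[Int] holds task indexes (positions within the TaskSet), and each map key is the corresponding executor ID, host, or rack: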

| name | type | updated in |
|---|---|---|
| pendingTasksForExecutor | HashMap[String, ArrayBuffer[Int]] | addPendingTask |
| pendingTasksForHost | HashMap[String, ArrayBuffer[Int]] | addPendingTask |
| pendingTasksWithNoPrefs | ArrayBuffer[Int] | addPendingTask |
| pendingTasksForRack | HashMap[String, ArrayBuffer[Int]] | addPendingTask |

All four variables are updated in addPendingTask:

/** Add a task to all the pending-task lists that it should be on. */
private def addPendingTask(index: Int) {
  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
      case e: HDFSCacheTaskLocation =>
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        exe match {
          case Some(set) =>
            for (e <- set) {
              pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
              ", but there are no executors alive there.")
        }
      case _ =>
    }
    pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    for (rack <- sched.getRackForHost(loc.host)) {
      pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
    }
  }
  
  if (tasks(index).preferredLocations == Nil) {
    pendingTasksWithNoPrefs += index
  }
  
  allPendingTasks += index  // No point scanning this whole list to find the old task there
}
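
To make the effect of addPendingTask concrete, here is a reduced standalone model. It is a sketch under assumptions: `Loc`, `ExecutorLoc`, `HostLoc`, `Pending` and `rackOf` are hypothetical names, and the HDFS-cache branch is left out. It shows that a single task lands in several lists at once: one entry per preferred executor, host, and rack, plus the catch-all list.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object PendingLists {
  // Hypothetical, simplified stand-ins for Spark's TaskLocation types.
  sealed trait Loc { def host: String }
  final case class ExecutorLoc(host: String, executorId: String) extends Loc
  final case class HostLoc(host: String) extends Loc

  // rackOf is an assumed rack resolver supplied by the caller.
  final class Pending(rackOf: String => Option[String]) {
    val forExecutor = mutable.HashMap.empty[String, ArrayBuffer[Int]]
    val forHost = mutable.HashMap.empty[String, ArrayBuffer[Int]]
    val forRack = mutable.HashMap.empty[String, ArrayBuffer[Int]]
    val noPrefs = ArrayBuffer.empty[Int]
    val all = ArrayBuffer.empty[Int]

    def add(index: Int, preferred: Seq[Loc]): Unit = {
      for (loc <- preferred) {
        loc match {
          case ExecutorLoc(_, execId) =>
            forExecutor.getOrElseUpdate(execId, ArrayBuffer.empty[Int]) += index
          case _ => // host-only location: no executor-level entry
        }
        // Every location, executor-level or not, also counts for its host and rack.
        forHost.getOrElseUpdate(loc.host, ArrayBuffer.empty[Int]) += index
        for (rack <- rackOf(loc.host)) {
          forRack.getOrElseUpdate(rack, ArrayBuffer.empty[Int]) += index
        }
      }
      if (preferred.isEmpty) noPrefs += index
      all += index
    }
  }
}
```

Because the same index sits in several lists, whoever dequeues a task has to make sure it is not handed out twice from a different list.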

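As the code shows, these variables are populated from Task#preferredLocations, so that method is the next thing to look at. Two pieces are involved:

  1. TaskLocation
  2. The implementation of preferredLocations

The four variables above are then used to compute the TaskSetManager's valid locality levels: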

private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
  import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
  val levels = new ArrayBuffer[TaskLocality.TaskLocality]
  if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
      pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
    levels += PROCESS_LOCAL
  }
  if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
      pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
    levels += NODE_LOCAL
  }
  if (!pendingTasksWithNoPrefs.isEmpty) {
    levels += NO_PREF
  }
  if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
      pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
    levels += RACK_LOCAL
  }
  levels += ANY
  logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
  levels.toArray
}
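
The same decision logic can be tried out in isolation. The sketch below is a simplified model, not Spark code: `Snapshot`, `validLevels` and the `waitMs` function are hypothetical, and the sets stand in for the scheduler's liveness checks. A level is kept only if tasks are pending at that level, its configured wait is non-zero, and something is alive there. ANY is always kept.

```scala
object ValidLevels {
  object Locality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
  }
  import Locality._

  // Hypothetical snapshot of what is pending and what is currently alive.
  final case class Snapshot(
      pendingExecutors: Set[String] = Set.empty,
      pendingHosts: Set[String] = Set.empty,
      hasNoPrefTasks: Boolean = false,
      pendingRacks: Set[String] = Set.empty,
      aliveExecutors: Set[String] = Set.empty,
      aliveHosts: Set[String] = Set.empty,
      aliveRacks: Set[String] = Set.empty)

  def validLevels(s: Snapshot, waitMs: Locality.Value => Long): Seq[Locality.Value] = {
    val levels = Seq.newBuilder[Locality.Value]
    if (waitMs(PROCESS_LOCAL) != 0 && s.pendingExecutors.exists(s.aliveExecutors.contains)) {
      levels += PROCESS_LOCAL
    }
    if (waitMs(NODE_LOCAL) != 0 && s.pendingHosts.exists(s.aliveHosts.contains)) {
      levels += NODE_LOCAL
    }
    if (s.hasNoPrefTasks) {
      levels += NO_PREF
    }
    if (waitMs(RACK_LOCAL) != 0 && s.pendingRacks.exists(s.aliveRacks.contains)) {
      levels += RACK_LOCAL
    }
    levels += ANY // always valid, so a task can eventually run anywhere
    levels.result()
  }
}
```

Note the consequence of the wait check: a level whose wait is 0 is skipped entirely, so the scheduler never waits for it.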

2.1 TaskLocation

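A task's location is expressed as an executor ID and a host, and an executor-level location takes priority over a host-level one. The concrete types are:

| type | description |
|---|---|
| ExecutorCacheTaskLocation(host, executorId) | |
| HostTaskLocation(host) | |
| HDFSCacheTaskLocation(host) | cached by HDFS |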

2.2 preferredLocations

The lookup differs slightly depending on the task type:

val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
    case s: ResultStage =>
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
}
def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
  getPreferredLocsInternal(rdd, partition, new HashSet)
}

private def getPreferredLocsInternal(
     rdd: RDD[_],
     partition: Int,
     visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
   // If the partition has already been visited, no need to re-visit.
   // This avoids exponential path exploration.  SPARK-695
   if (!visited.add((rdd, partition))) {
     // Nil has already been returned for previously visited partitions.
     return Nil
   }
   
   // If the partition is cached, return the cache locations
   val cached = getCacheLocs(rdd)(partition)
   if (cached.nonEmpty) {
     return cached
   }
   
   // If the RDD has some placement preferences (as is the case for input RDDs), get those
   val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
   if (rddPrefs.nonEmpty) {
     return rddPrefs.map(TaskLocation(_))
   }

   // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
   // that has any placement preferences. Ideally we would choose based on transfer sizes,
   // but this will do for now.
   rdd.dependencies.foreach {
     case n: NarrowDependency[_] =>
       for (inPart <- n.getParents(partition)) {
         val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
         if (locs != Nil) {
           return locs
         }
       }
     case _ =>
   }

   Nil
}

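The code above considers three cases, in order, when looking for the best locality for a task:

  1. Whether the partition's data is already cached
  2. Whether the RDD itself reports preferred locations for the partition (from its checkpoint, if it has one, or from its own placement preferences)
  3. Otherwise, following narrow dependencies back to the parent partitions and recursing until one of them yields a location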
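
The three cases can be replayed on a toy lineage graph. This is a minimal sketch, assuming a hypothetical `Node` type in place of an RDD: `cached` and `prefs` are plain maps from partition to host strings, and each narrow parent carries a function from a child partition to its parent partitions. Nodes are identified by `name` in the visited set.

```scala
import scala.collection.mutable

object PreferredLocs {
  // Hypothetical toy RDD; none of these fields are Spark's.
  final case class Node(
      name: String,
      cached: Map[Int, Seq[String]] = Map.empty,
      prefs: Map[Int, Seq[String]] = Map.empty,
      narrowParents: Seq[(Node, Int => Seq[Int])] = Nil)

  def locs(
      node: Node,
      partition: Int,
      visited: mutable.HashSet[(String, Int)] = mutable.HashSet.empty[(String, Int)]): Seq[String] = {
    if (!visited.add((node.name, partition))) {
      Nil // already explored; avoids exponential re-visiting
    } else {
      val cached = node.cached.getOrElse(partition, Nil)
      val prefs = node.prefs.getOrElse(partition, Nil)
      if (cached.nonEmpty) cached // case 1: cache locations win
      else if (prefs.nonEmpty) prefs // case 2: the node's own preferences
      else {
        // case 3: first parent partition, in order, that yields any location
        val candidates = for {
          (parent, parentsOf) <- node.narrowParents.iterator
          inPart <- parentsOf(partition).iterator
        } yield locs(parent, inPart, visited)
        candidates.find(_.nonEmpty).getOrElse(Nil)
      }
    }
  }
}
```

The iterator keeps the search lazy, so the recursion stops at the first parent partition that has a location, matching the early return in the listing.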

As this shows, in most cases the preferred locations come from RDD#getPreferredLocations, whose implementation varies from one RDD type to another. Take NewHadoopRDD as an example:

override def getPreferredLocations(hsplit: Partition): Seq[String] = {
  val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value
  val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
    case Some(c) =>
      try {
        val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
        HadoopRDD.convertSplitLocationInfo(infos)
      } catch {
        case e : Exception =>
          logDebug("Failed to use InputSplit#getLocationInfo.", e)
          None
      }
    case None => None
  }
  locs.getOrElse(split.getLocations.filter(_ != "localhost"))
}

private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = {
  Option(infos).map(_.flatMap { loc =>
    val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
    val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
    if (locationStr != "localhost") {
      if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
        logDebug(s"Partition $locationStr is cached by Hadoop.")
        Some(HDFSCacheTaskLocation(locationStr).toString)
      } else {
        Some(HostTaskLocation(locationStr).toString)
      }
    } else {
      None
    }
  })
}

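This is where task locality is finally tied to HDFS file locality: the hosts reported by the Hadoop input split, together with whether the data is cached in memory there, become the task's preferred locations.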
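
Note that the locations travel as plain strings between the two listings: convertSplitLocationInfo calls toString, and getPreferredLocsInternal turns each string back into a TaskLocation. The sketch below models that round trip. The type names and helper functions are hypothetical, and the two prefixes reflect my understanding of Spark's TaskLocation string format, so treat them as assumptions rather than a specification.

```scala
object LocationStrings {
  // Hypothetical mirrors of the three TaskLocation types.
  sealed trait Loc { def host: String }
  final case class ExecutorCacheLoc(host: String, executorId: String) extends Loc
  final case class HostLoc(host: String) extends Loc
  final case class HdfsCacheLoc(host: String) extends Loc

  // Assumed prefixes; check them against the Spark version in use.
  val ExecutorTag = "executor_"
  val HdfsCacheTag = "hdfs_cache_"

  def encode(loc: Loc): String = loc match {
    case ExecutorCacheLoc(host, execId) => s"$ExecutorTag${host}_$execId"
    case HostLoc(host) => host
    case HdfsCacheLoc(host) => HdfsCacheTag + host
  }

  def parse(s: String): Loc =
    if (s.startsWith(HdfsCacheTag)) {
      HdfsCacheLoc(s.stripPrefix(HdfsCacheTag))
    } else if (s.startsWith(ExecutorTag)) {
      s.stripPrefix(ExecutorTag).split("_", 2) match {
        case Array(host, execId) => ExecutorCacheLoc(host, execId)
        case _ => throw new IllegalArgumentException(s"Illegal executor location: $s")
      }
    } else {
      HostLoc(s)
    }

  /** Same filtering as convertSplitLocationInfo: drop localhost, tag in-memory hosts. */
  def fromSplit(hostsWithInMemoryFlag: Seq[(String, Boolean)]): Seq[String] =
    hostsWithInMemoryFlag.collect {
      case (host, inMemory) if host != "localhost" =>
        if (inMemory) encode(HdfsCacheLoc(host)) else encode(HostLoc(host))
    }
}
```

For example, a split reporting `dn1` (in memory), `localhost` and `dn2` would produce two strings in this model, one tagged as HDFS-cached and one plain host name.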

3. Getting the tasks to schedule

  1. resourceOfferSingleTaskSet
  2. taskSet.resourceOffer
  3. dequeueTask (this is the step that takes locality into account)

for (taskSet <- sortedTaskSets) {
  var launchedAnyTask = false
  var launchedTaskAtCurrentMaxLocality = false
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
      launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
  }
  if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
  }
}

private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      }
      // ...
    }
  }
  return launchedTask
}

def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
      blacklist.isNodeBlacklistedForTaskSet(host) ||
        blacklist.isExecutorBlacklistedForTaskSet(execId)
    }
    if (!isZombie && !offerBlacklisted) {
      val curTime = clock.getTimeMillis()
      var allowedLocality = maxLocality

      if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime)
        if (allowedLocality > maxLocality) {
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }

      dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
        
        // ...
        
        addRunningTask(taskId)

        // ...
      }
    } else {
      None
    }
  }

private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] =
  {
  for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL, false))
  }
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
    for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL, false))
    }
  }
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
    // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
    for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }
  }
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
    for {
      rack <- sched.getRackForHost(host)
      index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
    } {
      return Some((index, TaskLocality.RACK_LOCAL, false))
    }
  }
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
    for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
      return Some((index, TaskLocality.ANY, false))
    }
  }
  // find a speculative task if all others tasks have been scheduled
  dequeueSpeculativeTask(execId, host, maxLocality).map {
    case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
}
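
The tiered lookup in dequeueTask can be reduced to a few lines. The sketch below is a simplified model with hypothetical names (`Queues`, `dequeue`): it ignores blacklisting, speculation, and per-executor or per-host keys, and it eagerly removes a chosen index from every queue. The ordering of levels and the `isAllowed` comparison follow the listing above.

```scala
import scala.collection.mutable.ArrayBuffer

object DequeueOrder {
  object Locality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
    // A level is allowed when it is at least as local as the constraint.
    def isAllowed(constraint: Value, condition: Value): Boolean = condition <= constraint
  }
  import Locality._

  // Hypothetical: the pending lists already narrowed to one executor, host and rack.
  final case class Queues(
      forExecutor: ArrayBuffer[Int],
      forHost: ArrayBuffer[Int],
      noPrefs: ArrayBuffer[Int],
      forRack: ArrayBuffer[Int],
      all: ArrayBuffer[Int])

  def dequeue(q: Queues, maxLocality: Locality.Value): Option[(Int, Locality.Value)] = {
    // (level that must be allowed, queue to search, level reported for the task)
    val tiers = Seq(
      (PROCESS_LOCAL, q.forExecutor, PROCESS_LOCAL),
      (NODE_LOCAL, q.forHost, NODE_LOCAL),
      (NO_PREF, q.noPrefs, PROCESS_LOCAL), // reported as PROCESS_LOCAL, as in the listing
      (RACK_LOCAL, q.forRack, RACK_LOCAL),
      (ANY, q.all, ANY))
    tiers.collectFirst {
      case (required, queue, reported) if isAllowed(maxLocality, required) && queue.nonEmpty =>
        val index = queue.last
        // Drop the task from every queue so it cannot be handed out twice.
        Seq(q.forExecutor, q.forHost, q.noPrefs, q.forRack, q.all).foreach(_ -= index)
        (index, reported)
    }
  }
}
```

With `maxLocality` set to NODE_LOCAL, only the first two tiers are searched. Raising it to ANY lets the same call fall through to rack-local tasks and finally to the catch-all list.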

Article link: https://www.haomeiwen.com/subject/rayijctx.html