TaskScheduler

Author: Sunnywade | Published 2018-02-20 22:30

    Task: As mentioned in earlier chapters, a job split by the DAGScheduler usually consists of multiple stages, and each stage in turn consists of multiple tasks. Tasks come in two kinds, ShuffleMapTask and ResultTask: the last stage in a job's DAG contains ResultTasks, while every preceding stage contains ShuffleMapTasks. When a stage is submitted, its tasks are serialized and broadcast to the executors, which deserialize and execute them.
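
    For example (a minimal, hypothetical job; sc is assumed to be an existing SparkContext and the input path is a placeholder), the shuffle introduced by reduceByKey splits the job into two stages: the upstream stage consists of ShuffleMapTasks while the final stage consists of ResultTasks.

      // Hypothetical word-count job: reduceByKey introduces a shuffle dependency,
      // so the DAG has two stages.
      val counts = sc.textFile("hdfs://...")   // assumed input path
        .flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)   // stage boundary: the map side runs as ShuffleMapTasks
      counts.collect()        // the final stage runs ResultTasks and returns results to the driver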

    A ShuffleMapTask executes its task and splits the result into multiple outputs according to the Partitioner, for subsequent stages to read;

    A ResultTask executes its task and returns the result to the driver.

    Typically, one task processes one partition of an RDD. The Task class defines an important method that is executed on the executor: it constructs a TaskContextImpl object and then calls runTask(context) to perform the actual work.
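
    A simplified sketch of that entry point is shown below (abbreviated and lightly paraphrased; memory-manager setup, metrics, and several parameters are omitted):

      // Simplified sketch of Task.run, not the full Spark source
      final def run(taskAttemptId: Long, attemptNumber: Int): T = {
        val context = new TaskContextImpl(
          stageId, partitionId, taskAttemptId, attemptNumber /* ... further arguments omitted */)
        TaskContext.setTaskContext(context)   // expose the context via TaskContext.get()
        try {
          runTask(context)                    // subclass-specific work: ShuffleMapTask or ResultTask
        } finally {
          TaskContext.unset()                 // always clear the thread-local context
        }
      }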

    Both ShuffleMapTask and ResultTask provide concrete implementations of runTask:

    ShuffleMapTask

      override def runTask(context: TaskContext): MapStatus = {
        // Deserialize the RDD using the broadcast variable.
        val threadMXBean = ManagementFactory.getThreadMXBean
        val deserializeStartTime = System.currentTimeMillis()
        val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
          ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
        _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
        _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
        } else 0L
    
        var writer: ShuffleWriter[Any, Any] = null
        try {
          val manager = SparkEnv.get.shuffleManager
          writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
          writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
          writer.stop(success = true).get
        } catch {
          case e: Exception =>
            try {
              if (writer != null) {
                writer.stop(success = false)
              }
            } catch {
              case e: Exception =>
                log.debug("Could not stop writer", e)
            }
            throw e
        }
      }
    

    The core of this method is deserializing the task binary to obtain the RDD and its ShuffleDependency, calling the RDD's iterator method to process the partition assigned to this task, and using a ShuffleWriter to write out the partitioned result.

    ResultTask

      override def runTask(context: TaskContext): U = {
        // Deserialize the RDD and the func using the broadcast variables.
        val threadMXBean = ManagementFactory.getThreadMXBean
        val deserializeStartTime = System.currentTimeMillis()
        val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
          ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
        _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
        _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
        } else 0L
    
        func(context, rdd.iterator(partition, context))
      }
    

    ResultTask's runTask is even simpler: it deserializes the RDD and the result-handling function func, then invokes func to produce the output.

    Submit tasks

    When DAGScheduler's submitMissingTasks submits the tasks that have not yet run, it first serializes and broadcasts them, and finally calls TaskScheduler's submitTasks to hand the tasks over to the backend. The TaskScheduler interacts with the backend and handles resource allocation, status updates, fault tolerance, and so on for the tasks.

    Let's first look at TaskScheduler's submitTasks method. TaskScheduler has only one concrete implementation in Spark, TaskSchedulerImpl:

    override def submitTasks(taskSet: TaskSet) {
        val tasks = taskSet.tasks
        logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
        this.synchronized {
          val manager = createTaskSetManager(taskSet, maxTaskFailures)
          val stage = taskSet.stageId
          val stageTaskSets =
            taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
          stageTaskSets(taskSet.stageAttemptId) = manager
          val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
            ts.taskSet != taskSet && !ts.isZombie
          }
          if (conflictingTaskSet) {
            throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
              s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
          }
          schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    
          if (!isLocal && !hasReceivedTask) {
            starvationTimer.scheduleAtFixedRate(new TimerTask() {
              override def run() {
                if (!hasLaunchedTask) {
                  logWarning("Initial job has not accepted any resources; " +
                    "check your cluster UI to ensure that workers are registered " +
                    "and have sufficient resources")
                } else {
                  this.cancel()
                }
              }
            }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
          }
          hasReceivedTask = true
        }
        backend.reviveOffers()
      }
    

    This method does the following work:

    • Create a TaskSetManager for the submitted TaskSet;
    • Check whether a conflicting TaskSet has already been submitted for the same stage;
    • Add the new TaskSetManager to the schedulableBuilder;
    • Start a timer that checks whether the TaskSet has launched any tasks and logs a warning if it has not;
    • Call the backend's reviveOffers method to request resources for the TaskSet.

    Taking local mode as an example, here is what reviveOffers actually does:

      def reviveOffers() {
        val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
        for (task <- scheduler.resourceOffers(offers).flatten) {
          freeCores -= scheduler.CPUS_PER_TASK
          executor.launchTask(executorBackend, task)
        }
      }
    

    A WorkerOffer represents the resources available on one executor; in local mode a single WorkerOffer is created on each call.

    TaskSchedulerImpl schedules tasks on the WorkerOffers supplied by the backend. Each task that is granted resources yields a TaskDescription object and is launched via the executor's launchTask. It is somewhat like handing out offers to the pending tasks: only a task that has received an offer can be launched.
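
    For reference, a WorkerOffer is just a small value object; the sketch below reflects its Spark 2.x shape (later versions add fields such as the executor address and resource information):

      // Sketch of WorkerOffer (Spark 2.x shape)
      case class WorkerOffer(
          executorId: String,   // executor offering the resources
          host: String,         // host the executor runs on, used for locality decisions
          cores: Int)           // free cores; each launched task consumes CPUS_PER_TASK of them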

    So how does TaskSchedulerImpl schedule tasks on top of the available WorkerOffers?

      def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
        // Mark each slave as alive and remember its hostname
        // Also track if new executor is added
        var newExecAvail = false
        for (o <- offers) {
          if (!hostToExecutors.contains(o.host)) {
            hostToExecutors(o.host) = new HashSet[String]()
          }
          if (!executorIdToRunningTaskIds.contains(o.executorId)) {
            hostToExecutors(o.host) += o.executorId
            executorAdded(o.executorId, o.host)
            executorIdToHost(o.executorId) = o.host
            executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
            newExecAvail = true
          }
          for (rack <- getRackForHost(o.host)) {
            hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
          }
        }
    
        // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
        // this here to avoid a separate thread and added synchronization overhead, and also because
        // updating the blacklist is only relevant when task offers are being made.
        blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
    
        val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
          offers.filter { offer =>
            !blacklistTracker.isNodeBlacklisted(offer.host) &&
              !blacklistTracker.isExecutorBlacklisted(offer.executorId)
          }
        }.getOrElse(offers)
    
        val shuffledOffers = shuffleOffers(filteredOffers)
        // Build a list of tasks to assign to each worker.
        val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
        val availableCpus = shuffledOffers.map(o => o.cores).toArray
        val sortedTaskSets = rootPool.getSortedTaskSetQueue
        for (taskSet <- sortedTaskSets) {
          logDebug("parentName: %s, name: %s, runningTasks: %s".format(
            taskSet.parent.name, taskSet.name, taskSet.runningTasks))
          if (newExecAvail) {
            taskSet.executorAdded()
          }
        }
    
        // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
        // of locality levels so that it gets a chance to launch local tasks on all of them.
        // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
        for (taskSet <- sortedTaskSets) {
          var launchedAnyTask = false
          var launchedTaskAtCurrentMaxLocality = false
          for (currentMaxLocality <- taskSet.myLocalityLevels) {
            do {
              launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
                taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
              launchedAnyTask |= launchedTaskAtCurrentMaxLocality
            } while (launchedTaskAtCurrentMaxLocality)
          }
          if (!launchedAnyTask) {
            taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
          }
        }
    
        if (tasks.size > 0) {
          hasLaunchedTask = true
        }
        return tasks
      }
    
    • The offers are the available resources. The method first preprocesses them: it builds index mappings, filters out blacklisted nodes and executors, and shuffles the offers (so the order differs between scheduling rounds and tasks are not always assigned to the same workers);
    • Next, rootPool.getSortedTaskSetQueue returns the TaskSets in the scheduling pool sorted by priority (the scheduling policy can be FIFO or FAIR, configured via spark.scheduler.mode; see the configuration sketch after this list);
    • If a new executor has been added, the locality levels of each TaskSet are recomputed;
    • Following locality preferences, a TaskDescription is created for every task that is granted resources. A TaskDescription describes a task and can be serialized and sent to an executor; it includes properties such as addedFiles and addedJars.
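
    As a minimal configuration sketch (the application name and file path below are assumed for illustration), the scheduling mode can be set on the SparkConf; FAIR mode can additionally load pool definitions from an allocation file:

      import org.apache.spark.SparkConf

      val conf = new SparkConf()
        .setAppName("scheduler-mode-demo")                                       // assumed name
        .set("spark.scheduler.mode", "FAIR")                                     // default is FIFO
        .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")    // optional FAIR pool definitions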

    Creating a TaskDescription for each task in a TaskSet is implemented by resourceOfferSingleTaskSet:

      private def resourceOfferSingleTaskSet(
          taskSet: TaskSetManager,
          maxLocality: TaskLocality,
          shuffledOffers: Seq[WorkerOffer],
          availableCpus: Array[Int],
          tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
        var launchedTask = false
        // nodes and executors that are blacklisted for the entire application have already been
        // filtered out by this point
        for (i <- 0 until shuffledOffers.size) {
          val execId = shuffledOffers(i).executorId
          val host = shuffledOffers(i).host
          if (availableCpus(i) >= CPUS_PER_TASK) {
            try {
              for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
                tasks(i) += task
                val tid = task.taskId
                taskIdToTaskSetManager(tid) = taskSet
                taskIdToExecutorId(tid) = execId
                executorIdToRunningTaskIds(execId).add(tid)
                availableCpus(i) -= CPUS_PER_TASK
                assert(availableCpus(i) >= 0)
                launchedTask = true
              }
            } catch {
              case e: TaskNotSerializableException =>
                logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
                // Do not offer resources for this task, but don't throw an error to allow other
                // task sets to be submitted.
                return launchedTask
            }
          }
        }
        return launchedTask
      }
    

    This method iterates over shuffledOffers and tries to assign a task to each one; ultimately TaskSetManager's resourceOffer finds a task suited to the current offer. Its logic is as follows:

      def resourceOffer(
          execId: String,
          host: String,
          maxLocality: TaskLocality.TaskLocality)
        : Option[TaskDescription] =
      {
        val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
          blacklist.isNodeBlacklistedForTaskSet(host) ||
            blacklist.isExecutorBlacklistedForTaskSet(execId)
        }
        if (!isZombie && !offerBlacklisted) {
          val curTime = clock.getTimeMillis()
    
          var allowedLocality = maxLocality
    
          if (maxLocality != TaskLocality.NO_PREF) {
            allowedLocality = getAllowedLocalityLevel(curTime)
            if (allowedLocality > maxLocality) {
              // We're not allowed to search for farther-away tasks
              allowedLocality = maxLocality
            }
          }
    
          dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
            // Found a task; do some bookkeeping and return a task description
            val task = tasks(index)
            val taskId = sched.newTaskId()
            // Do various bookkeeping
            copiesRunning(index) += 1
            val attemptNum = taskAttempts(index).size
            val info = new TaskInfo(taskId, index, attemptNum, curTime,
              execId, host, taskLocality, speculative)
            taskInfos(taskId) = info
            taskAttempts(index) = info :: taskAttempts(index)
            // Update our locality level for delay scheduling
            // NO_PREF will not affect the variables related to delay scheduling
            if (maxLocality != TaskLocality.NO_PREF) {
              currentLocalityIndex = getLocalityIndex(taskLocality)
              lastLaunchTime = curTime
            }
            // Serialize and return the task
            val serializedTask: ByteBuffer = try {
              ser.serialize(task)
            } catch {
              // If the task cannot be serialized, then there's no point to re-attempt the task,
              // as it will always fail. So just abort the whole task-set.
              case NonFatal(e) =>
                val msg = s"Failed to serialize task $taskId, not attempting to retry it."
                logError(msg, e)
                abort(s"$msg Exception during serialization: $e")
                throw new TaskNotSerializableException(e)
            }
            if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
              !emittedTaskSizeWarning) {
              emittedTaskSizeWarning = true
              logWarning(s"Stage ${task.stageId} contains a task of very large size " +
                s"(${serializedTask.limit() / 1024} KB). The maximum recommended task size is " +
                s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
            }
            addRunningTask(taskId)
    
            // We used to log the time it takes to serialize the task, but task size is already
            // a good proxy to task serialization time.
            // val timeTaken = clock.getTime() - startTime
            val taskName = s"task ${info.id} in stage ${taskSet.id}"
            logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
              s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
    
            sched.dagScheduler.taskStarted(task, info)
            new TaskDescription(
              taskId,
              attemptNum,
              execId,
              taskName,
              index,
              addedFiles,
              addedJars,
              task.localProperties,
              serializedTask)
          }
        } else {
          None
        }
      }
    

    The main flow of this method is:

    • Check whether the offered executor, or the host it runs on, is blacklisted for this TaskSet;
    • Validate the allowed locality level;
    • Call dequeueTask to take the highest-priority runnable task out of the TaskSet managed by this TaskSetManager;
    • Do the bookkeeping and return a TaskDescription.

    The key question is how to pick the best task to run on the offered executor. What "best" means is defined by dequeueTask:

      /**
       * Dequeue a pending task for a given node and return its index and locality level.
       * Only search for tasks matching the given locality constraint.
       *
       * @return An option containing (task index within the task set, locality, is speculative?)
       */
      private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
        : Option[(Int, TaskLocality.Value, Boolean)] =
      {
        for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
          return Some((index, TaskLocality.PROCESS_LOCAL, false))
        }
    
        if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
          for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
            return Some((index, TaskLocality.NODE_LOCAL, false))
          }
        }
    
        if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
          // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
          for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
            return Some((index, TaskLocality.PROCESS_LOCAL, false))
          }
        }
    
        if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
          for {
            rack <- sched.getRackForHost(host)
            index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
          } {
            return Some((index, TaskLocality.RACK_LOCAL, false))
          }
        }
    
        if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
          for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
            return Some((index, TaskLocality.ANY, false))
          }
        }
    
        // find a speculative task if all others tasks have been scheduled
        dequeueSpeculativeTask(execId, host, maxLocality).map {
          case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
      }
    

    This method looks for a matching task level by level, trying the locality levels in order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY.
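
    The locality levels themselves are defined by TaskLocality, roughly as sketched below (simplified; not the full Spark source). How long resourceOffer waits at each level before relaxing it is the delay-scheduling behavior controlled by spark.locality.wait and its per-level variants spark.locality.wait.process, spark.locality.wait.node, and spark.locality.wait.rack.

      // Rough sketch of TaskLocality: levels are ordered from most to least local,
      // and isAllowed is a simple comparison on that ordering.
      object TaskLocality extends Enumeration {
        val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

        // A task at locality `condition` may be launched under `constraint`
        // only if it is at least as local as the constraint allows.
        def isAllowed(constraint: Value, condition: Value): Boolean = condition <= constraint
      }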

    How task locality is implemented

    When the TaskSet is constructed in DAGScheduler's submitMissingTasks, there is this piece of code:

        val tasks: Seq[Task[_]] = try {
          val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
          stage match {
            case stage: ShuffleMapStage =>
              stage.pendingPartitions.clear()
              partitionsToCompute.map { id =>
                val locs = taskIdToLocations(id)
                val part = stage.rdd.partitions(id)
                stage.pendingPartitions += id
                new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
                  taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
                  Option(sc.applicationId), sc.applicationAttemptId)
              }
    
            case stage: ResultStage =>
              partitionsToCompute.map { id =>
                val p: Int = stage.partitions(id)
                val part = stage.rdd.partitions(p)
                val locs = taskIdToLocations(id)
                new ResultTask(stage.id, stage.latestInfo.attemptId,
                  taskBinary, part, locs, id, properties, serializedTaskMetrics,
                  Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
              }
          }
    } // the catch clause of this try is omitted here
    

    Whether constructing ShuffleMapTasks or ResultTasks, the location information for each task is obtained from taskIdToLocations, which is generated by the following code:

        val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
          stage match {
            case s: ShuffleMapStage =>
              partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
            case s: ResultStage =>
              partitionsToCompute.map { id =>
                val p = s.partitions(id)
                (id, getPreferredLocs(stage.rdd, p))
              }.toMap
          }
        } catch {
          case NonFatal(e) =>
            stage.makeNewStageAttempt(partitionsToCompute.size)
            listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
            abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
            runningStages -= stage
            return
        }
    

    This calls getPreferredLocs, which in turn ends up calling getPreferredLocsInternal:

     private def getPreferredLocsInternal(
          rdd: RDD[_],
          partition: Int,
          visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
        // If the partition has already been visited, no need to re-visit.
        // This avoids exponential path exploration.  SPARK-695
        if (!visited.add((rdd, partition))) {
          // Nil has already been returned for previously visited partitions.
          return Nil
        }
        // If the partition is cached, return the cache locations
        val cached = getCacheLocs(rdd)(partition)
        if (cached.nonEmpty) {
          return cached
        }
        // If the RDD has some placement preferences (as is the case for input RDDs), get those
        val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
        if (rddPrefs.nonEmpty) {
          return rddPrefs.map(TaskLocation(_))
        }
    
        // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
        // that has any placement preferences. Ideally we would choose based on transfer sizes,
        // but this will do for now.
        rdd.dependencies.foreach {
          case n: NarrowDependency[_] =>
            for (inPart <- n.getParents(partition)) {
              val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
              if (locs != Nil) {
                return locs
              }
            }
    
          case _ =>
        }
        Nil
      }
    

    This method is recursive:

    • It first looks up the partition's TaskLocations in the cache; if any are found they are returned directly, otherwise it moves on to the next step;
    • If the RDD itself has placement preferences (as input RDDs do), rdd.preferredLocations is called to obtain the TaskLocations; otherwise the method recurses into the RDD's narrow dependencies and returns the first non-empty set of locations it finds.

    The RDD's preferredLocations method is implemented as follows:
    final def preferredLocations(split: Partition): Seq[String] = {
        checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
          getPreferredLocations(split)
        }
      }
    

    This method first tries to take the locations from the checkpoint RDD; if the RDD has not been checkpointed, it calls getPreferredLocations, one of the basic RDD methods introduced in the earlier chapter on RDDs, which specifies the preferred locations for executing a split of the RDD.
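
    As an illustration (a hypothetical custom RDD, not Spark source), a data-source RDD can expose its placement preferences by overriding getPreferredLocations, which getPreferredLocsInternal above will then pick up:

      import org.apache.spark.{Partition, SparkContext, TaskContext}
      import org.apache.spark.rdd.RDD

      // Hypothetical partition that remembers which hosts hold its data
      class MyBlockPartition(val index: Int, val hosts: Seq[String]) extends Partition

      // Hypothetical RDD whose partitions prefer the hosts storing their blocks
      class MyBlockRDD(sc: SparkContext, blocks: Seq[Seq[String]])
        extends RDD[Array[Byte]](sc, Nil) {

        override protected def getPartitions: Array[Partition] =
          blocks.zipWithIndex.map { case (hosts, i) => new MyBlockPartition(i, hosts) }.toArray

        override protected def getPreferredLocations(split: Partition): Seq[String] =
          split.asInstanceOf[MyBlockPartition].hosts   // hosts storing this partition's block

        override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] =
          Iterator.empty   // reading the actual data is omitted in this sketch
      }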

    TaskLocation

    There are three kinds of TaskLocation (sketched after the list below):

    • ExecutorCacheTaskLocation
    • HostTaskLocation
    • HDFSCacheTaskLocation
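
    A simplified sketch of the three variants (abridged; not the full Spark definitions) and what each preference means:

      sealed trait TaskLocation {
        def host: String
      }
      // The partition's data is cached in a specific executor process:
      // ideal for PROCESS_LOCAL scheduling on that executor.
      case class ExecutorCacheTaskLocation(host: String, executorId: String) extends TaskLocation
      // The data lives on a particular host (for example on local disk):
      // a NODE_LOCAL preference for that host.
      case class HostTaskLocation(host: String) extends TaskLocation
      // The data is in the HDFS cache on a host: also a host-level preference,
      // but flagged so the scheduler can prefer executors on that host.
      case class HDFSCacheTaskLocation(host: String) extends TaskLocation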
