
5.2 TaskScheduler: Task

Author: GongMeng | Published 2018-11-16 18:14

1. Overview

At the top of the DAGScheduler's abstraction sits the Job; the DAG describes a job as stages plus the dependencies between them. Inside each stage are the individual tasks, and these Tasks are the highest-level abstraction the TaskScheduler deals with.
A Task is the concrete unit of work shipped to an Executor. Each partition, after a chain of Task operations, becomes either a Result or the intermediate output of a shuffle stage.
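For a concrete picture (plain RDD API only, nothing scheduler-internal), the toy job below contains one shuffle and therefore two stages: every partition of the first stage runs a ShuffleMapTask, and every partition of the last stage runs a ResultTask that ships its rows back to the driver.

import org.apache.spark.{SparkConf, SparkContext}

object TwoStageExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("two-stage").setMaster("local[2]"))

    val words = sc.parallelize(Seq("a", "b", "a", "c"), numSlices = 2)

    // Stage 0 (map side of the shuffle): each of the 2 partitions runs a ShuffleMapTask,
    // whose output is bucketed by the partitioner behind reduceByKey.
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

    // Stage 1 (final stage): collect() is an action, so each partition runs a ResultTask
    // that sends its rows back to the driver.
    println(counts.collect().toSeq)

    sc.stop()
  }
}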


(Figure: TaskScheduler)

The discussion below follows TaskSchedulerImpl; this implementation mainly targets Standalone mode.

/**
 * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
 * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
 * It handles common logic, like determining a scheduling order across jobs, waking up to launch
 * speculative tasks, etc.
 *
 * Clients should first call initialize() and start(), then submit task sets through the
 * runTasks method.
 *
 * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
 * threads, so it needs locks in public API methods to maintain its state. In addition, some
 * SchedulerBackends synchronize on themselves when they want to send events here, and then
 * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
 * we are holding a lock on ourselves.
 */
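As a rough sketch of the calling sequence that comment describes (Standalone deployment assumed; sc, masterUrls, tasks and the other values are taken to be already in scope, and the exact wiring inside SparkContext differs per cluster manager):

// Sketch only: it mirrors the "initialize() then start(), then submit task sets" contract
// from the doc comment above, not the verbatim SparkContext code.
val scheduler = new TaskSchedulerImpl(sc)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls) // standalone backend
scheduler.initialize(backend) // hook the backend into the scheduler and build the scheduling pools
scheduler.start()             // starts the backend (and the speculation thread, if enabled)

// Later, the DAGScheduler hands work over one stage at a time as a TaskSet:
scheduler.submitTasks(new TaskSet(tasks, stageId, stageAttemptId, priority, properties))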

2. Selected Task code

2.1 Initialization

Task is an abstract class.
Matching ShuffleMapStage and ResultStage, there are also two kinds of task: ShuffleMapTask and ResultTask.

  • stageId - this ID can be used to look up the stage's handle in the DAGScheduler
  • stageAttemptId - if the stage unluckily fails and has to be recomputed, this ID tracks the attempt. In other words, stageId is a logical ID: re-running the same work may reuse a stageId, but a stageAttemptId is never reused (Warning: this understanding may be inaccurate)
  • partitionId - which partition this task operates on; the driver can be asked which executor holds that partition
  • internalAccumulators - accumulators used as global counters
/**
 * A unit of execution. We have two kinds of Task's in Spark:
 *
 *  - [[org.apache.spark.scheduler.ShuffleMapTask]]
 *  - [[org.apache.spark.scheduler.ResultTask]]
 *
 * A Spark job consists of one or more stages. The very last stage in a job consists of multiple
 * ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
 * and sends the task output back to the driver application. A ShuffleMapTask executes the task
 * and divides the task output to multiple buckets (based on the task's partitioner).
 *
 * @param stageId id of the stage this task belongs to
 * @param partitionId index of the number in the RDD
 */
private[spark] abstract class Task[T](
    val stageId: Int,
    val stageAttemptId: Int,
    val partitionId: Int,
    internalAccumulators: Seq[Accumulator[Long]]) extends Serializable 

2.2 The internal run method

run is called by the Executor.
runTask is the real internal entry point; any implementation that extends this abstract class must override it.

/**
   * Called by [[Executor]] to run this task.
   *
   * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
   * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
   * @return the result of the task along with updates of Accumulators.
   */
  final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem)
  : (T, AccumulatorUpdates) = {

  // Set up some important environment variables and the task context handle; of the many
  // parameters passed in here, taskMemoryManager is the most interesting one
    context = new TaskContextImpl(...)
    context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
    taskThread = Thread.currentThread()
    if (_killed) {
      kill(interruptThread = false)
    }
    try {
       // Invoke the concrete task implementation
      (runTask(context), context.collectAccumulators())
    } catch {
       // Various error handling
    } finally {
      // Call the task completion callbacks.
      context.markTaskCompleted()
      try {
        /**
          Note: an important concept here is the unroll (memory pre-reservation) mechanism.
          While Spark runs, each thread reserves memory before using it to guard against OOM,
          much like malloc; if the amount requested exceeds what is currently free, the thread
          has to wait for other tasks to finish. When a task ends it must release not only the
          memory it actually used but also everything it pre-reserved.
        */
        Utils.tryLogNonFatalError {
          // Release memory used by this thread for unrolling blocks
          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
          SparkEnv.get.blockManager.memoryStore.releasePendingUnrollMemoryForThisTask()
          // Notify any tasks waiting for execution memory to be freed to wake up and try to
          // acquire memory again. This makes impossible the scenario where a task sleeps forever
          // because there are no other tasks left to notify it. Since this is safe to do but may
          // not be strictly necessary, we should revisit whether we can remove this in the future.
          val memoryManager = SparkEnv.get.memoryManager
          memoryManager.synchronized { memoryManager.notifyAll() }
        }
      } finally {
        TaskContext.unset()
      }
    }
  }
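To make the run/runTask contract concrete, here is a minimal, purely hypothetical subclass (real jobs only ever use ShuffleMapTask and ResultTask, and a subclass would have to live under the org.apache.spark packages since Task is private[spark]):

import org.apache.spark.{Accumulator, TaskContext}
import org.apache.spark.scheduler.Task

// Hypothetical toy subclass, for illustration only. The point is that runTask is the single
// hook a subclass must provide, while the final run() above does all the bookkeeping.
private[spark] class CountingTask(
    stageId: Int,
    stageAttemptId: Int,
    partitionId: Int,
    internalAccumulators: Seq[Accumulator[Long]])
  extends Task[Long](stageId, stageAttemptId, partitionId, internalAccumulators) {

  // Called from run() once the TaskContext has been set up and registered.
  override def runTask(context: TaskContext): Long = {
    // Any partition-local work would go here; this toy task just returns its partition id.
    partitionId.toLong
  }
}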

2.3 Static methods serializeWithDependencies and deserializeWithDependencies

Every task sent to an executor travels over the network as a byte stream, so it has to be serialized before transmission and deserialized on the executing side. Serialization also packs in the context the task depends on: the files and JARs that have been added to the SparkContext.
As the code below shows, that dependency metadata is written into the output buffer (started at 4 KB) ahead of the task bytes themselves, which the Spark comment itself calls "slightly tricky".

/**
* Handles transmission of tasks and their dependencies, because this can be slightly tricky. We
* need to send the list of JARs and files added to the SparkContext with each task to ensure that
* worker nodes find out about it, but we can't make it part of the Task because the user's code in
* the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by
* first writing out its dependencies.
*/
private[spark] object Task {
 /**
  * Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
  */
 def serializeWithDependencies(
     task: Task[_],
     currentFiles: HashMap[String, Long],
     currentJars: HashMap[String, Long],
     serializer: SerializerInstance)
   : ByteBuffer = {

   val out = new ByteArrayOutputStream(4096)
   val dataOut = new DataOutputStream(out)

   // Write currentFiles
   dataOut.writeInt(currentFiles.size)
   for ((name, timestamp) <- currentFiles) {
     dataOut.writeUTF(name)
     dataOut.writeLong(timestamp)
   }

   // Write currentJars
   dataOut.writeInt(currentJars.size)
   for ((name, timestamp) <- currentJars) {
     dataOut.writeUTF(name)
     dataOut.writeLong(timestamp)
   }

   // Write the task itself and finish
   dataOut.flush()
   val taskBytes = serializer.serialize(task).array()
   out.write(taskBytes)
   ByteBuffer.wrap(out.toByteArray)
 }
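The matching read side is not shown in the excerpt above; the sketch below reconstructs what deserializeWithDependencies roughly looks like by mirroring the write path (ByteBufferInputStream is a Spark-internal helper; treat the details as an approximation of the real method rather than a verbatim copy):

import java.io.DataInputStream
import java.nio.ByteBuffer

import scala.collection.mutable.HashMap

import org.apache.spark.util.ByteBufferInputStream

// Sketch of the read side, reconstructed by mirroring serializeWithDependencies above.
def deserializeWithDependencies(serializedTask: ByteBuffer)
  : (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {

  val dataIn = new DataInputStream(new ByteBufferInputStream(serializedTask))

  // Files added via SparkContext.addFile: name -> timestamp
  val taskFiles = new HashMap[String, Long]()
  for (_ <- 0 until dataIn.readInt()) {
    taskFiles(dataIn.readUTF()) = dataIn.readLong()
  }

  // JARs added via SparkContext.addJar: name -> timestamp
  val taskJars = new HashMap[String, Long]()
  for (_ <- 0 until dataIn.readInt()) {
    taskJars(dataIn.readUTF()) = dataIn.readLong()
  }

  // Whatever remains in the buffer is the serialized Task object itself
  (taskFiles, taskJars, serializedTask.slice())
}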

2.4 Task states

  • LAUNCHING when the task has been handed to an executor but has not started running yet.
  • RUNNING when the task is being started.
  • FINISHED when the task finished with the serialized result.
  • FAILED when the task fails.
  • KILLED when an executor kills a task.
  • LOST when the executor running the task is lost.
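These values come from Spark's TaskState enumeration. Below is a simplified sketch of that kind of enumeration; the real object also carries conversions to and from the cluster manager's own task-state enum:

// Simplified sketch modeled on org.apache.spark.TaskState.
private[spark] object TaskState extends Enumeration {
  type TaskState = Value

  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value

  // FAILED and LOST both count as failures (LOST means the executor itself went away),
  // which is what lets the scheduler decide whether to resubmit the task.
  def isFailed(state: TaskState): Boolean = state == FAILED || state == LOST

  // Terminal states: the serialized result arrived, the task failed, or it was killed/lost.
  def isFinished(state: TaskState): Boolean = Set(FINISHED, FAILED, KILLED, LOST).contains(state)
}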

3. ShuffleMapTask

As mentioned above, the key thing here is the override of runTask, the method that does the actual work.

override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
     // The shuffleManager tracks where the shuffle output ends up, so the driver knows where the data lives; more on this later
      val manager = SparkEnv.get.shuffleManager
     
      // After a series of operations, flush the shuffle output somewhere (disk or memory); this has a big impact on performance and is covered in detail later
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    } catch {
        // Various error handling
    }
  }

Because ShuffleMapTask is all about intermediate results, the most important thing for it is to manage the data that, after the partition has been run through its chain of operations, is waiting to be shuffled to other machines. That data first has to be stored somewhere locally and then reported to the Driver, so that the Driver's DAGScheduler knows where the next stage that depends on this one should go to fetch it.

4. ResultTask

By comparison, ResultTask's runTask has it much easier: it just runs. Since this is the last stage of the job, nothing depends on it afterwards; it applies the operations to its partition and reports the result back to the Driver, and that is it.

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }

5. MapStatus

Reading the source, you will notice that for a ShuffleMapTask the type parameter T it maintains is MapStatus.
This is the structure used to track the result of a ShuffleMapTask.

trait MapStatus {
  def location: BlockManagerId
  def getSizeForBlock(reduceId: Int): Long
}

It carries two important pieces of information. The first is the BlockManagerId: as the name suggests, it identifies the BlockManager, the component in a SparkContext that manages data blocks, which we briefly introduced earlier.
The second is getSizeForBlock, which gives the downstream reduce side a rough estimate of how large a block it is about to pull, so that it can pre-reserve memory. As noted in the comment above, Spark manages memory through pre-reservation, so estimates like this show up at every step to give the corresponding code a chance to request memory in advance. If the memory really cannot be obtained, an OOM is thrown and the user is advised either to add memory, so each task has more to request, or to raise the parallelism, so each task needs less.

This trait has two concrete implementations (see the sketch after this list for the kind of one-byte encoding involved):

  • CompressedMapStatus reports a size estimate for every block, compressed down to a single byte each
  • HighlyCompressedMapStatus keeps a bitmap marking which blocks are empty, plus the average size of the non-empty blocks
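The sketch below shows a one-byte, log-scale encoding in the spirit of what CompressedMapStatus does; it is modeled on MapStatus.compressSize / decompressSize, so treat the exact constant and edge cases as an approximation:

import scala.math.{ceil, log, min, pow}

// Sketch of a one-byte, log-scale block-size encoding. The relative error stays within
// roughly 10% for base 1.1, which is plenty for a "how much memory should the reducer
// pre-reserve for this block" estimate.
object BlockSizeCodec {
  private val LogBase = 1.1

  def compress(size: Long): Byte = {
    if (size == 0L) 0
    else if (size <= 1L) 1
    else min(255, ceil(log(size.toDouble) / log(LogBase)).toInt).toByte
  }

  def decompress(compressed: Byte): Long = {
    if (compressed == 0) 0L
    else pow(LogBase, compressed & 0xFF).toLong
  }
}

// e.g. BlockSizeCodec.decompress(BlockSizeCodec.compress(1L << 20)) is roughly 1 MB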
