1. Overview
In the DAGScheduler, the highest-level abstraction is the job; the DAG describes a job as stages plus the dependencies between them. Inside a stage there are individual tasks, and the Task is in turn the highest-level abstraction of the TaskScheduler.
A Task is the concrete unit of work sent to an executor. Under a chain of task operations, each partition turns into either a result or the intermediate output of a shuffle stage.
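To make the picture concrete, here is a minimal, hedged example (the object name TinyJob is my own) of a job with exactly this shape: reduceByKey introduces a shuffle, so the job splits into a ShuffleMapStage executed as ShuffleMapTasks and a final ResultStage executed as ResultTasks.
import org.apache.spark.{SparkConf, SparkContext}

object TinyJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("tiny-job"))
    // reduceByKey adds a shuffle dependency, so the DAGScheduler builds a ShuffleMapStage
    // (run as one ShuffleMapTask per partition) followed by a ResultStage (run as
    // ResultTasks) for collect().
    val counts = sc.parallelize(Seq("a", "b", "a"), numSlices = 2)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()
    println(counts.toSeq)
    sc.stop()
  }
}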
TaskScheduler
The rest of this post walks through TaskSchedulerImpl; this implementation mainly targets standalone mode.
/**
* Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
* It can also work with a local setup by using a LocalBackend and setting isLocal to true.
* It handles common logic, like determining a scheduling order across jobs, waking up to launch
* speculative tasks, etc.
*
* Clients should first call initialize() and start(), then submit task sets through the
* runTasks method.
*
* THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
* threads, so it needs locks in public API methods to maintain its state. In addition, some
* SchedulerBackends synchronize on themselves when they want to send events here, and then
* acquire a lock on us, so we need to make sure that we don't try to lock the backend while
* we are holding a lock on ourselves.
*/
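The lifecycle described in this comment boils down to three calls. Below is a hedged sketch of that order (my own illustration): these classes are private[spark], so only Spark's internals, SparkContext and DAGScheduler, ever make these calls, and the method the comment refers to as runTasks corresponds to submitTasks on TaskSchedulerImpl.
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, TaskSet}

// Sketch only: roughly the order of calls Spark itself makes on a TaskSchedulerImpl.
def wireUpScheduler(
    scheduler: TaskSchedulerImpl,
    backend: SchedulerBackend,
    taskSets: Seq[TaskSet]): Unit = {
  scheduler.initialize(backend)            // attach the backend (standalone, local, ...)
  scheduler.start()                        // starts the backend and, if enabled, speculation checks
  taskSets.foreach(scheduler.submitTasks)  // the DAGScheduler submits one TaskSet per stage attempt
}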
2. Task: a look at the code
2.1 Initialization
Task is an abstract class. Matching the two kinds of stage, ShuffleMapStage and ResultStage, there are also two kinds of task: ShuffleMapTask and ResultTask.
- stageId: with this ID you can get a handle on the stage from the DAGScheduler.
- stageAttemptId: if the stage fails and has to be recomputed, this ID tracks the attempt. In other words, stageId is a logical ID: running the same work several times may reuse the same stageId, but stageAttemptId is never reused (Warning: this understanding may be wrong).
- partitionId: the partition this task operates on; the driver can be asked which executor holds this partition.
- internalAccumulators: accumulators used for internal bookkeeping.
/**
* A unit of execution. We have two kinds of Task's in Spark:
*
* - [[org.apache.spark.scheduler.ShuffleMapTask]]
* - [[org.apache.spark.scheduler.ResultTask]]
*
* A Spark job consists of one or more stages. The very last stage in a job consists of multiple
* ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
* and sends the task output back to the driver application. A ShuffleMapTask executes the task
* and divides the task output to multiple buckets (based on the task's partitioner).
*
* @param stageId id of the stage this task belongs to
* @param partitionId index of the number in the RDD
*/
private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
val partitionId: Int,
internalAccumulators: Seq[Accumulator[Long]]) extends Serializable
2.2 The internal method run
The run method is called by the executor.
runTask is the real entry point that run invokes internally; any implementation extending this abstract class must override it.
/**
* Called by [[Executor]] to run this task.
*
* @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
* @param attemptNumber how many times this task has been attempted (0 for the first attempt)
* @return the result of the task along with updates of Accumulators.
*/
final def run(
taskAttemptId: Long,
attemptNumber: Int,
metricsSystem: MetricsSystem)
: (T, AccumulatorUpdates) = {
// Set up important environment variables and context handles. There are a lot of fields in here; the most interesting one is taskMemoryManager
context = new TaskContextImpl(...)
context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
taskThread = Thread.currentThread()
if (_killed) {
kill(interruptThread = false)
}
try {
// Invoke the concrete task
(runTask(context), context.collectAccumulators())
} catch {
// error handling omitted
} finally {
// Call the task completion callbacks.
context.markTaskCompleted()
try {
/**
Note the important concept of unroll (pre-reserved) memory here.
While Spark runs, a thread reserves memory up front to avoid OOM, much like malloc: if the amount requested exceeds the memory currently free, the task has to wait for other tasks to finish.
When a task ends, it must release not only the memory it actually used, but also all of the memory it pre-reserved.
*/
Utils.tryLogNonFatalError {
// Release memory used by this thread for unrolling blocks
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
SparkEnv.get.blockManager.memoryStore.releasePendingUnrollMemoryForThisTask()
// Notify any tasks waiting for execution memory to be freed to wake up and try to
// acquire memory again. This makes impossible the scenario where a task sleeps forever
// because there are no other tasks left to notify it. Since this is safe to do but may
// not be strictly necessary, we should revisit whether we can remove this in the future.
val memoryManager = SparkEnv.get.memoryManager
memoryManager.synchronized { memoryManager.notifyAll() }
}
} finally {
TaskContext.unset()
}
}
}
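To make that pre-reservation idea concrete, here is a self-contained toy sketch (my own, not Spark code) of the pattern the finally block relies on: a task reserves memory up front, blocks in wait() when the pool is exhausted, and every release ends with notifyAll() so blocked tasks wake up and retry, which is exactly the role of memoryManager.notifyAll() above.
class ToyMemoryPool(total: Long) {
  private var used = 0L

  // Block until `bytes` can be reserved; mirrors a task pre-acquiring unroll/execution memory.
  def acquire(bytes: Long): Unit = synchronized {
    while (used + bytes > total) {
      wait() // sleep until another task releases memory and calls notifyAll()
    }
    used += bytes
  }

  // Give the memory back and wake every waiting task so it can retry its reservation.
  def release(bytes: Long): Unit = synchronized {
    used -= bytes
    notifyAll()
  }
}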
2.3 Static methods serializeWithDependencies and deserializeWithDependencies
Every task sent to an executor travels over the network as a byte stream, so it has to be serialized before transmission and deserialized on the executor side. Serialization also packs in the context the task depends on: the files and JARs that were added to the SparkContext.
The serialized form is written into a buffer that starts at 4 KB (the ByteArrayOutputStream(4096) below), with the dependency lists written out before the task bytes themselves, which is why the scaladoc calls the whole thing "slightly tricky".
/**
* Handles transmission of tasks and their dependencies, because this can be slightly tricky. We
* need to send the list of JARs and files added to the SparkContext with each task to ensure that
* worker nodes find out about it, but we can't make it part of the Task because the user's code in
* the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by
* first writing out its dependencies.
*/
private[spark] object Task {
/**
* Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
*/
def serializeWithDependencies(
task: Task[_],
currentFiles: HashMap[String, Long],
currentJars: HashMap[String, Long],
serializer: SerializerInstance)
: ByteBuffer = {
val out = new ByteArrayOutputStream(4096)
val dataOut = new DataOutputStream(out)
// Write currentFiles
dataOut.writeInt(currentFiles.size)
for ((name, timestamp) <- currentFiles) {
dataOut.writeUTF(name)
dataOut.writeLong(timestamp)
}
// Write currentJars
dataOut.writeInt(currentJars.size)
for ((name, timestamp) <- currentJars) {
dataOut.writeUTF(name)
dataOut.writeLong(timestamp)
}
// Write the task itself and finish
dataOut.flush()
val taskBytes = serializer.serialize(task).array()
out.write(taskBytes)
ByteBuffer.wrap(out.toByteArray)
}
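The read side, deserializeWithDependencies, just mirrors this write order: first the file map, then the JAR map, and whatever bytes remain are the serialized Task itself. The sketch below is my own reconstruction from the code above using plain Java streams, not the verbatim Spark method:
import java.io.{ByteArrayInputStream, DataInputStream}
import java.nio.ByteBuffer
import scala.collection.mutable.HashMap

object TaskSerializationSketch {
  def deserializeWithDependenciesSketch(serializedTask: ByteBuffer)
    : (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {
    val bytes = new Array[Byte](serializedTask.remaining())
    serializedTask.get(bytes)
    val dataIn = new DataInputStream(new ByteArrayInputStream(bytes))

    // Read one (name -> timestamp) map in the exact layout serializeWithDependencies wrote it.
    def readMap(): HashMap[String, Long] = {
      val m = new HashMap[String, Long]()
      val n = dataIn.readInt()
      for (_ <- 0 until n) {
        m(dataIn.readUTF()) = dataIn.readLong()
      }
      m
    }

    val taskFiles = readMap() // files added to the SparkContext
    val taskJars = readMap()  // JARs added to the SparkContext
    // Everything left over is the serialized Task object itself.
    val taskBytes = new Array[Byte](dataIn.available())
    dataIn.readFully(taskBytes)
    (taskFiles, taskJars, ByteBuffer.wrap(taskBytes))
  }
}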
2.4 Task states
- LAUNCHING
- RUNNING: when the task is being started.
- FINISHED: when the task finished with the serialized result.
- FAILED: when the task fails.
- KILLED: when an executor kills a task.
- LOST
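These values come from the TaskState enumeration in the Spark source (org.apache.spark.TaskState); here is a minimal sketch of its shape, leaving out the helpers the real object carries:
object TaskStateSketch extends Enumeration {
  type TaskState = Value
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value

  // FINISHED, FAILED, KILLED and LOST are terminal states; the real TaskState object
  // exposes a similar isFinished check that the scheduler uses when cleaning up a task.
  def isFinished(state: TaskState): Boolean =
    Set(FINISHED, FAILED, KILLED, LOST).contains(state)
}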
3. ShuffleMapTask
As mentioned above, the interesting part here is the override of runTask, the method that does the actual work.
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
var writer: ShuffleWriter[Any, Any] = null
try {
// The shuffleManager here tracks the state of shuffle output so the driver knows where the data lives; it is covered later
val manager = SparkEnv.get.shuffleManager
// After a series of operations, the shuffle output is flushed somewhere, disk or memory. This matters a lot for performance and is covered in detail later.
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
} catch {
// error handling omitted
}
}
Because ShuffleMapTask is all about intermediate results, its most important job is to manage the data produced by processing its partition, i.e. the data waiting to be shuffled to other machines. That data first has to be stored somewhere local, and its location then has to be reported to the driver, so that the driver's DAGScheduler knows where the next stage that depends on this one should fetch its input.
4. ResultTask
By comparison, ResultTask's runTask has it much easier: it just runs, because this is the last stage of the job and nothing downstream depends on it. It applies the operations to its partition, reports the result to the driver, and is done.
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
}
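That (TaskContext, Iterator[T]) => U shape of func is exactly what SparkContext.runJob accepts, so a quick way to see ResultTasks at work is to pass such a function yourself; each ResultTask applies it to one partition and ships the return value back to the driver. A small hedged example (the names are my own):
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object RunJobDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("runjob-demo"))
    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    // One ResultTask per partition; each returns its partial sum to the driver.
    val perPartitionSums: Array[Int] =
      sc.runJob(rdd, (context: TaskContext, it: Iterator[Int]) => it.sum)
    println(perPartitionSums.toSeq) // four partial sums, one per partition
    sc.stop()
  }
}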
5. MapStatus
Reading the source, you will notice that for a ShuffleMapTask the type parameter T carried by Task is MapStatus.
This is the structure used to track the result of a ShuffleMapTask.
trait MapStatus {
def location: BlockManagerId
def getSizeForBlock(reduceId: Int): Long
}
It carries two important pieces of information. The first is the location, a BlockManagerId: as the name suggests, it identifies the BlockManager, the component in the SparkContext that manages data blocks, which was briefly introduced earlier.
The second is getSizeForBlock, which gives the downstream reduce side a rough estimate of how big a block it is about to fetch, so that it can pre-reserve memory. As noted in the comment above, Spark manages memory through up-front reservation, so estimates like this show up at every step to give the corresponding code a chance to request memory in time. If the memory really cannot be obtained, all that remains is to throw an OOM and remind the user to add memory or increase parallelism, so that each task can either get more resources or need fewer.
This trait has two concrete implementations (see the sketch after this list):
- CompressedMapStatus, which reports a size estimate per block compressed into a single byte.
- HighlyCompressedMapStatus, which keeps a bitmap marking which blocks are empty, plus the average size of the blocks.
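The trick that lets CompressedMapStatus spend only one byte per block is a logarithmic encoding: a size is stored as roughly the log, base 1.1, of the byte count, so a single unsigned byte covers sizes up to many gigabytes at about 10% precision. The sketch below illustrates the idea; the constants and names are my own, not copied from the Spark source.
object SizeEncodingSketch {
  private val LogBase = 1.1

  // Encode a block size (in bytes) into one unsigned byte on a log scale.
  def compress(size: Long): Byte = {
    if (size <= 0L) 0.toByte
    else if (size <= 1L) 1.toByte
    else math.min(255, math.ceil(math.log(size.toDouble) / math.log(LogBase)).toInt).toByte
  }

  // Decode back to an approximate size; the error is bounded by the log base (~10%).
  def decompress(compressed: Byte): Long = {
    if (compressed == 0) 0L
    else math.pow(LogBase, (compressed & 0xFF).toDouble).toLong
  }

  def main(args: Array[String]): Unit = {
    Seq(0L, 1000L, 1000000L, 2000000000L).foreach { size =>
      val b = compress(size)
      println(s"$size bytes -> byte ${b & 0xFF} -> ~${decompress(b)} bytes")
    }
  }
}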