一、Task执行原理流程:
1.当Driver中SchedulerBackend给ExecutorBackend发送launchTask之后,首先会反序列化TaskDescription。
2.Executor会通过launchTask来执行Task
3.Executor会通过TaskRunner在ThreadPool来运行具体的Task,TaskRunner的run方法中首先会通过调用statueUpdate给Driver发送信息汇报自己的状态,说明自己是running状态
4.TaskRunner,内部会做一些准备工作,例如反序列化Task的依赖,然后通过网络来获取需要的文件、Jar等
5.然后是反序列化Task本身
6.调用反序列化后的Task.run方法来执行任务并获得执行的结果,其中Task的run方法会导致调用的时候会导致task的抽象方法runTask的调用,在Task的runTask内部会调用RDD的iterator()方法,该方法就是我们针对当前Task所对应的Partition进行计算的关键所在,在处理内部会迭代Partition的元素并给我们自定义的function进行处理。
对于shuffleMapTask,首先要要对RDD以及其依赖关系进行反序列化。最终计算会调用RDD的compute方法。具体计算时,有具体的RDD,例如Map
7.把执行结果序列化,并根据大小不同的结果传回给Driver的方式
8.CoresGraninExecutorBackend给DriverEndpoint发送StatusUpdate来传输结果,DriverEndpoint会把执行结果传递给TaskSchedulerImpl处理,然后交给TaskResultGetter去分别处理Task执行成功或者失败的不同情况,然后告诉DAGScheduler任务处理结束的状况。
二、源码阅读
从Executor中的TaskRunner开始入手。看run方法:
1、对序列化的task数据,进行反序列化
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)/2
2、把文件、jar包拷贝过来
updateDependencies(taskFiles, taskJars)
3、通过正式的反序列化操作,将整个task的数据集反序列化
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
4、用task的run方法执行task,这个res就是对于ShuffleMapTask来说就是MapStatus,封装了ShuffleMapTask计算的数据的输出位置信息。后面的task就会去联系MapOutputTracker来获取上个ShuffleMapTask的位置拉取数据
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
看下run方法()
创建了一个TaskContextImpl,就是task的上下文,里面记录了task执行的一些全局性变量,比如task重试了几次,属于哪个stage,要处理Rdd哪个partition。然后调用抽象方法runTask()(具体实现就在子类ShuffleMapTask和ResultTask中)
5、下面同样分两种情况
ShuffleMapTask:
一个ShuffleMapTask会将一个RDD的元素,切分为多个bucket,基于一个在ShuffleDependency中指定的partitioner,默认就是HashPartitioner。
接下来看到它的runTask方法,这里注意,ShuffleMapTask的runTask有MapStatus返回值
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
//对task要处理的rdd相关数据,做一些反序列化操作
这个RDD,是怎么拿到的呢?
多个task运行在多个Executor中,都是并行运行,可能都不在同一个地方。
但是一个stage的task,其实要处理的rdd是一样的。
所以task怎么拿到自己要处理的那个rdd的数据
这里会通过broadcast variable直接拿到
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
var writer: ShuffleWriter[Any, Any] = null
try {
//获取ShuffleManager,从ShuffleManager中获取ShuffleWriter
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
//最重要的一行代码
首先调用了Rdd的iterator方法,并且传入了,当前task要处理哪个partition
所以,核心的逻辑,就在rdd的iterator方法中,这里实现了对Rdd某个partition执行我们自己的算子|
执行完了算子后,返回的数据,都是通过ShuffleWriter,经过HashPartition进行分区之后,写入自己对于的分区bucket
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
//最后返回结果MapStatus
MapStatus里面封装了ShuffleMapTask计算后的数据存储在哪里,其实就是BlockManager的相关信息
BlockManager,是spark底层的内存、数据、磁盘数据管理的组件。
writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
接着看RDD的iterator方法
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
//CacheManager后续再看
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
//进行RDD partition的计算
computeOrReadCheckpoint(split, context)
}
}
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
//checkPoint后面再看
if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}
def compute(split: Partition, context: TaskContext): Iterator[T]
compute是个虚方法。从子类中可以看到,compute是针对RDD中的某个partition执行我们给这个RDD定义的算子和函数。Spark内部进行了封装,还实现了一些其他的逻辑。到此,就执行了计算操作,并返回新的RDD的partition的数据
6、run方法中最后调用Executor所在的CoraseGrainedExecutorBackend的statusUpdate方法,通知它分区结束。会向SchedulerBackend发送信息(有空再继续深入研究)
execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
ResultTask:
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
//进行基本的反序列化
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
//执行通过rdd的iterator进行算子和函数操作
func(context, rdd.iterator(partition, context))
}
文末
从源码来看,先对Task运行结果value进行一些序列化操作, 然后统计Task运行时候的一些时间信息,比如GC的时间,Task运行时间等;在将结果进行一些封装之后,在序列化为serializedDirectResult,这里面用到了BlockManager组件(shuffle底层的内存管理组件,后面再单独分析它),接着就调用executor所在的CoarseGrainedExecutorBackend的statusUpdate()方法发送StatusUpdate消息给Driver的SparkDeploySchedulerBackend。
更多flutter学习,可以前往领取免费学习资料:【一键领取】
task的运行一开始不是直接调用底层的task的run方法直接处理job–>stage–>taskSet–>task这条路线的task任务的,它是通过分层和分工的思想来完成。
网友评论