1. CoarseGrainedExecutorBackend#receive() receives the LaunchTask message and handles it
override def receive: PartialFunction[Any, Unit] = {
  ......
  // On receiving a LaunchTask message
  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value) // decode the TaskDescription
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskDesc) // launch the Task
    }
  ......
}
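As a side note, receive() is just a PartialFunction over incoming RPC messages. Below is a minimal, self-contained sketch (toy Message types, not Spark's real RPC classes) of that dispatch pattern: each case handles one message type, and a message is only dispatched if some case matches.

object ReceiveSketch {
  sealed trait Message
  case class LaunchTask(payload: String) extends Message
  case object StopExecutor extends Message

  // A PartialFunction handler in the same style as receive()
  val receive: PartialFunction[Any, Unit] = {
    case LaunchTask(payload) => println(s"launching task with payload: $payload")
    case StopExecutor        => println("stopping executor")
  }

  def main(args: Array[String]): Unit = {
    val msg: Any = LaunchTask("serialized-task-bytes")
    if (receive.isDefinedAt(msg)) receive(msg) // dispatch only when a case matches
  }
}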
2. org.apache.spark.executor.Executor#launchTask builds a TaskRunner and submits it to the thread pool
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

// Start worker thread pool
private val threadPool = {
  val threadFactory = new ThreadFactoryBuilder()
    ......
    .build()
  Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription) // TaskRunner extends Runnable
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr) // submit the TaskRunner to the thread pool for execution
}
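The pattern here is worth isolating: register the Runnable under its task id (so it can be listed or killed later), then hand it to a cached thread pool so each task gets its own worker thread. A minimal sketch of that pattern, using a toy ToyTaskRunner in place of Spark's TaskRunner:

import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}

object LaunchTaskSketch {
  class ToyTaskRunner(val taskId: Long) extends Runnable {
    override def run(): Unit =
      println(s"task $taskId running on ${Thread.currentThread.getName}")
  }

  private val runningTasks = new ConcurrentHashMap[Long, ToyTaskRunner]
  private val threadPool =
    Executors.newCachedThreadPool().asInstanceOf[ThreadPoolExecutor]

  def launchTask(taskId: Long): Unit = {
    val tr = new ToyTaskRunner(taskId)
    runningTasks.put(taskId, tr) // track it so it can be looked up later
    threadPool.execute(tr)       // the pool calls tr.run() on a worker thread
  }

  def main(args: Array[String]): Unit = {
    (1L to 3L).foreach(launchTask)
    threadPool.shutdown()
  }
}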
3. org.apache.spark.executor.Executor.TaskRunner#run deserializes the real Task and invokes its run method
/**
 * The task to run. This will be set in run() by deserializing the task binary coming
 * from the driver. Once it is set, it will never be changed.
 */
@volatile var task: Task[Any] = _

override def run(): Unit = {
  ......
  // Deserialize the real Task here; the key input is taskDescription.serializedTask
  task = ser.deserialize[Task[Any]](
    taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
  ......
  // This is where the Task's run method is finally invoked
  val value = try {
    val res = task.run(
      taskAttemptId = taskId,
      attemptNumber = taskDescription.attemptNumber,
      metricsSystem = env.metricsSystem)
    threwException = false
    res
  }
  ......
}
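To make the serializedTask round trip concrete, here is a minimal sketch using plain Java serialization (not Spark's SerializerInstance): a task object is serialized into a ByteBuffer on one side and deserialized back into a runnable object on the other, which is what ser.deserialize does above.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object DeserializeSketch {
  class ToyTask(val taskId: Long) extends Serializable {
    def run(): String = s"result of task $taskId"
  }

  def serialize(task: ToyTask): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(task)
    out.close()
    ByteBuffer.wrap(bytes.toByteArray)
  }

  def deserialize(buf: ByteBuffer): ToyTask = {
    val arr = new Array[Byte](buf.remaining())
    buf.get(arr)
    val in = new ObjectInputStream(new ByteArrayInputStream(arr))
    in.readObject().asInstanceOf[ToyTask]
  }

  def main(args: Array[String]): Unit = {
    val task = deserialize(serialize(new ToyTask(42)))
    println(task.run()) // prints: result of task 42
  }
}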
4. org.apache.spark.scheduler.Task#run() calls org.apache.spark.scheduler.Task#runTask(), the final step of the chain

def runTask(context: TaskContext): T // abstract method implemented by subclasses, e.g. ShuffleMapTask and ResultTask
(Figure: the subclasses of Task)
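This is the classic template-method pattern. A minimal sketch with toy types (not Spark's real classes): the base class's run() does the shared bookkeeping, then delegates the actual work to the abstract runTask() that each subclass implements.

object RunTaskSketch {
  abstract class Task[T](val taskId: Long) {
    def run(): T = {
      println(s"task $taskId: set up context, metrics, ...") // shared prologue
      runTask()                                              // subclass-specific work
    }
    def runTask(): T
  }

  class ToyShuffleMapTask(id: Long) extends Task[String](id) {
    override def runTask(): String = "map output status"
  }

  class ToyResultTask(id: Long) extends Task[Int](id) {
    override def runTask(): Int = 42
  }

  def main(args: Array[String]): Unit = {
    println(new ToyShuffleMapTask(1).run())
    println(new ToyResultTask(2).run())
  }
}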
The main flow of task execution is actually quite simple: CoarseGrainedExecutorBackend receives the message, wraps the transmitted data (a TaskDescription) into a TaskRunner, and hands it to the Executor's internal thread pool for execution. Along the way, the real Task object is deserialized from org.apache.spark.scheduler.TaskDescription#serializedTask, a ByteBuffer. Finally, Task's own run method calls the subclass-implemented runTask(), and that is where the actual task logic lives.
That's a wrap!!!