1. 概述
前文中我们可以看到在TaskScheduler
中的方法submitTask
提交的并不是Task
, 而是TaskSet
顾名思义, TaskSet是Task的集合, 这个结构维护了所有的在这个stage中的task.
我们知道一个stage过程其实是对所有的parition执行transform过程, 直到遇到了一个shuffle, 本地的partition数据不足以支撑计算, 要进行节点间通信. stage中的task数量和parition的数量是1:1的, 但是由于整个集群资源有限, 所以stage里的所有task并不是同时跑的, 而是按照资源和配置尽可能的同时跑. 上文我们也看到, 只有申请到了足够内存的task可以跑, 否则要等待其它的先跑完.
TaskSet有一定的有限顺序, 被TaskSetManager管理, 这个执行顺序实际上就是FIFO的.TaskSet中包含的是需要被执行的task. 像动态规划的思想一样, 可能上次执行的某些操作导致某些partition已经被执行了对应的task, 而且结果还存活着, 就不会进入这个待执行集合.
2. 源码
非常简单, 无需多言
/**
* A set of tasks submitted together to the low-level TaskScheduler, usually representing
* missing partitions of a particular stage.
*/
private[spark] class TaskSet(
val tasks: Array[Task[_]],
val stageId: Int,
val stageAttemptId: Int,
val priority: Int,
val properties: Properties) {
val id: String = stageId + "." + stageAttemptId
override def toString: String = "TaskSet " + id
}
网友评论