def collect(): Array[T] = withScope {
  // `this` is the current RDD (the RDD the action was called on); it is passed along through the calls below
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
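To see this entry point in action, here is a minimal, hypothetical driver program (master URL, app name, and data are placeholders) whose collect() call walks exactly the path traced below:

import org.apache.spark.{SparkConf, SparkContext}

object CollectDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("collect-demo"))
    // reduceByKey introduces a ShuffleDependency; collect() is the action
    // whose `this` enters sc.runJob above.
    val counts = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3)).reduceByKey(_ + _)
    counts.collect().foreach(println)
    sc.stop()
  }
}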
SparkContext.runJob
// `rdd` is the same RDD the action was called on, handed down from collect
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
eventProcessLoop posts the JobSubmitted event
eventProcessLoop.post(JobSubmitted(
  // `rdd` here is still the RDD the action was called on, passed through unchanged
  jobId, rdd, func2, rdd.partitions.indices.toArray, callSite, listener,
  clonedProperties))
// The submitting thread blocks here until the job completes
listener.awaitResult()
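Note the two halves of the pattern: post() enqueues the event and returns immediately, while the submitting thread blocks on the listener (a JobWaiter) until the scheduler signals completion. A minimal sketch of that blocking wait, assuming a latch-based waiter (not Spark's actual JobWaiter, which is future-based in recent versions):

import java.util.concurrent.CountDownLatch

// Sketch only: a waiter the submitting thread blocks on until the
// event-loop thread reports success or failure.
class SimpleJobWaiter {
  private val done = new CountDownLatch(1)
  @volatile private var failure: Option[Throwable] = None

  def jobSucceeded(): Unit = done.countDown()
  def jobFailed(e: Throwable): Unit = { failure = Some(e); done.countDown() }

  def awaitResult(): Unit = {
    done.await()                 // park until the scheduler finishes the job
    failure.foreach(e => throw e)
  }
}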
eventThread consumes the posted events and handles them
private[spark] val eventThread = new Thread(name) {
  override def run(): Unit = {
    try {
      while (!stopped.get) {
        // Take the next event off the queue, blocking while it is empty
        val event = eventQueue.take()
        try {
          onReceive(event)
          ... ...
        }
      }
    }
  }
}
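The shape here is a classic single-consumer event loop: one dedicated thread drains a blocking queue, so post() never blocks the caller. A self-contained sketch of the same pattern (illustrative names, not Spark's EventLoop class):

import java.util.concurrent.LinkedBlockingDeque

abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = queue.take() // blocks until an event is posted
          try onReceive(event)
          catch { case e: Exception => onError(e) }
        }
      } catch { case _: InterruptedException => () } // stop() interrupts take()
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Exception): Unit = e.printStackTrace()
}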
DAGSchedulerEventProcessLoop.doOnReceive
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
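The dispatch itself is just a pattern match over the DAGSchedulerEvent hierarchy, with one handler method per event type. A toy model of that style (illustrative types, not Spark's):

sealed trait SchedulerEvent
final case class JobSubmittedEvent(jobId: Int) extends SchedulerEvent
final case class JobCancelledEvent(jobId: Int) extends SchedulerEvent

// One exhaustive match, one handler per event type.
def dispatch(event: SchedulerEvent): Unit = event match {
  case JobSubmittedEvent(id) => println(s"handleJobSubmitted($id)")
  case JobCancelledEvent(id) => println(s"handleJobCancellation($id)")
}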
DAGScheduler.handleJobSubmitted (core code)
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  var finalStage: ResultStage = null
  try {
    // Create the ResultStage for the job
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
DAGScheduler.createResultStage
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // Collect the RDD's direct ShuffleDependencies (the stage boundaries)
  val (shuffleDeps, resourceProfiles) =
    getShuffleDependenciesAndResourceProfiles(rdd)
  ... ...
  // Create the parent stages, one per shuffle dependency
  val parents = getOrCreateParentStages(shuffleDeps, jobId)
  val id = nextStageId.getAndIncrement()
  // Create the ResultStage; `rdd` here is the RDD the action was called on
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
    callSite, resourceProfile.id)
  ... ...
  stage
}
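To make the parent/child relationship concrete, consider a hypothetical one-shuffle lineage (names and file path are illustrative):

val words  = sc.textFile("input.txt").flatMap(_.split(" ")) // narrow dependencies
val counts = words.map((_, 1)).reduceByKey(_ + _)           // ShuffleDependency
counts.collect()
// createResultStage(counts, ...) finds exactly one ShuffleDependency, so
// getOrCreateParentStages yields one ShuffleMapStage (textFile/flatMap/map)
// and the ResultStage (the reduce side + collect) lists it as its parent.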
DAGScheduler.getShuffleDependenciesAndResourceProfiles
private[scheduler] def getShuffleDependenciesAndResourceProfiles(
    rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val resourceProfiles = new HashSet[ResourceProfile]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ListBuffer[RDD[_]]
  waitingForVisit += rdd
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      Option(toVisit.getResourceProfile).foreach(resourceProfiles += _)
      toVisit.dependencies.foreach {
        // A ShuffleDependency marks a stage boundary: record it, do not descend past it
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        // A narrow dependency: keep walking up the lineage
        case dependency =>
          waitingForVisit.prepend(dependency.rdd)
      }
    }
  }
  (parents, resourceProfiles)
}
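The walk is an iterative depth-first traversal that stops at shuffle boundaries: narrow dependencies are followed up the lineage, while shuffle dependencies are recorded and not crossed. A self-contained toy model of the same logic (hypothetical Node/Dep types standing in for RDD and Dependency):

import scala.collection.mutable.{HashSet, ListBuffer}

sealed trait Dep { def parent: Node }
final case class Narrow(parent: Node) extends Dep   // stays inside the stage
final case class Shuffle(parent: Node) extends Dep  // marks a stage boundary
final case class Node(name: String, deps: Seq[Dep])

def directShuffleDeps(root: Node): HashSet[Shuffle] = {
  val found   = new HashSet[Shuffle]
  val visited = new HashSet[Node]
  val waiting = new ListBuffer[Node]
  waiting += root
  while (waiting.nonEmpty) {
    val node = waiting.remove(0)
    if (visited.add(node)) {        // add returns false if already visited
      node.deps.foreach {
        case s: Shuffle => found += s           // record, do not cross
        case Narrow(p)  => waiting.prepend(p)   // keep walking narrow deps
      }
    }
  }
  found
}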
DAGScheduler.getOrCreateParentStages
private def getOrCreateParentStages(shuffleDeps: HashSet[ShuffleDependency[_, _, _]],
    firstJobId: Int): List[Stage] = {
  // For each shuffle dependency, get or create its ShuffleMapStage
  shuffleDeps.map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
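getOrCreateShuffleMapStage (not shown above) is the "get" half of this: it looks the shuffle up in shuffleIdToMapStage and only falls through to createShuffleMapStage below when no stage exists yet, so a shuffle reached from several places maps to a single ShuffleMapStage. A simplified sketch of that lookup, using a String as a stand-in for the stage (the real method also creates stages for missing ancestor shuffles first):

import scala.collection.mutable.HashMap

val shuffleIdToStage = new HashMap[Int, String]
def getOrCreateStage(shuffleId: Int): String =
  shuffleIdToStage.getOrElseUpdate(shuffleId, s"new ShuffleMapStage for shuffle $shuffleId")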
DAGScheduler.createShuffleMapStage
def createShuffleMapStage[K, V, C](
    shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
  // The map-side RDD of this shuffle
  val rdd = shuffleDep.rdd
  // Recurse: find this RDD's own shuffle dependencies
  val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
  ... ...
  val numTasks = rdd.partitions.length
  // Create the parent stages this stage depends on
  val parents = getOrCreateParentStages(shuffleDeps, jobId)
  val id = nextStageId.getAndIncrement()
  // Create the ShuffleMapStage
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker,
    resourceProfile.id)
  stageIdToStage(id) = stage
  ... ...
  stage
}
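Because createShuffleMapStage calls getShuffleDependenciesAndResourceProfiles and getOrCreateParentStages on the shuffle's own map-side RDD, stage creation recurses up the lineage until it reaches RDDs with no shuffle dependencies. A hypothetical two-shuffle job illustrates the result:

sc.textFile("logs")                    // illustrative path
  .map(line => (line.split(",")(0), 1))
  .reduceByKey(_ + _)                  // shuffle 0
  .map { case (k, v) => (v, k) }
  .sortByKey()                         // shuffle 1
  .collect()
// createResultStage sees shuffle 1; createShuffleMapStage for shuffle 1
// recurses and sees shuffle 0, whose stage has no shuffle parents. Stages:
// ShuffleMapStage(shuffle 0) -> ShuffleMapStage(shuffle 1) -> ResultStage.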