整体流程图
流程图
源码分析 spark 2.3
getOrCreateParentStages 创建所有祖先Stage
/**
* Get or create the list of parent stages for a given RDD. The new Stages will be created with
* the provided firstJobId.
*/
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
// getShuffleDependencies 获取RDD的第一层直接宽依赖
getShuffleDependencies(rdd).map { shuffleDep =>
//getOrCreateShuffleMapStage 创建rdd对应的所有祖先Stage
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
getShuffleDependencies 获取RDD的第一层直接宽依赖
/**
* Returns shuffle dependencies that are immediate parents of the given RDD.
*
* This function will not return more distant ancestors. For example, if C has a shuffle
* dependency on B which has a shuffle dependency on A:
*
* A <-- B <-- C
*
* calling this function with rdd C will only return the B <-- C dependency.
*
* This function is scheduler-visible for the purpose of unit testing.
*/
private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
// 返回 所有的第一层宽依赖
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
getOrCreateShuffleMapStage 创建rdd对应的所有祖先Stage
/**
* Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
* shuffle map stage doesn't already exist, this method will create the shuffle map stage in
* addition to any missing ancestor shuffle map stages.
*/
private def getOrCreateShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) =>
stage
case None =>
// Create stages for all missing ancestor shuffle dependencies.
// 深度遍历获取所有祖先宽依赖,按照祖先->子辈的顺序 处理宽依赖
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
// Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
// that were not already in shuffleIdToMapStage, it's possible that by the time we
// get to a particular dependency in the foreach loop, it's been added to
// shuffleIdToMapStage by the stage creation process for an earlier dependency. See
// SPARK-13902 for more information.
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
// 创建宽依赖
createShuffleMapStage(dep, firstJobId)
}
}
// Finally, create a stage for the given shuffle dependency.
createShuffleMapStage(shuffleDep, firstJobId)
}
}
getMissingAncestorShuffleDependencies 深度遍历
获取所有祖先宽依赖
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getMissingAncestorShuffleDependencies(
rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
getShuffleDependencies(toVisit).foreach { shuffleDep =>
if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
// 子辈宽依赖先压栈
ancestors.push(shuffleDep)
waitingForVisit.push(shuffleDep.rdd)
} // Otherwise, the dependency and its ancestors have already been registered.
}
}
}
// 返回宽依赖 堆栈
ancestors
}
例子
RDDs原始依赖图
RDDs原始依赖图
getShuffleDependencies
获取RDD的第一层直接宽依赖
getMissingAncestorShuffleDependencies
深度遍历顺序获取所有祖先的宽依赖
最后划分结果
最后划分结果
网友评论