Generating Spark's DAG (Directed Acyclic Graph) is essentially the process of splitting the job into Stages, and the split is driven by RDD dependencies. After a program is submitted, Spark initially treats the whole RDD lineage as a single Stage, then walks backward from the final RDD: RDDs connected by narrow dependencies are grouped into the same Stage, and every wide dependency (ShuffleDependency) starts a new Stage. The result of this traversal is the DAG.
The implementation lives in org.apache.spark.scheduler.DAGScheduler.
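A minimal sketch of how this plays out, using a hypothetical word-count job (the input path and object name are made up for illustration): only the shuffle introduced by reduceByKey starts a second stage.
import org.apache.spark.{SparkConf, SparkContext}

object StageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-demo").setMaster("local[*]"))
    val counts = sc.textFile("input.txt")          // hypothetical input path
      .flatMap(_.split(" "))                       // narrow dependency -> same stage
      .map(word => (word, 1))                      // narrow dependency -> same stage
      .reduceByKey(_ + _)                          // ShuffleDependency -> new stage
    counts.collect()                               // action: the DAG is split into 2 stages here
    // counts.toDebugString prints the lineage with the shuffle boundary marked
    sc.stop()
  }
}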
DAGScheduler Source Code Walkthrough
Building Stages
In the Spark DAG, stage creation starts from the getOrCreateParentStages method.
Given an RDD, it gets or creates the list of parent stages: it first collects the RDD's wide (shuffle) dependencies via getShuffleDependencies, then builds or looks up a Stage for each of them and returns the resulting list.
getOrCreateParentStages
/**
 * Get or create the list of parent stages for a given RDD. The new Stages will be created with
 * the provided firstJobId.
 */
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
getShuffleDependencies
Collects the shuffle dependencies that are immediate parents of the given RDD.
The traversal is implemented iteratively with an ArrayStack rather than with recursion (a small usage example follows the listing).
/**
 * Returns shuffle dependencies that are immediate parents of the given RDD.
 *
 * This function will not return more distant ancestors. For example, if C has a shuffle
 * dependency on B which has a shuffle dependency on A:
 *
 * A <-- B <-- C
 *
 * calling this function with rdd C will only return the B <-- C dependency.
 *
 * This function is scheduler-visible for the purpose of unit testing.
 */
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  // Collected shuffle (wide) dependencies
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  // RDDs that have already been visited
  val visited = new HashSet[RDD[_]]
  // RDDs still to visit, kept on an ArrayStack
  val waitingForVisit = new ArrayStack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}
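A hypothetical sketch of the "immediate parents only" behavior described in the comment above, assuming sc is an existing SparkContext (the names a, b, c are made up for illustration):
// Two chained shuffles, a <-- b <-- c.
val a = sc.parallelize(1 to 100).map(x => (x % 10, x))            // narrow deps only
val b = a.reduceByKey(_ + _)                                      // shuffle 1: a <-- b
val c = b.map { case (k, v) => (v % 3, k) }.reduceByKey(_ + _)    // shuffle 2: b <-- c
// getShuffleDependencies(c) would return only shuffle 2 (the b <-- c dependency);
// shuffle 1 is only discovered later, via getMissingAncestorShuffleDependencies.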
getOrCreateShuffleMapStage
Gets or creates a ShuffleMapStage. All ShuffleMapStages are kept in the private HashMap shuffleIdToMapStage, keyed by shuffle id; you can think of this map as a registry of shuffle dependencies.
The method first looks the stage up in this registry: if an entry exists, the existing stage is returned; otherwise a new stage is created (and registered).
/**
 * Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for
 * that dependency. Only includes stages that are part of currently running job (when the job(s)
 * that require the shuffle stage complete, the mapping will be removed, and the only record of
 * the shuffle data will be in the MapOutputTracker).
 */
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
Implementation
Stages are created first for any ancestor shuffle dependencies that are not yet registered, and finally for the current shuffle dependency itself.
/**
 * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
 * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
 * addition to any missing ancestor shuffle map stages.
 */
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage
    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
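The registration into shuffleIdToMapStage happens inside createShuffleMapStage. A heavily abridged paraphrase (not the verbatim source; see DAGScheduler.createShuffleMapStage for the full body):
// Paraphrased sketch of createShuffleMapStage (abridged, not verbatim):
//   val rdd = shuffleDep.rdd
//   val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, ...)
//   stageIdToStage(id) = stage
//   shuffleIdToMapStage(shuffleDep.shuffleId) = stage
// After this, any later lookup for the same shuffleId returns the existing stage.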
getMissingAncestorShuffleDependencies
Finds the ancestor shuffle dependencies of the given RDD that have not yet been registered in shuffleIdToMapStage.
The traversal again uses an ArrayStack (to avoid a StackOverflowError from deep recursion); the caller, getOrCreateShuffleMapStage, then creates and registers a ShuffleMapStage for each returned dependency.
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getMissingAncestorShuffleDependencies(
    rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
  val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new ArrayStack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      getShuffleDependencies(toVisit).foreach { shuffleDep =>
        if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
          ancestors.push(shuffleDep)
          waitingForVisit.push(shuffleDep.rdd)
        } // Otherwise, the dependency and its ancestors have already been registered.
      }
    }
  }
  ancestors
}
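Tying the three methods together with the hypothetical two-shuffle job shown earlier (a <-- b <-- c), a plausible trace looks like this:
// c.collect() eventually leads to getOrCreateParentStages(c, jobId):
//   getShuffleDependencies(c)            -> { shuffle 2 (b <-- c) }
//   getOrCreateShuffleMapStage(shuffle2) -> not in shuffleIdToMapStage, so:
//     getMissingAncestorShuffleDependencies(shuffle2.rdd) -> { shuffle 1 (a <-- b) }
//     createShuffleMapStage(shuffle1)    -> ShuffleMapStage 0 (registered)
//     createShuffleMapStage(shuffle2)    -> ShuffleMapStage 1 (registered)
// The final ResultStage then has ShuffleMapStage 1 as its parent.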
Creating the DAGScheduler
When is the DAGScheduler created?
It is created inside SparkContext:
// _dagScheduler is a private field of SparkContext
@volatile private var _dagScheduler: DAGScheduler = _
...
_dagScheduler = new DAGScheduler(this)
...
When is the DAGScheduler invoked?
As you might expect, whoever creates it also drives it: the DAGScheduler is invoked from SparkContext's runJob method.
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 * partitions of the target RDD, e.g. for operations like `first()`
 * @param resultHandler callback to pass each result to
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
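Every RDD action funnels into this entry point. For instance, RDD.collect is roughly the following (paraphrased from memory, not quoted verbatim):
// RDD.collect(), roughly:
//   def collect(): Array[T] = withScope {
//     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
//     Array.concat(results: _*)
//   }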
DAGScheduler call chain
sc: SparkContext
ds: DAGScheduler
eventProcessLoop: DAGSchedulerEventProcessLoop
sc.runJob
----> ds.runJob
----> ds.submitJob
----> eventProcessLoop.post(JobSubmitted)
----> eventProcessLoop.onReceive
----> eventProcessLoop.doOnReceive
----> ds.handleJobSubmitted
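The last hop of the chain is the event loop dispatching the JobSubmitted event back to the scheduler. Abridged from DAGSchedulerEventProcessLoop.doOnReceive (only the relevant case is shown):
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  // ... other DAGSchedulerEvent cases elided ...
}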
handleJobSubmitted
This is where the real work happens: handleJobSubmitted creates the ResultStage, creates the ActiveJob, and then submits the stage via submitStage.
Below is an abridged version of handleJobSubmitted.
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  // Create the final ResultStage
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  ...
  // (several lines omitted)
  ...
  // Create the active job
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  ...
  // (several lines omitted)
  ...
  val jobSubmissionTime = clock.getTimeMillis()
  // Register the active job in the (jobId -> job) map
  jobIdToActiveJob(jobId) = job
  // Add the job to the set of active jobs
  activeJobs += job
  // Attach the job to the final ResultStage
  finalStage.setActiveJob(job)
  // Collect all stage ids belonging to this job
  val stageIds = jobIdToStageIds(jobId).toArray
  // Collect the latest StageInfo for each of those stages
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  // Notify listeners
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  // Submit the final stage
  submitStage(finalStage)
}
submitStage (submitting a stage)
First it checks whether there is an active job for the stage. If no job is defined, the stage is aborted via abortStage.
If the stage is not already in the waiting, running, or failed sets, submission proceeds.
It then looks for missing parent stages: if there are none, the current stage is submitted; otherwise the missing parents are submitted first (recursively) and the current stage is added to the waiting set (waitingStages). A trace of the recursion follows the listing.
The code, abridged:
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  // Only proceed if there is an active job for this stage
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    // Only submit if the stage is not already waiting, running, or failed
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // Find the missing parent stages; sorting by id gives a deterministic order
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      // If there are none, submit this stage; otherwise submit the parents first
      // and park this stage in waitingStages
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // All parents are ready, so the tasks of this stage can be submitted
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          // Recursively submit the parent stages
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    // No active job for this stage: abort it
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
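For the hypothetical two-shuffle job from earlier, the recursion would unfold roughly like this:
// submitStage(ResultStage 2)
//   missing = [ShuffleMapStage 1]    -> submitStage(ShuffleMapStage 1)
//     missing = [ShuffleMapStage 0]  -> submitStage(ShuffleMapStage 0)
//       missing is empty             -> submitMissingTasks(ShuffleMapStage 0)
//     ShuffleMapStage 1 added to waitingStages
//   ResultStage 2 added to waitingStages
// As each stage completes, submitWaitingChildStages releases its waiting children.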
submitMissingTasks
Called when the stage's parents are available (i.e. they have completed successfully) and we can now run its tasks.
This method is the heart of task submission. It is unusually long for the Spark code base, which hints at how much is involved in submitting tasks.
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // First figure out the indexes of partition ids to compute.
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  // Use the scheduling pool, job group, description, etc. from the ActiveJob
  // associated with this stage
  val properties = jobIdToActiveJob(jobId).properties
  // Mark the stage as running
  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after a corresponding
  // SparkListenerStageSubmitted event.
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
  // Compute the preferred (data-local) locations for each task
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  // Create a new stage attempt with a fresh StageInfo and attempt id
  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  // If there are tasks to execute, record the submission time of the stage. Otherwise,
  // post the event without a submission time, which indicates that this stage was skipped.
  if (partitionsToCompute.nonEmpty) {
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
  // TODO: maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcast binary for the task, used to dispatch tasks to executors. Note that we
  // broadcast the serialized copy of the RDD and for each task we will deserialize it,
  // which means each task gets a different copy of the RDD. This provides stronger
  // isolation between tasks that might modify state of objects referenced in their
  // closures. This is necessary in Hadoop where the JobConf/Configuration object is not
  // thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  var partitions: Array[Partition] = null
  try {
    // Different tasks are serialized and broadcast differently:
    // for ShuffleMapTask, serialize and broadcast (rdd, shuffleDep);
    // for ResultTask, serialize and broadcast (rdd, func).
    var taskBinaryBytes: Array[Byte] = null
    // taskBinaryBytes and partitions are both affected by the checkpoint status. We need
    // this synchronization in case another concurrent job is checkpointing this RDD, so we
    // get a consistent view of both variables.
    RDDCheckpointData.synchronized {
      taskBinaryBytes = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }
      partitions = stage.rdd.partitions
    }
    // Warn if the serialized task binary exceeds the warning threshold
    if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
      logWarning(s"Broadcasting large task binary with size " +
        s"${Utils.bytesToString(taskBinaryBytes.length)}")
    }
    // Broadcast the serialized task binary to distribute it to executors
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage
      // Abort execution
      return
    case e: Throwable =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      // Abort execution
      return
  }
  // Clear the pending partitions and build the tasks
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  // If there are tasks to run, submit them to the TaskScheduler
  if (tasks.size > 0) {
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)
    stage match {
      case stage: ShuffleMapStage =>
        logDebug(s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})")
        markMapStageJobsAsFinished(stage)
      case stage : ResultStage =>
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    // This stage has nothing left to run, so submit any waiting child stages
    submitWaitingChildStages(stage)
  }
}
Summary
This article analyzed how Spark builds its DAG: how stages are constructed and when the DAGScheduler comes into play. We then walked through the DAGScheduler call chain and everything a job submission goes through: building the job, submitting stages, creating tasks, computing preferred (data-local) locations for the tasks, and serializing and broadcasting the tasks to the executors. That is a lot of ground, so it is worth reading this part of the code more than once.