美文网首页
Spark源码--DAG创建以及Stage的生成

Spark源码--DAG创建以及Stage的生成

作者: 小北觅 | 来源:发表于2019-12-14 18:38 被阅读0次

本文基于spark-2.4.4源码

关于RDD的操作有两种:transformation操作和action操作。只有遇到action算子时任务才会真正开始执行。那我们今天就来看看DAG的创建和Stage的生成。

首先找到一个action算子,这里以collect()为例。

调用的是sparkContext类中的runJob方法

经过一系列重载的runJob方法,来到下面这个runJob:

这个方法里比较重要的是调用dagScheduler.runJob()。继续追,进入DagScheduler类的runJob()方法:

这个方法里面主要还是为了调用submitJob,然后返回一个JobWaitor对象waiter,通过异步机制获取运行结果。这里我们进入submitJob()方法。

submitJob()里比较重要的点如下:

生成一个JobSubmitted对象,然后提交给eventProcessLoop。这个eventProcessLoop是DAGSchedulerEventProcessLoop类的对象。是一个用来处理各种类型事件的对象。

刚刚post里传的是JobSubmitted对象,所以处理的事件类型也应该是JobSubmitted。

所以我们会进入到这个case JobSubmitted这个分支中。然后就会执行handleJobSubmitted()方法。

这个方法比较长,关注两个地方(用红色箭头标识的地方)。首先会创建finalStage。最后会提交finalStage。为什么创建的是finalStage呢?因为我们是遇到action算子后才会执行到这个位置,一个action算子对应一个job,一个job对应多个stage。我们在传入参数的时候rdd就是action算子之前的一个rdd,所以创建stage的过程是由后向前进行的。submitStage(finalStage)这个方法会从ShuffleDependecy处分开不断创建父Stage,直到没有父Stage为止。

进入到submitStage()方法中去:

可以看到此方法的doc写到:

 /** Submits stage, but first recursively submits any missing parents. */

提交stage,但是首先会递归地提交parents stage。看这句代码:

val missing = getMissingParentStages(stage).sortBy(_.id)
这个就是得到所有的没有提交的父Stage。
进入getMissingParentStages()方法看看:

这个方法回溯访问parent stage。
visited变量存储访问过的rdd,missing存储未提交的parent stages,waitingForVisit用来保存等待visit的rdd,和visited对应。关注visit方法的逻辑:对于传入的rdd,访问此rdd的依赖,如果遇到的是ShuffleDependency,那么就创建ShuffleMapStage,并且在missing中添加此Stage。如果是窄依赖,那么就把当前依赖对应的rdd加入到waitingForVisit中。

其实从visit方法中,我们就能看出Spark划分阶段的逻辑:当遇到ShuffleDependency时就创建新的Stage。当遇到narrowDep,就把上一个rdd(因为在窄依赖中存的rdd是prev)加到waitingForVisit中等待visit,不会创建新的Stage。

再到getOrCreateShuffleMapStage方法中:


此方法功能是:如果ShuffleMapStage没创建过(根据shuffleDep.shuffleId作为标识)那就创建它,并返回stage。在创建ShuffleMapStage的过程中还会通过getMissingAncestorShuffleDependencies()方法创建所有祖先Stage,从而完整的构建出DAG图。

我们再看一下getMissingAncestorShuffleDependencies()方法:

至此从action算子的源码到Stage生成的源码我们都走了一遍,有了一些初步的印象。

相关文章

网友评论

      本文标题:Spark源码--DAG创建以及Stage的生成

      本文链接:https://www.haomeiwen.com/subject/tmgxnctx.html