本文基于spark-2.4.4源码
关于RDD的操作有两种:transformation操作和action操作。只有遇到action算子时任务才会真正开始执行。那我们今天就来看看DAG的创建和Stage的生成。
首先找到一个action算子,这里以collect()为例。
调用的是sparkContext类中的runJob方法
经过一系列重载的runJob方法,来到下面这个runJob:
这个方法里比较重要的是调用dagScheduler.runJob()
。继续追,进入DagScheduler类的runJob()方法:
这个方法里面主要还是为了调用submitJob,然后返回一个JobWaitor对象waiter,通过异步机制获取运行结果。这里我们进入submitJob()方法。
submitJob()里比较重要的点如下:
生成一个JobSubmitted对象,然后提交给eventProcessLoop。这个eventProcessLoop是DAGSchedulerEventProcessLoop类的对象。是一个用来处理各种类型事件的对象。
刚刚post里传的是JobSubmitted对象,所以处理的事件类型也应该是JobSubmitted。
所以我们会进入到这个case JobSubmitted这个分支中。然后就会执行handleJobSubmitted()
方法。
这个方法比较长,关注两个地方(用红色箭头标识的地方)。首先会创建finalStage。最后会提交finalStage。为什么创建的是finalStage呢?因为我们是遇到action算子后才会执行到这个位置,一个action算子对应一个job,一个job对应多个stage。我们在传入参数的时候rdd就是action算子之前的一个rdd,所以创建stage的过程是由后向前进行的。submitStage(finalStage)这个方法会从ShuffleDependecy处分开不断创建父Stage,直到没有父Stage为止。
进入到submitStage()方法中去:
可以看到此方法的doc写到:
/** Submits stage, but first recursively submits any missing parents. */
提交stage,但是首先会递归地提交parents stage。看这句代码:
val missing = getMissingParentStages(stage).sortBy(_.id)
这个就是得到所有的没有提交的父Stage。
进入getMissingParentStages()方法看看:
这个方法回溯访问parent stage。
visited变量存储访问过的rdd,missing存储未提交的parent stages,waitingForVisit用来保存等待visit的rdd,和visited对应。关注visit方法的逻辑:对于传入的rdd,访问此rdd的依赖,如果遇到的是ShuffleDependency,那么就创建ShuffleMapStage,并且在missing中添加此Stage。如果是窄依赖,那么就把当前依赖对应的rdd加入到waitingForVisit中。
其实从visit方法中,我们就能看出Spark划分阶段的逻辑:当遇到ShuffleDependency时就创建新的Stage。当遇到narrowDep,就把上一个rdd(因为在窄依赖中存的rdd是prev)加到waitingForVisit中等待visit,不会创建新的Stage。
再到getOrCreateShuffleMapStage方法中:
此方法功能是:如果ShuffleMapStage没创建过(根据shuffleDep.shuffleId作为标识)那就创建它,并返回stage。在创建ShuffleMapStage的过程中还会通过getMissingAncestorShuffleDependencies()方法创建所有祖先Stage,从而完整的构建出DAG图。
我们再看一下getMissingAncestorShuffleDependencies()方法:
至此从action算子的源码到Stage生成的源码我们都走了一遍,有了一些初步的印象。
网友评论