美文网首页
Spark源码剖析(四):WordCount的Stage划分

Spark源码剖析(四):WordCount的Stage划分

作者: Java技术范 | 来源:发表于2018-07-29 21:35 被阅读0次

    WordCount的代码

    主要是从HDFS读取文件后进行单词切割,然后进行计数,如果不懂RDD算子可以看RDD详解

    WordCount的各个算子

    SparkRDD的运行流程

    SparkRDD宽依赖和窄依赖

    SparkRDD之间的依赖主要有:

    1.宽依赖

    宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition

    总结:窄依赖我们形象的比喻为超生

    2.窄依赖

    窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用

    总结:窄依赖我们形象的比喻为独生子女

    结合WordCount的源码分析

    WordCount算子内部解析

    在WordCount程序中,第一个使用的Spark方法是textFile()方法,主要的源码是

    这个方法的主要作用是从HDFS中读取数据, 这里创建一个HadoopRDD,在这个方法内部还创建一个MapPartitionRDD,接下里的几个 RDD同样是MapPartitionRDD,最主要的是看saveAsTextFile()方法。 下面是saveAsTextFile()方法,代码在RDD类的1272行,具体内容如下:

    这个方法的主要作用是产生一个RDD,MapPartitionsRDD;然后将RDD转化为PairRDDFuctions,接下来是saveAsHadoopFile()方法: 主要的代码如下:

    继续查看saveAsHadoopDataset()方法源码,主要的代码如下:

    代码解析:

    1.获取写入HDFS中的文件流

    2.一个函数将分区数据迭代的写入到HDFS中

    3.开始提交作业,Self表示Final RDD也就是作业最后的RDD在WordCount中也就是MapPartitionsRDD

    这里我们将会追踪到runJob()方法中,

    这里我们继续追踪到runJob()的重载方法,夏满是这个方法的核心代码:

    这里是非常重要的方法,主要做的工作是调用SparkContext类中创建的dagScheduler,使用dagScheduler划分Stage,然后将Stage转化为TaskSet交给TaskScheduler在交个Executor执行

    划分Stage

    在前面的分析中,我们已经知道了dagScheduler调用了runJob()方法,这个方法的作用是划分stage。

    这里主要是划分stage,然后调用submitJob()返回一个调度器,这里我们继续查看submitJob()方法。

    上面是submitJob()方法的核心代码,主要的作用是eventProcessLoop对象内部有一个阻塞队列和线程,先将数据封装到Case Class中将事件放入到阻塞队列。

    对于JobSubmitted类的模式匹配,主要的代码如下:

    这里调用dagScheduler的handleJobSubmitted()方法,这个方法是对stage划分的主要方法,主要的核心代码:

    通过newStage()方法,根据这个方法在这里可以看出分区的数量决定Task数量。 通过追踪newStage()方法,主要的代码如下:

    这个方法是递归的划分Stage,主要的方法是getParentStages(rdd, jobId),具体的划分代码如下:

    stage划分算法如下:

    涉及的数据结构:栈、HashSet

    1.通过最后的RDD,获取父RDD

    2.将finalRDD放入栈中,然后出栈,进行for循环的找到RDD的依赖,需要注意的是RDD可能有多个依赖

    3.如果RDD依赖是ShuffleDependency,那么就可以划分成为一个新的Stage,然后通过getShuffleMapStage()获取这个stage的父stage;如果是一般的窄依赖,那么将会入栈

    4.通过getShuffleMapStage()递归调用,得到父stage;一直到父stage是null

    5.最后返回stage的集合

    stage提交算法

    在对于最后一个RDD划stage后,进行提交stage,主要的方法是:

    这里和划分stage的算法一样,拿到最后的stage然后找到第一个stage开始从第一个stage开始提交。

    stage提交

    下面的代码是submitMissingTasks(),主要是核心的代码:

    这里主要做的工作是根据分区数量决定Task数量,然后根据stage的类型创建Task,这里主要有ShuffleMapTask和ResultTask。

    ShuffleMapTask:进行分区局部聚合,从上游拉去数据。

    ResultTask:将结果写入持久化介质.比如HDFS等。

    这里将Task进行封装成为TaskSet进行提交给taskScheduler。

    关于Stage划分流程图

    总结

    1.textFile()方法会产生两个RDD,HadoopRDD和MapPartitionRDD

    2.saveTextAsFile()方法会产生一个RDD,MapPartitionRDD

    3.Task数量取决于HDFS分区数量

    4.Stage划分是通过最后的RDD,也就是final RDD根据依赖关系进行递归划分

    5.stage提交主要是通过递归算法,根据最后一个Stage划分然后递归找到第一个stage开始从第一个stage开始提交。

    喜欢小编的文章可以关注哟!

    相关文章

      网友评论

          本文标题:Spark源码剖析(四):WordCount的Stage划分

          本文链接:https://www.haomeiwen.com/subject/pemtvftx.html