wordcount代码
sc.textFile(args(0))
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.saveAsTextFile(args(1))
首先我们知道,RDD是遇到action操作才开始提交job任务,其他操作是transformation,lazy的模式,
RDD是什么
- A list of partitions(paritition的数据集合)
- A function for computing each split (一个函数作用于每个分片)
- A list of dependencies on other RDDs (RDD之间有依赖关系)
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) ()
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) (移动计算,不移动数据)
源码分析
红框里面调用的参数,如果写过MapReduce的话,会非常熟悉,所以我们可以知道,textFile中调用的api,实际就是hadoop里面的默认读取文件的类,该类就是TextInputFormat
image.png
创建了一个HadoopRDD,RDD里,getPartitions是决定多少个partitions,决定多少个task执行的,所以我们看HadoopRDD中的getPartitions方法
image.png
image.png
image.png
TextInputFormat继承于FileInputFormat,所以查看inputFormat,查看源码我们可以知道,textFile算子,得到的task数量,就是hadoop中使用MapReduce的map的输入数量,而这个数量并不是我们通常知道的,块的整数倍,而是块的1.1倍
image.png
image.png
image.png
RDD中的getPartitions我们可以知道,计算的时候,输入的partitions数量,
compute
方法可以知道,每一个partition是怎么获取数据,首先查看compute方法,可以知道他是怎么获取数据的
image.png
image.png
getPreferredLocations方法可以知道如何选取最优计算数据。
image.png
上面的分析可以知道,textFile中首先调用的是hadoop的TextInputFormat类去hdfs读取数据,获取key,value,然后使用map,只保存了value,key丢弃,map实际上是创建了一个MapPartitionsRDD+map函数=MapRDD,MapRDD重写了getPartitions,compute方法,firstParent很重要,
image.png
image.png
image.png
hadoopRdd将自己当初this,进入构造方法,就是MapRDD,new OneToOneDependency(HadoopRDD)形成父子依赖关系,MapRDD依赖于hadoopRDD
image.png
image.png
image.png
总结,TextFile方法,里面产生了两个RDD,一个是HadoopRDD,一个是MapRDD,HadoopRDD跟MapRDD又形成父子依赖关系
接下来调用flatMap,MapPartitionsRDD + fflatMap函数=FlatMapRDD,同理分析,MapRDD与FlatMapRDD形成父子依赖关系
image.png
接下来调用map,MapPartitionsRDD + map函数=MapRDD,同理分析,FlatMapRDD与MapRDD形成父子依赖关系
所以,
sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1))
形成的依赖关系,以及数据流图image.png
image.png
接下来分析reduceByKey
,RDD里面没有reducebykey,这里使用了隐式转换,将RDD=>PairRDDFunctions,ShuffledRDD
没看懂,但是可以知道,这里发生了shuff操作
image.png
image.png image.png
最后,saveAsTextFile
,saveAsTextFile
不仅仅有rdd,还有Action的功能,spark中,action是需要提交job的
image.png
image.png
image.png
总结,到此,rdd的依赖关系与数据流图为
image.png
image.png
接下来分析job提交
image.pngimage.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
递归的方式找到第一个stage然后提交
submitMissingTasks(stage, jobId.get)
image.png image.png
参考:
http://blog.csdn.net/firstblood1/article/details/53444048
http://mzorro.me/2015/08/11/spark-wordcount-analyse/
http://tieba.baidu.com/p/4491480910?see_lz=1
陶思源大人
https://spark-internals.books.yourtion.com/markdown/1-Overview.html
http://guozhongxin.com/pages/2015/01/25/spark_dagscheduler.html
http://blog.csdn.net/oopsoom/article/details/38763985
网友评论