- 需求
有三个RDD ,分别是 rddA,rddB,rddC.取数据1,2,3,4,5
并且分成三个分区,对输入的数据的每一个数据*2 ,只取大于 6 的数据.
- 代码
val rddA = sc.parallelize(List(1, 2, 3, 4, 5),3)
//rddA: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0]
val rddB = rddA.map(_*2)
//rddB: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1]
val rddC = rddB.filter(_>6)
//rddC: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2]
rddC.collect()
//res0: Array[Int] = Array(8, 10)
-
图解
rdd 血缘关系图
-
idea调试查看
idea查看依赖关系
-
通过 spark web ui 查看
spark-web-ui 依赖关系
-
补充
使用代码rddC.toDebugString
打印依赖关系
res1: String =
(2) MapPartitionsRDD[2] at filter at <console>:25 []
| MapPartitionsRDD[1] at map at <console>:25 []
| ParallelCollectionRDD[0] at parallelize at <console>:24 []
窄依赖
- 说明:父RDD的每个分区只被一个子RDD分区使用一次
- 窄依赖有分为两种:
- 1.一种是一对一的依赖,即
OneToOneDependency
- 2.还有一个是范围的依赖
RangeDependency
,它仅仅被org.apache.spark.rdd.UnionRDD
使用。UnionRDD
是把多个RDD合成一个RDD,这些RDD是被拼接而成,每个父RDD的Partition
的相对顺序不会变,只不过每个父RDD在UnionRDD
中的Partition
的起始位置不同 - 常见算子
map
,filter
,union
,join
,mapPartitions
,mapValues
-
图解
窄依赖
宽依赖
- 说明:父RDD的每个分区都有可能被多个子RDD分区使用,子RDD分区通常对应父RDD所有分区
- 常见会对应
Shuffle
的操作.在会job
中产生一个stage
-
groupByKey
,join
,partitionBy
,reduce
- 常见算子
-
图解
宽依赖
wordCountDemo演示
val path = "/user/spark/data/wc.txt"
val lines = sc.textFile(path, 3)
//查看每个分区的数据
// lines.mapPartitionsWithIndex((n, partition) => {
// partition.map(x => (s"分区编号${n}", s"分区数据${x}"))
// }).foreach(println)
val words = lines.flatMap(_.split(","))
val wordPair = words.map(x => (x, 1))
val result = wordPair.reduceByKey(_ + _)
result.collect().foreach(println)
-
图解
RDDWordCount
如果觉得文章不错的话记得关注下公号哦

网友评论