map
- Description
Applies the given transformation function to each element of the source RDD, one element at a time, and returns a new RDD.
- Example
val rdd = sc.parallelize(1 to 5)
val mapRdd = rdd.map(_ * 2)
mapRdd.collect.foreach(println)
- Result
2
4
6
8
10
mapPartitions
- Description
Similar to map, but runs independently on each partition of the RDD: the transformation function receives an iterator over a whole partition and processes the entire partition at once.
- Example
val rdd = sc.parallelize(1 to 5)
// Because mapPartitions processes an entire partition per function call, it reduces
// the number of calls and can be more efficient than map.
// However, when a partition holds a lot of data, this may cause an out-of-memory (OOM) error.
val mapPartitionsRdd = rdd.mapPartitions(iter => {
iter.map(_ * 2)
})
mapPartitionsRdd.collect.foreach(println)
- Result
2
4
6
8
10
mapPartitionsWithIndex(func)
- Description
Similar to mapPartitions, but the transformation function additionally receives an integer parameter representing the partition index.
- Example
// Set the partition count explicitly (3) so the output is deterministic
val rdd = sc.parallelize(1 to 5, 3)
val mapPartitionsWithIndexRdd = rdd.mapPartitionsWithIndex((index, iter) => {
  iter.map((index, _))
})
mapPartitionsWithIndexRdd.collect.foreach(println)
- Result
(0,1)
(1,2)
(1,3)
(2,4)
(2,5)
flatMap(func)
- Description
Similar to map, but each input element can be mapped to zero or more output elements, so func should return a sequence rather than a single item.
- Example
val rdd = sc.parallelize(Array(List(1,2), List(3,4)))
val flatMapRdd = rdd.flatMap(x => x)
flatMapRdd.collect.foreach(println)
- Result
1
2
3
4
glom
- Description
Assembles the elements of each partition into a single array, returning an RDD of arrays.
- Example
val rdd = sc.parallelize(1 to 12, 4)
val glomRdd = rdd.glom
glomRdd.collect.foreach(println(_.mkString(",")))
- 结果
1,2,3
4,5,6
7,8,9
10,11,12
groupBy(func)
- Description
Groups elements by the return value of the given function; elements that map to the same key are placed into the same iterator.
- Example
// Groups the data by the given rule; each group is a pair (tuple),
// where the key is the grouping key and the value holds the grouped elements
val rdd = sc.parallelize(1 to 4)
val groupByRdd = rdd.groupBy(_ % 2)
groupByRdd.collect.foreach(println)
- Result
(0,CompactBuffer(2, 4))
(1,CompactBuffer(1, 3))
filter(func)
- Description
Filters by the given rule: each input element is passed to func, and only elements for which func returns true are kept.
- Example
val rdd = sc.parallelize(1 to 4)
val filterRdd = rdd.filter(_ % 2 == 0)
filterRdd.collect.foreach(println)
- Result
2
4
sample(withReplacement, fraction, seed)
- Description
Randomly samples a fraction of the data: withReplacement indicates whether sampling is done with replacement, fraction is the expected sampling ratio, and seed is the random number generator seed.
- Example
val rdd = sc.parallelize(1 to 10)
val sampleRdd = rdd.sample(false, 0.4)
sampleRdd.collect().foreach(println)
- Result (random; varies per run)
1
2
6
8
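The example above omits the seed parameter named in the description, so each run samples differently. As a minimal sketch (assuming a SparkContext `sc` is in scope, as in the other examples), passing an explicit seed makes the sample reproducible:

```scala
val rdd = sc.parallelize(1 to 10)
// Fixing the seed makes the otherwise random sample deterministic
val s1 = rdd.sample(false, 0.4, seed = 42).collect()
val s2 = rdd.sample(false, 0.4, seed = 42).collect()
println(s1.sameElements(s2)) // true: same seed, same sample
```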
distinct([numPartitions])
- Description
Removes duplicate elements from the source RDD. distinct involves a shuffle; the result defaults to the source RDD's partition count, which can be overridden with the optional numPartitions argument.
- Example
val rdd = sc.parallelize(Array(1,2,1,3,4,2,3,1))
val distinctRdd = rdd.distinct
distinctRdd.collect.foreach(println)
- Result (order may vary)
1
2
3
4
coalesce(numPartitions)
- Description
Reduces the number of partitions; useful for improving execution efficiency on the small dataset left after filtering a large one.
- Example
val rdd = sc.parallelize(1 to 12, 4)
println("before coalesce: " + rdd.partitions.size)
val coalesceRdd = rdd.coalesce(2)
println("after coalesce: " + coalesceRdd.partitions.size)
- Result
before coalesce: 4
after coalesce: 2
repartition(numPartitions)
- Description
Reshuffles the data into the given number of partitions.
- Example
val rdd = sc.parallelize(1 to 12, 4)
println("before repartition: " + rdd.partitions.size)
val repartitionRdd = rdd.repartition(6)
println("after repartition: " + repartitionRdd.partitions.size)
- Result
before repartition: 4
after repartition: 6
[Note]
coalesce can optionally shuffle, controlled by its parameter shuffle: Boolean = false. Without shuffle (the default) it can only decrease the partition count.
In fact, repartition simply calls coalesce with shuffle set to true.
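The note above can be sketched as follows (assuming a SparkContext `sc`, as in the other examples): without shuffle, coalesce cannot increase the partition count, while shuffle = true allows it:

```scala
val rdd = sc.parallelize(1 to 12, 4)
// With the default shuffle = false, asking for more partitions than the
// source has is silently ignored
println(rdd.coalesce(6).partitions.size)                  // 4
// With shuffle = true (what repartition does internally), the count can grow
println(rdd.coalesce(6, shuffle = true).partitions.size)  // 6
```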
sortBy(func, [ascending], [numPartitions])
- Description
Sorts the data by the result of applying func to each element; the default order is ascending.
- Example
val rdd = sc.parallelize(Array(3,1,4,2))
val sortByRdd = rdd.sortBy( x => x )
sortByRdd.collect.foreach(println)
- Result
1
2
3
4
union(otherDataset)
- Description
Returns a new RDD that is the union of the two RDDs (duplicates are kept).
- Example
val rdd1 = sc.parallelize(1 to 5)
val rdd2 = sc.parallelize(6 to 10)
val unionRdd = rdd1.union(rdd2)
unionRdd.collect.foreach(println)
- Result
1
2
3
4
5
6
7
8
9
10
cartesian(otherDataset)
- Description
Computes the Cartesian product of two RDDs, returning all (a, b) element pairs.
- Example
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(2 to 5)
val cartesianRdd = rdd1.cartesian(rdd2)
cartesianRdd.collect.foreach(println)
- Result
(1,2)
(1,3)
(1,4)
(1,5)
(2,2)
(2,3)
(2,4)
(2,5)
(3,2)
(3,3)
(3,4)
(3,5)
zip(otherDataset)
- Description
Combines two RDDs into an RDD of key-value pairs. The two RDDs must have the same number of partitions and the same number of elements in each partition; otherwise an exception is thrown.
- Example
val rdd1 = sc.parallelize(Array(1,2,3), 3)
val rdd2 = sc.parallelize(Array("a","b","c"), 3)
val zipRdd = rdd1.zip(rdd2)
zipRdd.collect.foreach(println)
- Result
(1,a)
(2,b)
(3,c)
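As a sketch of the constraint above (assuming a SparkContext `sc`, as in the other examples), zipping RDDs whose partitions hold different numbers of elements fails at runtime:

```scala
// Same partition count (3) but 3 vs 4 elements, so the per-partition
// element counts differ
val a = sc.parallelize(1 to 3, 3)
val b = sc.parallelize(Seq("a", "b", "c", "d"), 3)
// Throws org.apache.spark.SparkException when an action runs:
// "Can only zip RDDs with same number of elements in each partition"
a.zip(b).collect()
```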
partitionBy
- Description
Repartitions a key-value RDD using the given partitioner.
- Example
import org.apache.spark.HashPartitioner
val rdd = sc.parallelize(Array((1,"a"), (2,"b"), (3,"c"), (4,"d")), 4)
val partitionByRdd = rdd.partitionBy(new HashPartitioner(2))
// Use glom to inspect the contents of each resulting partition
partitionByRdd.glom.collect.foreach(arr => println(arr.mkString(",")))
- Result
(2,b),(4,d)
(1,a),(3,c)
groupByKey
- Description
Groups values by key, returning an RDD of (key, iterable-of-values) pairs.
- Example
val rdd = sc.parallelize(Array("a", "a", "b", "c", "c", "c"))
val pairRdd = rdd.map((_, 1))
val groupRdd = pairRdd.groupByKey()
groupRdd.collect.foreach(println)
- Result (order may vary)
(a,CompactBuffer(1, 1))
(b,CompactBuffer(1))
(c,CompactBuffer(1, 1, 1))
reduceByKey(func, [numPartitions])
- Description
Aggregates the values of each key using the given function.
- Example
val rdd = sc.parallelize(Array("a", "a", "b", "c", "c", "c"))
val pairRdd = rdd.map((_, 1))
// reduceByKey is applied directly to the pair RDD; no groupByKey step is needed
val reduceRdd = pairRdd.reduceByKey(_ + _)
reduceRdd.collect.foreach(println)
- Result (order may vary)
(a,2)
(b,1)
(c,3)
aggregateByKey(zeroValue)(seqOp, combOp)
- Description
Using zeroValue as the initial value, first aggregates the data within each partition with the seqOp function, then merges the per-partition results across partitions with the combOp function.
- Example
val rdd = sc.parallelize(Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
val aggrRdd = rdd.aggregateByKey(0)(math.max(_, _), _ + _)
aggrRdd.collect.foreach(println)
- Result (order may vary)
(b,3)
(a,3)
(c,12)
foldByKey(zeroValue)(func)
- Description
Similar to aggregateByKey, but the same function is used for both the within-partition (seqOp) and cross-partition (combOp) aggregation.
- Example
val rdd = sc.parallelize(Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
val foldRdd = rdd.foldByKey(0)(_ + _)
foldRdd.collect.foreach(println)
- Result (order may vary)
(b,3)
(a,5)
(c,18)
combineByKey(createCombiner, mergeValue, mergeCombiner)
- Description
When a key is first encountered within a partition, createCombiner builds the initial combiner from its value; subsequent values for the same key within the partition are merged into the combiner with mergeValue; finally, the per-partition combiners for each key are merged across partitions with mergeCombiner.
- Example
// Example: compute the average value for each key
val rdd = sc.parallelize(Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
val combineRdd = rdd.combineByKey(
  v => (v, 1),                                           // createCombiner: (sum, count)
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1), // mergeValue: add a value within a partition
  (a1: (Int, Int), a2: (Int, Int)) => (a1._1 + a2._1, a1._2 + a2._2)) // mergeCombiner: merge across partitions
val avgRdd = combineRdd.map { case (k, v) => (k, v._1.toDouble / v._2) }
avgRdd.collect.foreach(println)
- Result (order may vary)
(a,2.5)
(b,3.0)
(c,6.0)
sortByKey([ascending], [numPartitions])
- Description
Sorts by key; the key type must have an implicit Ordering in scope (e.g. implement the Ordered trait).
- Example
val rdd = sc.parallelize(Array((4, "d"), (1,"a"), (2,"b"), (3,"c")))
val sortRdd = rdd.sortByKey(true)
sortRdd.collect.foreach(println)
- Result
(1,a)
(2,b)
(3,c)
(4,d)
mapValues
- Description
Applies a map function to the values of a key-value RDD, leaving the keys unchanged.
- Example
val rdd = sc.parallelize(Array((4, "d"), (1,"a"), (2,"b"), (3,"c")))
val upperRdd = rdd.mapValues(_.toUpperCase)
upperRdd.collect.foreach(println)
- Result (mapValues preserves the original element order)
(4,D)
(1,A)
(2,B)
(3,C)
join(otherDataset)
- Description
Joins two key-value RDDs, combining elements that share the same key into (key, (value1, value2)) pairs.
- Example
val rdd1 = sc.parallelize(Array((4, "d"), (1,"a"), (2,"b"), (3,"c")))
val rdd2 = sc.parallelize(Array((4, 3), (2, 1), (3, 10), (1, 5)))
val joinRdd = rdd1.join(rdd2)
joinRdd.collect.foreach(println)
- Result (order may vary)
(1,(a,5))
(2,(b,1))
(3,(c,10))
(4,(d,3))
cogroup(otherDataset, [numPartitions])
- Description
Groups two key-value RDDs together: for each key, the values from each RDD are collected into a separate iterable.
- Example
val rdd1 = sc.parallelize(Array((4, "d"), (1,"a"), (2,"b"), (3,"c")))
val rdd2 = sc.parallelize(Array((4, 3), (3, 10), (1, 5)))
val cogroupRdd = rdd1.cogroup(rdd2)
cogroupRdd.collect.foreach(println)
- Result (order may vary)
(1,(CompactBuffer(a),CompactBuffer(5)))
(2,(CompactBuffer(b),CompactBuffer()))
(3,(CompactBuffer(c),CompactBuffer(10)))
(4,(CompactBuffer(d),CompactBuffer(3)))