在Spark快速入门-RDD文章中学了spark的RDD。spark包含转换和行动操作。在进行spark程序开发的过程中,不可避免的要与spark rdd的转换和行动操作打交道。本文会介绍spark rdd常用的转换和行动操作。
转换操作
对一个RDD进行转换操作
函数名 | 描述 |
---|---|
filter() | 返回一个由通过传给filter()的函数的元素组成的RDD |
flatMap() | 将函数应用于RDD的每个元素,将返回的迭代器的所有内容构成新的RDD |
map() | 将函数应用于RDD中的每个元素,将返回值构成新的RDD |
distinct() | 去重 |
sample(withReplacement,<br />fraction, [seed]) | 对RDD进行采样,以及是否替换 |
val words = sc.parallelize(List("hello world", "spark", "hadoop", "spark sql", "spark streaming"))
// flatMap
val rdd1 = words.flatMap(x => x.split(" "))
// filter
val rdd2 = rdd1.filter(_.contains("spark"))
// sample
val rdd3 = rdd1.sample()
// distinct
val rdd4 = rdd1.distinct()
// map
val rdd5 = rdd2.map((_, 1))
结果:
scala> a.collect
res12: Array[Int] = Array(1, 2, 3)
scala> b.collect
res13: Array[Int] = Array(3, 4, 5)
scala> c.collect
res14: Array[Int] = Array(1, 2, 3, 3, 4, 5)
scala> d.collect
res15: Array[Int] = Array(3)
scala> e.collect
res16: Array[Int] = Array(1, 2)
scala> f.collect
res17: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5))
对两个RDD进行转换操作
函数名 | 描述 |
---|---|
union() | 生成一个包含两个RDD中所有元素的RDD |
intersection() | 求两个RDD共同元素的RDD |
substract() | 移除一个RDD中的内容 |
cartesian() | 与另一个RDD进行笛卡尔积 |
实战:
val a = sc.parallelize(List(1, 2, 3))
val b = sc.parallelize(List(3, 4, 5))
// union:生成一个包含两个RDD中所有元素的RDD
val c = a.union(b)
// intersection:求两个RDD共同的元素的RDD
val d = a.intersection(b)
// subtract:移除一个RDD中的内容
val e = a.subtract(b)
// cartesian:与另一个RDD的笛卡尔积
val f = a.cartesian(b)
结果:
scala> a.collect
res12: Array[Int] = Array(1, 2, 3)
scala> b.collect
res13: Array[Int] = Array(3, 4, 5)
scala> c.collect
res14: Array[Int] = Array(1, 2, 3, 3, 4, 5)
scala> d.collect
res15: Array[Int] = Array(3)
scala> e.collect
res16: Array[Int] = Array(1, 2)
scala> f.collect
res17: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5))
行动操作
对一个RDD进行行动操作
函数名 | 描述 |
---|---|
collect() | 返回RDD中的所有元素 |
count() | 返回RDD中的元素个数 |
countByValue() | 各元素在RDD中出现的次数 |
take(num) | 从RDD中返回num个元素 |
top(num) | 从RDD中返回最前面的num个元素 |
takeOrdered(num)(ordering) | 从RDD中按照顺序返回最前面的num个元素 |
takeSample(withReplacement,<br /> num, [seed]) | 从RDD中返回任意一些元素 |
reduce(func) | 并行整合RDD中所有数据 |
flod(zero)(func) | 和reduce一样,但是需要提供初始值 |
aggregate(zeroValue)(seqOp, comOp) | 和reduce一样,但是通常返回不同类型的函数 |
foreach(func) | 对RDD中的每个元素使用给定的函数 |
实战:
val rdd = sc.parallelize(List(1, 2, 3, 3))
// collect:返回RDD中的所有元素
rdd.collect()
// count:返回RDD中的元素个数
rdd.count()
// countByValue:返回个元素在RDD中出现的次数
rdd.countByValue()
// take:从RDD中返回2个元素
rdd.take(2)
// top:从RDD中返回最前面的2个元素
val x = rdd.top(2)
// takeOrdered:从RDD中按照提供的顺序返回最前面的2个元素
rdd.takeOrdered(2)
object Ord extends Ordering[Int] {
override def compare(x: Int, y: Int): Int = {
if (x < y) 1 else -1;
}
}
val pa = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
pa.takeOrdered(3)(Ord)
// foreach(func):对RDD中的每个元素使用给定的函数
rdd.foreach(println)
结果:
scala> val rdd = sc.parallelize(List(1, 2, 3, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:24
scala> rdd.collect()
res18: Array[Int] = Array(1, 2, 3, 3)
scala> rdd.count()
res19: Long = 4
scala> rdd.countByValue()
res20: scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)
scala> rdd.take(2)
res21: Array[Int] = Array(1, 2)
scala> val x = rdd.top(2)
x: Array[Int] = Array(3, 3)
scala> rdd.takeOrdered(2)
res22: Array[Int] = Array(1, 2)
scala> object Ord extends Ordering[Int] {
| override def compare(x: Int, y: Int): Int = {
| if (x < y) 1 else -1;
| }
| }
defined object Ord
scala> val pa = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
pa: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[43] at parallelize at <console>:24
scala> pa.takeOrdered(3)(Ord)
res23: Array[Int] = Array(6, 5, 4)
scala> rdd.foreach(println)
2
1
3
3
对一个Pair RDD进行转化操作
函数名 | 描述 |
---|---|
reduceByKey(func) | 合并具有相同键的值 |
groupByKey() | 对具有相同键的值进行分组 |
combineByKey(createCombiner, <br />mergeValue, mergeCombiners, <br />numPartitions) | 使用不同的返回类型合并具有相同键的值 |
mapValues(func) | 对Pair RDD中的每个值应用一个函数而不改变建 |
flatMapValues(func) | 对Pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生辰给一个对应原键的键值对记录 |
keys() | 返回一个仅包含键的RDD |
values() | 返回一个仅包含值的RDD |
sortByKey() | 返回一个根据键排序的RDD |
实战:
val pairRDD = sc.parallelize(List((1, 2), (3, 4), (3, 6)))
val pairRDD1 = sc.parallelize(List((3, 5)))
// reduceByKey: 合并具有相同键的RDD
val rdd6 = pairRDD.reduceByKey((x, y) => x + y)
// groupByKey: 对具有相同键的值进行分组
val rdd7 = pairRDD.groupByKey()
// pairRDD.combineByKey()
val rdd8 = pairRDD.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
// keys: 返回一个仅包含键的RDD
val rdd9 = pairRDD.keys
// values: 返回一个仅包含值的RDD
var rdd10 = pairRDD.values
// sortByKey: 返回一个根据键排序的RDD
val rdd11 = pairRDD.sortByKey()
// subtract: 删除RDD中键与pairRDD1中键相同的元素
var rdd12 = pairRDD.subtract(pairRDD1)
结果:
scala> rdd6.collect
res25: Array[(Int, Int)] = Array((1,2), (3,10))
scala> rdd7.collect
res26: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(2)), (3,CompactBuffer(4, 6)))
scala> rdd8.collect
res27: Array[(Int, (Int, Int))] = Array((1,(2,1)), (3,(10,2)))
scala> rdd9.collect
res28: Array[Int] = Array(1, 3, 3)
scala> rdd10.collect
res29: Array[Int] = Array(2, 4, 6)
scala> rdd11.collect
res30: Array[(Int, Int)] = Array((1,2), (3,4), (3,6))
scala> rdd12.collect
res31: Array[(Int, Int)] = Array((3,6), (3,4), (1,2))
对两个Pair RDD进行转换操作
函数 | 描述 |
---|---|
subtractByKey | 删除RDD中键与other中的键相同的元素 |
join | 对两个rdd进行内连接 |
rightOuterJoin | 对两个RDD进行连接操作,确保第一个RDD的键必须存在 |
leftOuterJoin | 对两个RDD进行连接操作,确保第二个RDD的键必须存在 |
cogroup | 将两个RDD中拥有相同键的数据分组到一起 |
// join: 对两个RDD进行内连接
var rdd13 = pairRDD.join(pairRDD1)
// rightOuterJoin: 对两个RDD进行连接操作,确保第一个RDD的键必须存在
var rdd14 = pairRDD.rightOuterJoin(pairRDD1)
// leftOuterJoin: 对两个RDD进行连接操作,确保第二个RDD的键必须存在
var rdd15 = pairRDD.leftOuterJoin(pairRDD1)
// cogroup: 将两个RDD中拥有相同键的数据分组到一起
var rdd16 = pairRDD.cogroup(pairRDD1)
结果:
scala> rdd13.collect
res32: Array[(Int, (Int, Int))] = Array((3,(4,5)), (3,(6,5)))
scala> rdd14.collect
res33: Array[(Int, (Option[Int], Int))] = Array((3,(Some(4),5)), (3,(Some(6),5)))
scala> rdd15.collect
res34: Array[(Int, (Int, Option[Int]))] = Array((1,(2,None)), (3,(4,Some(5))), (3,(6,Some(5))))
scala> rdd16.collect
res35: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((1,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(4, 6),CompactBuffer(5))))
Pair RDD行动操作
函数名 | 描述 |
---|---|
countByKey() | 对每个键对应的元素分别计数 |
collectAsMap() | 将结果以映射表的形式返回,以便查询 |
lookup(key) | 返回给定键对应的所有值 |
实战:
// countByValue: 对每个键对应的元素分别计数
pairRDD.countByValue()
// collectAsMap: 将结果以映射表的形式返回,以便查询
pairRDD.collectAsMap()
// lookup: 返回指定键对应的所有值
pairRDD.lookup(3)
结果:
scala> pairRDD.countByValue()
res36: scala.collection.Map[(Int, Int),Long] = Map((3,6) -> 1, (3,4) -> 1, (1,2) -> 1)
scala> pairRDD.collectAsMap()
res37: scala.collection.Map[Int,Int] = Map(1 -> 2, 3 -> 6)
scala> pairRDD.lookup(3)
res38: Seq[Int] = WrappedArray(4, 6)
image
阅读过本文的同学还看了以下:
网友评论