Spark RDD Api使用指南

作者: digger30 | 来源:发表于2019-06-08 12:09 被阅读2次
img

​ 在Spark快速入门-RDD文章中学了spark的RDD。spark包含转换行动操作。在进行spark程序开发的过程中,不可避免的要与spark rdd的转换和行动操作打交道。本文会介绍spark rdd常用的转换和行动操作。

转换操作

对一个RDD进行转换操作

函数名 描述
filter() 返回一个由通过传给filter()的函数的元素组成的RDD
flatMap() 将函数应用于RDD的每个元素,将返回的迭代器的所有内容构成新的RDD
map() 将函数应用于RDD中的每个元素,将返回值构成新的RDD
distinct() 去重
sample(withReplacement,<br />fraction, [seed]) 对RDD进行采样,以及是否替换
val words = sc.parallelize(List("hello world", "spark", "hadoop", "spark sql", "spark streaming"))

// flatMap
val rdd1 = words.flatMap(x => x.split(" "))

// filter
val rdd2 = rdd1.filter(_.contains("spark"))

// sample
val rdd3 = rdd1.sample()

// distinct
val rdd4 = rdd1.distinct()

// map
val rdd5 = rdd2.map((_, 1))

结果:

scala> a.collect
res12: Array[Int] = Array(1, 2, 3)

scala> b.collect
res13: Array[Int] = Array(3, 4, 5)

scala> c.collect
res14: Array[Int] = Array(1, 2, 3, 3, 4, 5)

scala> d.collect
res15: Array[Int] = Array(3)

scala> e.collect
res16: Array[Int] = Array(1, 2)

scala> f.collect
res17: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5))

对两个RDD进行转换操作

函数名 描述
union() 生成一个包含两个RDD中所有元素的RDD
intersection() 求两个RDD共同元素的RDD
substract() 移除一个RDD中的内容
cartesian() 与另一个RDD进行笛卡尔积

实战:

val a = sc.parallelize(List(1, 2, 3))
val b = sc.parallelize(List(3, 4, 5))

// union:生成一个包含两个RDD中所有元素的RDD
val c = a.union(b)

// intersection:求两个RDD共同的元素的RDD
val d = a.intersection(b)

// subtract:移除一个RDD中的内容
val e = a.subtract(b)

// cartesian:与另一个RDD的笛卡尔积
val f = a.cartesian(b)

结果:

scala> a.collect
res12: Array[Int] = Array(1, 2, 3)

scala> b.collect
res13: Array[Int] = Array(3, 4, 5)

scala> c.collect
res14: Array[Int] = Array(1, 2, 3, 3, 4, 5)

scala> d.collect
res15: Array[Int] = Array(3)

scala> e.collect
res16: Array[Int] = Array(1, 2)

scala> f.collect
res17: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5))

行动操作

对一个RDD进行行动操作

函数名 描述
collect() 返回RDD中的所有元素
count() 返回RDD中的元素个数
countByValue() 各元素在RDD中出现的次数
take(num) 从RDD中返回num个元素
top(num) 从RDD中返回最前面的num个元素
takeOrdered(num)(ordering) 从RDD中按照顺序返回最前面的num个元素
takeSample(withReplacement,<br /> num, [seed]) 从RDD中返回任意一些元素
reduce(func) 并行整合RDD中所有数据
flod(zero)(func) 和reduce一样,但是需要提供初始值
aggregate(zeroValue)(seqOp, comOp) 和reduce一样,但是通常返回不同类型的函数
foreach(func) 对RDD中的每个元素使用给定的函数

实战:

val rdd = sc.parallelize(List(1, 2, 3, 3))

// collect:返回RDD中的所有元素
rdd.collect()

// count:返回RDD中的元素个数
rdd.count()

// countByValue:返回个元素在RDD中出现的次数
rdd.countByValue()

// take:从RDD中返回2个元素
rdd.take(2)

// top:从RDD中返回最前面的2个元素
val x = rdd.top(2)

// takeOrdered:从RDD中按照提供的顺序返回最前面的2个元素
rdd.takeOrdered(2)

object Ord extends Ordering[Int] {
    override def compare(x: Int, y: Int): Int = {
        if (x < y) 1 else -1;
    }
}
val pa = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
pa.takeOrdered(3)(Ord)

// foreach(func):对RDD中的每个元素使用给定的函数
rdd.foreach(println)

结果:

scala> val rdd = sc.parallelize(List(1, 2, 3, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:24

scala> rdd.collect()
res18: Array[Int] = Array(1, 2, 3, 3)

scala> rdd.count()
res19: Long = 4

scala> rdd.countByValue()
res20: scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)

scala> rdd.take(2)
res21: Array[Int] = Array(1, 2)

scala> val x = rdd.top(2)
x: Array[Int] = Array(3, 3)

scala> rdd.takeOrdered(2)
res22: Array[Int] = Array(1, 2)

scala> object Ord extends Ordering[Int] {
     |     override def compare(x: Int, y: Int): Int = {
     |         if (x < y) 1 else -1;
     |     }
     | }
defined object Ord

scala> val pa = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
pa: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[43] at parallelize at <console>:24

scala> pa.takeOrdered(3)(Ord)
res23: Array[Int] = Array(6, 5, 4)

scala> rdd.foreach(println)
2
1
3
3

对一个Pair RDD进行转化操作

函数名 描述
reduceByKey(func) 合并具有相同键的值
groupByKey() 对具有相同键的值进行分组
combineByKey(createCombiner, <br />mergeValue, mergeCombiners, <br />numPartitions) 使用不同的返回类型合并具有相同键的值
mapValues(func) 对Pair RDD中的每个值应用一个函数而不改变建
flatMapValues(func) 对Pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生辰给一个对应原键的键值对记录
keys() 返回一个仅包含键的RDD
values() 返回一个仅包含值的RDD
sortByKey() 返回一个根据键排序的RDD

实战:

val pairRDD = sc.parallelize(List((1, 2), (3, 4), (3, 6)))
val pairRDD1 = sc.parallelize(List((3, 5)))

// reduceByKey: 合并具有相同键的RDD
val rdd6 = pairRDD.reduceByKey((x, y) => x + y)

// groupByKey: 对具有相同键的值进行分组
val rdd7 = pairRDD.groupByKey()

//    pairRDD.combineByKey()
val rdd8 = pairRDD.combineByKey(
    (v) => (v, 1),
    (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

// keys: 返回一个仅包含键的RDD
val rdd9 = pairRDD.keys

// values: 返回一个仅包含值的RDD
var rdd10 = pairRDD.values

// sortByKey: 返回一个根据键排序的RDD
val rdd11 = pairRDD.sortByKey()

// subtract: 删除RDD中键与pairRDD1中键相同的元素
var rdd12 = pairRDD.subtract(pairRDD1)

结果:

scala> rdd6.collect
res25: Array[(Int, Int)] = Array((1,2), (3,10))

scala> rdd7.collect
res26: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(2)), (3,CompactBuffer(4, 6)))

scala> rdd8.collect
res27: Array[(Int, (Int, Int))] = Array((1,(2,1)), (3,(10,2)))

scala> rdd9.collect
res28: Array[Int] = Array(1, 3, 3)

scala> rdd10.collect
res29: Array[Int] = Array(2, 4, 6)

scala> rdd11.collect
res30: Array[(Int, Int)] = Array((1,2), (3,4), (3,6))

scala> rdd12.collect
res31: Array[(Int, Int)] = Array((3,6), (3,4), (1,2))

对两个Pair RDD进行转换操作

函数 描述
subtractByKey 删除RDD中键与other中的键相同的元素
join 对两个rdd进行内连接
rightOuterJoin 对两个RDD进行连接操作,确保第一个RDD的键必须存在
leftOuterJoin 对两个RDD进行连接操作,确保第二个RDD的键必须存在
cogroup 将两个RDD中拥有相同键的数据分组到一起
// join: 对两个RDD进行内连接
var rdd13 = pairRDD.join(pairRDD1)

// rightOuterJoin: 对两个RDD进行连接操作,确保第一个RDD的键必须存在
var rdd14 = pairRDD.rightOuterJoin(pairRDD1)

// leftOuterJoin: 对两个RDD进行连接操作,确保第二个RDD的键必须存在
var rdd15 = pairRDD.leftOuterJoin(pairRDD1)

// cogroup: 将两个RDD中拥有相同键的数据分组到一起
var rdd16 = pairRDD.cogroup(pairRDD1)

结果:

scala> rdd13.collect
res32: Array[(Int, (Int, Int))] = Array((3,(4,5)), (3,(6,5)))

scala> rdd14.collect
res33: Array[(Int, (Option[Int], Int))] = Array((3,(Some(4),5)), (3,(Some(6),5)))

scala> rdd15.collect
res34: Array[(Int, (Int, Option[Int]))] = Array((1,(2,None)), (3,(4,Some(5))), (3,(6,Some(5))))

scala> rdd16.collect
res35: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((1,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(4, 6),CompactBuffer(5))))

Pair RDD行动操作

函数名 描述
countByKey() 对每个键对应的元素分别计数
collectAsMap() 将结果以映射表的形式返回,以便查询
lookup(key) 返回给定键对应的所有值

实战:

// countByValue: 对每个键对应的元素分别计数
pairRDD.countByValue()

// collectAsMap: 将结果以映射表的形式返回,以便查询
pairRDD.collectAsMap()

// lookup: 返回指定键对应的所有值
pairRDD.lookup(3)

结果:

scala> pairRDD.countByValue()
res36: scala.collection.Map[(Int, Int),Long] = Map((3,6) -> 1, (3,4) -> 1, (1,2) -> 1)

scala> pairRDD.collectAsMap()
res37: scala.collection.Map[Int,Int] = Map(1 -> 2, 3 -> 6)

scala> pairRDD.lookup(3)
res38: Seq[Int] = WrappedArray(4, 6)

点击查看源码

image

阅读过本文的同学还看了以下:

相关文章

网友评论

    本文标题:Spark RDD Api使用指南

    本文链接:https://www.haomeiwen.com/subject/iqhixctx.html