I. Common Operators (33)
Spark operators fall roughly into three categories:
1. Value-type Transformation operators: these transformations do not trigger job submission and operate on data items of Value type.
2. Key-Value-type Transformation operators: these transformations do not trigger job submission and operate on data items of Key-Value type.
3. Action operators: these trigger the SparkContext to submit a job.
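A quick illustration of the transformation/action split (a minimal sketch; nums and doubled are just illustrative names):
val nums = sc.parallelize(1 to 5)   // creates an RDD, no job submitted
val doubled = nums.map(_ * 2)       // transformation: still lazy, no job submitted
doubled.collect()                   // action: triggers the SparkContext to submit a job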
Start spark-shell (here connecting to a standalone cluster; use --master local[*] instead to run locally):
spark-shell --master spark://master:7077 --executor-memory 1g --total-executor-cores 2
II. Value-type Transformation Operators (13)
1. map
map (one record in, one record out)
map(func): computes a new RDD from the source RDD by passing each element through the function func, returning a new distributed dataset of the results.
scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[39] at parallelize at <console>:25
// Compute the character length of each element
scala> val b = a.map(_.length)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at map at <console>:26
// zip pairs each element of a with the corresponding element of b; the resulting RDD contains tuples of type (String, Int).
scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[41] at zip at <console>:28
// Collect the new dataset
scala> c.collect
res3: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
Each element of the dataset is transformed by the user-defined function to form a new RDD (as the console output above shows, the result here is a MapPartitionsRDD).
Use getNumPartitions (or partitions.size) to check how many partitions an RDD has, and foreachPartition() to inspect the contents of each partition.
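For example, on the RDD a defined above (a minimal sketch; the exact contents of each partition depend on how Spark splits the collection):
// number of partitions, as requested when the RDD was created
a.getNumPartitions                           // 3
// print the contents of each partition
a.foreachPartition(p => println(p.toList))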
2. flatMap
flatMap: one record becomes n records (n >= 0).
map applies the function to each RDD element one by one and maps it to an element of the new RDD, whereas flatMap applies the function to every element and then flattens the returned collections: the original arrays or containers are broken apart, and their individual elements become the items of the new RDD.
In short, flatMap applies a function to every element of the collection and then flattens the results.
// Create an RDD from an array of pairs
scala> val arr=sc.parallelize(Array(("A",1),("B",2),("C",3)))
arr: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
// x._1 + x._2 concatenates each pair into a string ("A1", "B2", "C3"); flatMap then splits each string into its characters
scala> arr.flatMap(x=>(x._1+x._2)).foreach(println)
A
1
B
2
C
3
// Create an RDD from the range 1 to 10, with 5 partitions
scala> val a = sc.parallelize(1 to 10, 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
// For each element x, emit the numbers 1 to x (spaces added to the printed output below for readability)
scala> a.flatMap(1 to _).collect.foreach(print)
1 12 123 1234 12345 123456 1234567 12345678 123456789 12345678910
// Repeat each element three times, then flatten the lists
scala> sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect.foreach(print)
111 222 333
parallelize
parallelize creates an RDD (a ParallelCollectionRDD) from a local collection, i.e. it builds a parallelized collection.
Number of partitions: how many partitions the RDD is split into. A common rule of thumb is 2-4 partitions per CPU in your cluster. Normally Spark sets this automatically based on your cluster, but you can also pass it explicitly as the second argument, e.g. sc.parallelize(data, 5).
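A minimal sketch of setting the partition count explicitly (data is an illustrative local collection):
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data, 5)   // explicitly request 5 partitions
distData.getNumPartitions                // 5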
3. mapPartitions
Similar to map, but while map is applied to every element of every partition, mapPartitions is applied once per partition, with a function of type Iterator[T] => Iterator[U]. With N elements and M partitions, the map function is called N times, whereas the mapPartitions function is called only M times. When the mapping has to create expensive objects, mapPartitions can therefore be much more efficient than map: for example, when writing to a database, map would need to create a connection object for every element, while mapPartitions only needs one connection per partition (see the sketch after the example below).
// Define a list of (name, gender) pairs
scala> val list = List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female"))
list: List[(String, String)] = List((kpop,female), (zorro,male), (mobin,male), (lucy,female))
// Load the data into an RDD with 2 partitions
scala> val rdd = sc.parallelize(list,2)
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[12] at parallelize at <console>:26
// Keep only the pairs whose second element is "female", partition by partition
scala> rdd.mapPartitions(x => x.filter(_._2 == "female")).foreachPartition(p=>{
     | println("==== partition boundary ====")
     | println(p.toList)
     | })
==== partition boundary ====
List((kpop,female))
==== partition boundary ====
List((lucy,female))
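As described above, the typical win for mapPartitions is sharing an expensive resource across a whole partition instead of recreating it per record. The sketch below only illustrates that pattern: createConnection() is a hypothetical stand-in for a real database client, and written is just an illustrative name.
// hypothetical helper -- replace with your real database client
def createConnection(): java.sql.Connection = ???

val written = rdd.mapPartitions { records =>
  val conn = createConnection()          // one connection per partition, not per record
  val out = records.map { case (name, gender) =>
    // write each record through the shared connection, e.g. with a PreparedStatement
    (name, gender)
  }.toList                               // force the iterator so the writes happen before close
  conn.close()
  out.iterator
}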
4. glom
Converts the elements of type T in each partition of the RDD into an array Array[T], yielding one array per partition.
scala> val a = sc.parallelize(1 to 100, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> a.glom.collect
res0: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100))
5. union
Merges the datasets of two RDDs and returns their union; duplicate elements are not removed.
scala> val a = sc.parallelize(1 to 3, 1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> val b = sc.parallelize(5 to 7, 1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> a.union(b).collect()
res1: Array[Int] = Array(1, 2, 3, 5, 6, 7)
6. cartesian
Performs a Cartesian product over all elements of the two RDDs.
scala> val x = sc.parallelize(List(1,2,3,4,5))
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val y = sc.parallelize(List(6,7,8,9,10))
y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> x.cartesian(y).collect
res2: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (4,7), (4,8), (4,9), (4,10), (5,6), (5,7), (5,8), (5,9), (5,10))
7. groupBy
Computes a key for each element with the supplied function and groups elements with the same key together.
scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
res3: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))
8. filter
Filters the elements: the function f is applied to each element; elements for which it returns true are kept in the new RDD, and those for which it returns false are dropped.
scala> val a = sc.parallelize(1 to 10, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> val b = a.filter(_ % 2 == 0)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at filter at <console>:25
scala> b.collect
res4: Array[Int] = Array(2, 4, 6, 8, 10)
9. distinct
Removes duplicate elements.
scala> val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[13] at parallelize at <console>:24
scala> c.distinct.collect
res5: Array[String] = Array(Dog, Cat, Gnu, Rat)
10. subtract
Returns the elements of one RDD that do not appear in the other: a.subtract(b) keeps the elements of a that are not in b.
scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:24
scala> val b = sc.parallelize(1 to 3, 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24
scala> val c = a.subtract(b)
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at subtract at <console>:27
scala> c.collect
res6: Array[Int] = Array(6, 9, 4, 7, 5, 8)
11. sample
Randomly samples a fraction of the data using the given random seed. withReplacement controls whether sampled items are put back: true for sampling with replacement, false for sampling without replacement.
scala> val a = sc.parallelize(1 to 10000, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
scala> a.sample(false, 0.1, 0).count
res7: Long = 1032
12. takeSample
takeSample() works on the same principle as sample(), but instead of sampling by a fraction it draws a fixed number of elements, and the result is no longer an RDD: it is as if collect() had been applied to the sampled data, so the return value is a local array.
scala> val x = sc.parallelize(1 to 1000, 3)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:24
// Draw a sample of 100 elements, with replacement, using seed 1
scala> x.takeSample(true, 100, 1)
res8: Array[Int] = Array(764, 815, 274, 452, 39, 538, 238, 544, 475, 480, 416, 868, 517, 363, 39, 316, 37, 90, 210, 202, 335, 773, 572, 243, 354, 305, 584, 820, 528, 749, 188, 366, 913, 667, 214, 540, 807, 738, 204, 968, 39, 863, 541, 703, 397, 489, 172, 29, 211, 542, 600, 977, 941, 923, 900, 485, 575, 650, 258, 31, 737, 155, 685, 562, 223, 675, 330, 864, 291, 536, 392, 108, 188, 408, 475, 565, 873, 504, 34, 343, 79, 493, 868, 974, 973, 110, 587, 457, 739, 745, 977, 800, 783, 59, 276, 987, 160, 351, 515, 901)
13. cache / persist
Both cache and persist cache an RDD so that it does not have to be recomputed when it is used again later, which can save a great deal of running time.
scala> val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[27] at parallelize at <console>:24
scala> c.getStorageLevel
res9: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
scala> c.cache
res10: c.type = ParallelCollectionRDD[27] at parallelize at <console>:24
scala> c.getStorageLevel
res11: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)
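cache is simply persist with the default MEMORY_ONLY storage level; persist lets you pick a different level. A minimal sketch (d is an illustrative RDD):
import org.apache.spark.storage.StorageLevel

val d = sc.parallelize(List("Gnu", "Cat", "Rat"), 2)
d.persist(StorageLevel.MEMORY_AND_DISK)   // spill to disk when the data does not fit in memory
d.getStorageLevel                         // check the new storage level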
- sortBy (sorting)
sortBy is implemented in the org.apache.spark.rdd.RDD class.
It takes up to three parameters:
The first is a function that is applied to each element (of the RDD's element type T) to produce the sort key;
The second is ascending, which, as the name suggests, decides whether the resulting RDD is sorted in ascending or descending order; the default is true, i.e. ascending;
The third is numPartitions, which sets the number of partitions of the sorted RDD; by default it equals the number of partitions before sorting, i.e. this.partitions.size.
scala> val sss=sc.parallelize(List(3,4,10,5,6,9))
sss: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
// true for ascending, false for descending
scala> val ddd=sss.sortBy(x=>x,false)
scala> ddd.foreach(println)
10
9
6
5
4
3
III. Key-Value-type Transformation Operators (9)
1. mapValues
mapValues applies a map operation to the V values of [K, V] pairs, leaving the keys unchanged.
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.mapValues("x" + _ + "x").collect
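// expected result: Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))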
2. combineByKey
Uses user-supplied aggregation functions to combine the values of each key, turning an RDD[(K, V)] into an RDD[(K, C)]. It takes three functions: createCombiner (builds the initial combined value C from a V), mergeValue (merges a further V of the same key into a C within a partition), and mergeCombiners (merges two Cs across partitions).
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
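// createCombiner: List(_)    mergeValue: (x, y) => y :: x    mergeCombiners: (x, y) => x ::: y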
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d.collect
3. reduceByKey
For an RDD of KV pairs, reduces the values of elements that share the same key with the given binary function, so the multiple values of each key are reduced to a single value, which is then paired with the original key to form a new KV pair.
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
4. partitionBy
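partitionBy repartitions a KV RDD according to the supplied Partitioner. A minimal sketch using the built-in HashPartitioner (pairs and repartitioned are illustrative names):
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(List((1, "a"), (2, "b"), (3, "c"), (4, "d")), 2)
// redistribute the pairs into 3 partitions by the hash of the key
val repartitioned = pairs.partitionBy(new HashPartitioner(3))
repartitioned.getNumPartitions   // 3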
5. cogroup
cogroup: for two KV RDDs, the values belonging to the same key in each RDD are gathered into a collection, and the output pairs each key with the two resulting collections.
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
b.cogroup(c).collect
6. join
join applies the cogroup operation to the two RDDs to be connected and then, for each key, emits one output pair for every combination of values from the two sides.
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.join(d).collect
7. leftOuterJoin
Like join, but keeps every key from the left RDD; keys with no match on the right are paired with None.
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.leftOuterJoin(d).collect
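// values from the right-hand RDD are wrapped in Option: Some(...) when a match exists, None otherwise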
8. rightOuterJoin
Like join, but keeps every key from the right RDD; keys with no match on the left are paired with None.
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.rightOuterJoin(d).collect
9. sortByKey
sortByKey works on Key-Value RDDs and sorts them by key. It is implemented in org.apache.spark.rdd.OrderedRDDFunctions. The RDD it returns is always a ShuffledRDD, because sorting the source RDD requires a shuffle, and the result of a shuffle is a ShuffledRDD.
// Load the data
scala> val a = sc.parallelize(List(("tenglangpu",50),("zhangsan",20),("lishi",18),("wangwu",19)), 2)
a: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:24
// Print the data
scala> a.foreach(println)
(tenglangpu,50)
(zhangsan,20)
(lishi,18)
(wangwu,19)
// Ascending (the default)
scala> val s=a.sortByKey().collect
s: Array[(String, Int)] = Array((lishi,18), (tenglangpu,50), (wangwu,19), (zhangsan,20))
scala> s.foreach(println)
(lishi,18)
(tenglangpu,50)
(wangwu,19)
(zhangsan,20)
// Descending
scala> val s=a.sortByKey(false).collect
s: Array[(String, Int)] = Array((zhangsan,20), (wangwu,19), (tenglangpu,50), (lishi,18))
scala> s.foreach(println)
(zhangsan,20)
(wangwu,19)
(tenglangpu,50)
(lishi,18)
Appendix:
Spark API documentation: http://spark.apache.org/docs/2.2.0/api/scala/index.html