I. Action Operators (12 kinds)
Start the Spark shell:
spark-shell --master spark://master:7077 --executor-memory 1g --total-executor-cores 2
(For a quick local run, replace the standalone master URL with --master local[*].)
1. foreach
Applies a function to every element of the RDD; here it is used to print each element.
scala> val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
// Print each element
scala> c.foreach(x => println(x + "--》Ok"))
cat--》Ok
dog--》Ok
tiger--》Ok
lion--》Ok
gnu--》Ok
crocodile--》Ok
ant--》Ok
whale--》Ok
dolphin--》Ok
spider--》Ok
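Note that when this runs on a cluster, the println output above goes to the executors' stdout rather than the driver console (in local mode it appears in the shell). A minimal sketch for printing on the driver is to collect the small RDD first:
// Bring the (small) RDD back to the driver, then print locally
c.collect().foreach(println)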
2. saveAsTextFile
Save the results to HDFS as text files.
scala> val a = sc.parallelize(1 to 10000, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
// Save the results to the HDFS path /user/data_a
scala> a.saveAsTextFile("/user/data_a")
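To read the saved text back, a minimal sketch (assuming the same path as above):
// Each line of the saved files comes back as a String
val txt = sc.textFile("/user/data_a")
txt.count   // 10000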
3. saveAsObjectFile
saveAsObjectFile serializes the elements of the RDD and stores them in a file.
On HDFS the data is saved as a SequenceFile by default.
scala> val x = sc.parallelize(1 to 100, 3)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
// Save the results to a file
scala> x.saveAsObjectFile("/user/data/objFile")
scala> val y = sc.objectFile[Int]("/user/data/objFile")
y: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at objectFile at <console>:24
// Read the results back
scala> y.collect
res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
4. collect
Collects the data of the RDD into an Array on the driver; use it only when the data set is small enough to fit in the driver's memory.
scala> val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
// Collect the results into an Array
scala> c.collect
res6: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
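When the RDD is too large to collect, a sketch of the usual alternative is take(n), which brings back only the first n elements:
// Fetch just the first 3 elements instead of everything
c.take(3)   // Array(Gnu, Cat, Rat)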
5. collectAsMap
For a key-value RDD, returns a Map containing all of its key-value pairs; if a key occurs more than once, values seen later overwrite earlier ones.
scala> val a = sc.parallelize(List(1, 2, 1, 3), 1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> val b = a.zip(a)
b: org.apache.spark.rdd.RDD[(Int, Int)] = ZippedPartitionsRDD2[12] at zip at <console>:25
// Return the pairs as a Map
scala> b.collectAsMap
res7: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)
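The overwrite behaviour is easier to see when duplicate keys carry different values; a small sketch (the pairs here are made up for illustration):
// With a single partition the pairs are processed in order,
// so the later value "c" wins for key 1
val p = sc.parallelize(List((1, "a"), (2, "b"), (1, "c")), 1)
p.collectAsMap()   // Map(2 -> b, 1 -> c)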
6. reduceByKeyLocally
Equivalent to performing reduceByKey and then collectAsMap: merges the values for each key and returns the result to the driver as a Map.
scala> val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[13] at parallelize at <console>:24
scala> val b = a.map(x => (x.length, x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[14] at map at <console>:27
// Merge the values for each key and return the result to the driver as a Map
scala> b.reduceByKeyLocally(_ + _)
res8: scala.collection.Map[Int,String] = Map(3 -> dogcatowlgnuant)
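For comparison, the same Map can be produced with the reduceByKey transformation followed by collectAsMap:
// reduceByKey runs as a distributed transformation;
// collectAsMap then brings the reduced pairs back to the driver
b.reduceByKey(_ + _).collectAsMap()   // Map(3 -> dogcatowlgnuant)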
7. lookup
Looks up all the values for a given key; it applies only to key-value RDDs.
scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:24
scala> val b = a.map(x => (x.length, x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[17] at map at <console>:27
// Look up all values whose key is 5
scala> b.lookup(5)
res9: Seq[String] = WrappedArray(tiger, eagle)
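A couple more lookups as a sketch: every matching value is returned, and a key that is not present yields an empty Seq:
b.lookup(3)    // WrappedArray(dog, cat)
b.lookup(10)   // empty Seq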
8. count
Returns the total number of elements in the RDD.
scala> val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[20] at parallelize at <console>:24
// Count the elements
scala> c.count
res10: Long = 4
9. top
Returns the largest k elements of the RDD, according to the implicit ordering.
scala> val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
// Find the two largest elements
scala> c.top(2)
res11: Array[Int] = Array(9, 8)
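Passing a custom Ordering (or using takeOrdered) returns the smallest elements instead; a minimal sketch:
// The two smallest elements, via takeOrdered or a reversed ordering
c.takeOrdered(2)                  // Array(4, 5)
c.top(2)(Ordering[Int].reverse)   // Array(4, 5)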
10. reduce
Aggregates the elements of the RDD, much like reduceLeft on a Scala collection; because partitions are reduced in parallel, the function should be commutative and associative.
scala> val a = sc.parallelize(1 to 100, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
// Sum of 1 to 100
scala> a.reduce(_ + _)
res12: Int = 5050
For comparison, reduceLeft on a plain Scala array:
scala> val a = Array(20, 12, 6, 15, 2, 9)
a: Array[Int] = Array(20, 12, 6, 15, 2, 9)
// Sum of the array
scala> a.reduceLeft(_ + _)
res4: Int = 64
// Product of the array
scala> a.reduceLeft(_ * _)
res5: Int = 388800
// Minimum of the array
scala> a.reduceLeft(_ min _)
res6: Int = 2
// Maximum of the array
scala> a.reduceLeft(_ max _)
res7: Int = 20
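The same style of reductions works directly on the RDD a (1 to 100) defined above; a short sketch:
a.reduce(_ min _)   // 1
a.reduce(_ max _)   // 100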
11. fold
fold() is similar to reduce(): it takes a function with the same signature as the one reduce takes, plus a zero value that serves as the initial result of the first call. In Spark the zero value is applied once in every partition and once more when the partition results are merged, so it should be the identity element of the operation (for example 0 for addition).
scala> val a = sc.parallelize(List(1,2,3), 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
// Sum, with 0 as the zero value
scala> a.fold(0)(_ + _)
res13: Int = 6
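A sketch of why the zero value should be the identity element: with three partitions it is applied three times (once per partition) and once more in the final merge.
// 1 + 2 + 3 = 6, plus 10 for each of the 3 partitions, plus 10 for the final merge
a.fold(10)(_ + _)   // 46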
12. aggregate
aggregate first folds the elements within each partition with one function (seqOp), then merges the per-partition results with a second, fold-like function (combOp).
It is a general aggregation method: it takes many input elements and, following the rules given by the two functions, produces a single result value, whose type may differ from the element type of the RDD.
scala> val z = sc.parallelize(List(1,2,3,4,5,6), 2)
z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24
// Helper: tag every element with the ID of the partition it lives in
scala> def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
| iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
| }
myfunc: (index: Int, iter: Iterator[Int])Iterator[String]
// First, print the contents of the RDD together with their partition labels
scala> z.mapPartitionsWithIndex(myfunc).collect
res14: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])
// Aggregate the results; 0 is the zero value
// The first function (seqOp) finds the maximum of each partition of z; the second function, _ + _, adds the two partition maxima: 3 + 6 = 9
scala> z.aggregate(0)(math.max(_, _), _ + _)
res15: Int = 9
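A minimal sketch of an aggregation whose result type differs from the element type: compute the sum and the count of z in a single pass.
// seqOp folds each element into a (sum, count) accumulator;
// combOp merges the per-partition accumulators
val (sum, cnt) = z.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (a, b)   => (a._1 + b._1, a._2 + b._2)
)
// sum = 21, cnt = 6, so the average is 3.5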
Appendix:
Spark API documentation: http://spark.apache.org/docs/2.2.0/api/scala/index.html