
Spark Development -- RDD Programming -- Common Operators -- Actions (9)

Author: 无剑_君 | Published 2019-12-16 16:27

    I. Action operators (12 in total)

    Start spark-shell:

    spark-shell --master spark://master:7077 --executor-memory 1g --total-executor-cores 2
    

    1. foreach

    Applies a function to each element of the RDD; here each element is printed. On a cluster the println output appears in the executors' stdout rather than on the driver console (a collect-based alternative is shown after the output below).

    scala> val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)
    c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
    // Print each element
    scala> c.foreach(x => println(x + "--》Ok"))
    cat--》Ok
    dog--》Ok
    tiger--》Ok
    lion--》Ok
    gnu--》Ok
    crocodile--》Ok
    ant--》Ok
    whale--》Ok
    dolphin--》Ok
    spider--》Ok
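
    When this runs on a cluster, the lines above end up in the executors' logs. To print on the driver instead, a common pattern is to collect first (a small sketch reusing c above):

    // Bring the elements to the driver, then print them locally
    scala> c.collect.foreach(x => println(x + "--》Ok"))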
    
    

    2. saveAsTextFile

    Saves the RDD as text files, one line per element, here to HDFS (a read-back check follows the example).

    scala> val a = sc.parallelize(1 to 10000, 3)
    a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
    // Save the results to the HDFS path /user/data_a
    scala> a.saveAsTextFile("/user/data_a")
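
    As a quick check (a minimal sketch, assuming the /user/data_a path written above is readable), the saved text files can be loaded back with sc.textFile:

    // Read the saved files back and count the lines; each number was written as one line
    scala> sc.textFile("/user/data_a").count
    // expected: Long = 10000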
    
    

    3. saveAsObjectFile

    saveAsObjectFile serializes the elements of the RDD and stores them in a file.
    On HDFS it is saved as a SequenceFile by default.

    scala> val x = sc.parallelize(1 to 100, 3)
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
    // Save the results to a file
    scala> x.saveAsObjectFile("/user/data/objFile")
    
    scala> val y = sc.objectFile[Int]("/user/data/objFile")
    y: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at objectFile at <console>:24
    // Read the objects back and collect the results
    scala> y.collect
    res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
    
    
    

    4. collect

    Collects all elements of the RDD into an Array on the driver; only use it when the data is small enough to fit in driver memory (see the take sketch after the example).

    scala> val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
    c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
    // Collect the elements into an array
    scala> c.collect
    res6: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
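
    For a large RDD, a bounded action such as take avoids pulling everything to the driver (a small sketch; take is not otherwise covered in this article):

    // Fetch only the first 3 elements instead of the whole RDD
    scala> c.take(3)
    // expected: Array(Gnu, Cat, Rat)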
    
    

    5. collectAsMap

    Returns a HashMap built from all the key-value pairs in the RDD. If a key appears more than once, later values overwrite earlier ones (see the sketch after the example below).

    scala> val a = sc.parallelize(List(1, 2, 1, 3), 1)
    a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24
    
    scala> val b = a.zip(a)
    b: org.apache.spark.rdd.RDD[(Int, Int)] = ZippedPartitionsRDD2[12] at zip at <console>:25
    // Collect the pairs into a Map
    scala> b.collectAsMap
    res7: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)
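
    The zip example above has no duplicate keys. A small sketch (not in the original) of the overwrite behaviour; with this data the pair (1, "c") is processed after (1, "a") and is expected to win:

    // When the same key occurs twice, the value seen later overwrites the earlier one
    scala> sc.parallelize(List((1, "a"), (2, "b"), (1, "c")), 2).collectAsMap
    // expected: Map(2 -> b, 1 -> c)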
    
    

    6. reduceByKeyLocally

    Performs a reduceByKey and returns the result to the driver as a Map, i.e. like reduceByKey followed by collectAsMap (see the comparison below).

    scala> val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
    a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[13] at parallelize at <console>:24
    
    scala> val b = a.map(x => (x.length, x))
    b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[14] at map at <console>:27
    // Reduce by key locally; the result comes back to the driver as a Map
    scala> b.reduceByKeyLocally(_ + _)
    res8: scala.collection.Map[Int,String] = Map(3 -> dogcatowlgnuant)
    
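    For comparison, the same Map can be produced with reduceByKey followed by collectAsMap, which is essentially what reduceByKeyLocally does in one step (a sketch reusing b above):

    // Reduce on the cluster, then collect the resulting pairs into a Map on the driver
    scala> b.reduceByKey(_ + _).collectAsMap
    // expected: Map(3 -> dogcatowlgnuant)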

    7. lookup

    Looks up all values for a given key in a key-value (pair) RDD. A key that is not present returns an empty Seq (see the sketch after the example).

    scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
    a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:24
    
    scala> val b = a.map(x => (x.length, x))
    b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[17] at map at <console>:27
    // Look up all values whose key is 5
    scala> b.lookup(5)
    res9: Seq[String] = WrappedArray(tiger, eagle)
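
    A key with no matching pairs simply yields an empty Seq (a small sketch reusing b above):

    // No word of length 10 exists, so the result is empty
    scala> b.lookup(10)
    // expected: an empty Seq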
    
    

    8. count

    Returns the number of elements in the RDD.

    scala> val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
    c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[20] at parallelize at <console>:24
    // Count the elements
    scala> c.count
    res10: Long = 4
    
    

    9. top

    Returns the k largest elements of the RDD, in descending order (see the ordering sketch after the example).

    scala> val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
    c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
    // Find the two largest elements
    scala> c.top(2)
    res11: Array[Int] = Array(9, 8)
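
    top takes an implicit Ordering, so passing a reversed ordering returns the smallest elements instead (a small sketch, not in the original article):

    // With the ordering reversed, top(2) returns the two smallest values
    scala> c.top(2)(Ordering[Int].reverse)
    // expected: Array(4, 5)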
    
    

    10. reduce

    Reduces the elements of the RDD with the given binary function, much like reduceLeft on a Scala collection; because partitions are reduced in parallel, the function should be commutative and associative.

    scala> val a = sc.parallelize(1 to 100, 3)
    a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
    // Sum the elements 1 to 100
    scala> a.reduce(_ + _)
    res12: Int = 5050
    
    

    For comparison, reduceLeft on an ordinary Scala collection:

    scala> val a = Array(20, 12, 6, 15, 2, 9)
    a: Array[Int] = Array(20, 12, 6, 15, 2, 9)
    // Sum of the array
    scala> a.reduceLeft(_ + _) 
    res4: Int = 64
    // Product of the array
    scala>  a.reduceLeft(_ * _)
    res5: Int = 388800
    // Minimum of the array
    scala>  a.reduceLeft(_ min _)
    res6: Int = 2
    // Maximum of the array
    scala> a.reduceLeft(_ max _)
    res7: Int = 20
    
    

    11. fold

    fold() is similar to reduce(): it takes a function with the same signature as the one reduce takes, plus a zero value used as the initial result. The zero value should be the identity element of the operation (see the note after the example).

    scala> val a = sc.parallelize(List(1,2,3), 3)
    a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
    // Sum with an initial value of 0
    scala> a.fold(0)(_ + _)
    res13: Int = 6
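
    The zero value should be the identity of the operation: Spark applies it once inside every partition and once more when merging the partition results, so a non-identity value is added several times. A sketch of what happens with the 3-partition RDD above:

    // 10 is added once per partition (3 times) plus once at the final merge: 1+2+3 + 4*10 = 46
    scala> a.fold(10)(_ + _)
    // expected: Int = 46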
    
    

    12. aggregate

    aggregate first aggregates the elements within each partition with one function (seqOp), then combines the per-partition results with a second function (combOp).
    Unlike reduce, the result type may differ from the element type, so a single pass can produce a composite result (see the sum-and-count sketch at the end of this section).

    scala> val z = sc.parallelize(List(1,2,3,4,5,6), 2)
    
    z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24
    // Helper function: tag each element with the ID of the partition it belongs to
    scala> def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
         |  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
         | }
    myfunc: (index: Int, iter: Iterator[Int])Iterator[String]
    
    // First, print the contents of the RDD together with the partition labels
    scala> z.mapPartitionsWithIndex(myfunc).collect
    res14: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])
    
    // Aggregate with 0 as the zero value
    // seqOp math.max(_, _) finds the maximum of each partition (3 and 6); combOp _ + _ adds the two maxima: 3 + 6 = 9
    scala> z.aggregate(0)(math.max(_, _), _ + _)
    res15: Int = 9
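
    Because the result type may differ from the element type, aggregate can carry several values in one pass. A sketch (not in the original) that computes the sum and the count of z together, from which an average could be derived:

    // seqOp adds each value to the running sum and increments the count; combOp adds the partial (sum, count) pairs
    scala> z.aggregate((0, 0))((acc, v) => (acc._1 + v, acc._2 + 1), (a, b) => (a._1 + b._1, a._2 + b._2))
    // expected: (21,6)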
    
    

    Appendix:
    Spark API documentation: http://spark.apache.org/docs/2.2.0/api/scala/index.html

