美文网首页
Spark常用Actions算子

Spark常用Actions算子

作者: 数据萌新 | 来源:发表于2018-11-05 17:15 被阅读0次

    介绍以下Actions算子:
    foreach
    foreachPatition
    reduce
    collect
    count
    first
    take
    takeSample
    top
    takeOrdered
    saveAsTextFile
    saveAsSequenceFile
    saveAsObjectFile
    countByKey
    countByValue
    aggregate


    (1) foreach、foreachPatition

    • foreach:遍历RDD中的元素
    • foreachPatition:按照分区遍历RDD中的元素
        val arr = Array(1,2,3,4,5,6)
        val rdd = sc.makeRDD(arr,2)
        
        rdd.foreach(x => {
          println("===========")
          println(x)
        })
        /*
        ===========
        1
        ===========
        2
        ===========
        3
        ===========
        4
        ===========
        5
        ===========
        6
         */
        
        rdd.foreachPartition(x => {
          println("===========")
          while(x.hasNext) {
            println(x.next())
          }
        })
        /*
        ===========
        1
        2
        3
        ===========
        4
        5
        6
         */
        
        }
    

    (2) reduce:按照指定规则聚合RDD中的元素

    val numArr = Array(1,2,3,4,5)
    val rdd = sc.parallelize(numArr)
    val sum = rdd.reduce(_+_)
    println(sum)
    /*
    15
    */
    

    (3) collect:计算结果拉取回Driver端

    val numArr = Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3))
    val rdd = sc.parallelize(numArr)
    val sum = rdd.reduceByKey(_+_)
    
    sum.collect().foreach(println)
    /*
    (1,6)
    (2,6)
     */
    

    (4) count、countByKey、countByValue

    count:统计RDD中元素个数
    countByKey:统计每个Key中的元素的个数
    countByValue:统计每个value的个数

    // -- count
    val arr = Array("Tom","Jack","Tony","Bob","Kate")
    val rdd = sc.makeRDD(arr)
    println(rdd.count())
    /*
    5
     */
    
    // -- countByKey
    val rdd = sc.parallelize(Array(
            ("销售部","Tom"), ("销售部","Jack"),("销售部","Bob"),("销售部","Terry"),
            ("后勤部","Jack"),("后勤部","Selina"),("后勤部","Hebe"),
            ("人力部","Ella"),("人力部","Harry"),
            ("开发部","Allen")
        ))
    val result = rdd.countByKey();
    result.foreach(println)
    /*
    (后勤部,3)
    (开发部,1)
    (销售部,4)
    (人力部,2)
    
    // -- countByValue
    val rdd = sc.parallelize(Array(
          "Tom","Jed","Tom",
          "Tom","Jed","Jed",
          "Tom","Tony","Jed"
        ))
    val result = rdd.countByValue();
    result.foreach(println)
    /*
    (Tom,4)
    (Tony,1)
    (Jed,4)
    */
    

    (5) first、take、takeSample

    take(n):取RDD中前n条数据
    first:= take(1)
    takeSample(withReplacement,num,[seed]):随机抽取RDD中的元素

    withReplacement : 是否是放回式抽样  
        true代表如果抽中A元素,之后还可以抽取A元素
        false代表如果抽中了A元素,之后都不在抽取A元素  
    fraction : 抽样的比例  
    seed : 抽样算法的随机数种子,不同的数值代表不同的抽样规则,可以手动设置,默认为long的随机数
    
    
    val arr = Array(("Tom",88),("Bob",92),("Allen",86),("Kate",100),("Sandy",97))
    val rdd = sc.makeRDD(arr)
    
    // 排序后去前三个
    rdd.sortBy(_._2,false).take(3).foreach(println)
    /*
    (Kate,100)
    (Sandy,97)
    (Bob,92)
     */
    
    // 排序后取top1
    rdd.sortBy(_._2,false).take(1).foreach(println) // (Kate,100)
    println(rdd.sortBy(_._2,false).first()) // (Kate,100)
    
    // 随机抽取2个元素
    rdd.takeSample(false, 2).foreach(println)
    

    (6) top、takeOrdered

    top(n):从RDD中,按照默认(降序)或者指定的排序规则,返回前n个元素
    takeOrdered(n):从RDD中,按照默认(升序)或者指定的排序规则,返回前n个元素

    var rdd = sc.makeRDD(Array(10, 4, 2, 12, 3))
    
    rdd.top(3).foreach(println) // 12 10 4(降序取)
    
    rdd.takeOrdered(3).foreach(println) // 2 3 4(升序取)
    

    (7) saveAsTextFile、saveAsSequenceFile 、saveAsObjectFile

    • saveAsTextFile:把结果文件保存为textFile
    • saveAsSequenceFile:把结果文件保存为SequenceFile
    • saveAsObjectFile:把结果文件保存为ObjectFile
        val line = sc.textFile("hdfs://repo:9000/user/spark/wordcount/input/wordcount.txt")
        line.flatMap(_.split(" "))
          .map((_,1))
          .reduceByKey(_+_)
          .sortBy(_._2,false)
          // .foreach(t => println(t._1 + " " + t._2))
          .saveAsTextFile("hdfs://repo:9000/user/spark/wordcount/output/")
        ```
    
    

    相关文章

      网友评论

          本文标题:Spark常用Actions算子

          本文链接:https://www.haomeiwen.com/subject/quosxqtx.html