Common Spark Transformation Operators (Part 2)


Author: 数据萌新 | Published 2018-11-05 17:15

    This article covers the following transformation operators:
    join
    cogroup
    cartesian
    pipe
    repartitionAndSortWithinPartitions
    glom
    randomSplit
    zip
    zipWithIndex
    zipWithUniqueId


    (2) join: inner join of two key-value RDDs by key; the variants leftOuterJoin, rightOuterJoin and fullOuterJoin are shown as well

    import org.apache.spark.{SparkConf, SparkContext}

    object JoinTest {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("JoinTest").setMaster("local")
        val sc = new SparkContext(conf)
    
        val nameList = List(
          (1,"Jed"),
          (2,"Tom"),
          (3,"Bob"),
          (4,"Tony")
        )
    
        val salaryArr = Array(
          (1,8000),
          (2,6000),
          (3,5000)
        )
    
        val nameRDD = sc.parallelize(nameList,2)
        val salaryRDD = sc.parallelize(salaryArr,3)
    
        // inner join
        val joinRDD = nameRDD.join(salaryRDD)
        joinRDD.foreach( x => {
          val id = x._1
          val name = x._2._1
          val salary = x._2._2
          println(id + "\t" + name + "\t" + salary)
        })
        /*
        1    Jed  8000
        2    Tom  6000
        3    Bob  5000
        */
    
        // left join
        val leftOuterJoinRDD = nameRDD.leftOuterJoin(salaryRDD)
        leftOuterJoinRDD.foreach( x => {
          val id = x._1
          val name = x._2._1
          val salary = x._2._2
          println(id + "\t" + name + "\t" + salary)
        })
        /*
        1    Jed  Some(8000)
        2    Tom  Some(6000)
        3    Bob  Some(5000)
        4    Tony None
        */
    
        // right join
        val rightOuterJoinRDD = nameRDD.rightOuterJoin(salaryRDD)
        rightOuterJoinRDD.foreach( x => {
          val id = x._1
          val name = x._2._1
          val salary = x._2._2
          println(id + "\t" + name + "\t" + salary)
        })
        /*
        1    Some(Jed)    8000
        2    Some(Tom)    6000
        3    Some(Bob)    5000
        */
    
        // full join
        val fullOuterJoinRDD = nameRDD.fullOuterJoin(salaryRDD)
        fullOuterJoinRDD.foreach( x => {
          val id = x._1
          val name = x._2._1
          val salary = x._2._2
          println(id + "\t" + name + "\t" + salary)
        })
        /*
          1    Some(Jed)     Some(8000)
          2    Some(Tom)     Some(6000)
          3    Some(Bob)     Some(5000)
          4    Some(Tony)    None
        */
      }
    }
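
    The three outer joins wrap one or both sides in Option (Some / None), which is why the output above shows Some(8000) and None. A minimal sketch of unwrapping the Option with getOrElse, reusing nameRDD and salaryRDD from above (the fallback value 0 is arbitrary):

    nameRDD.leftOuterJoin(salaryRDD).foreach { case (id, (name, salaryOpt)) =>
      // getOrElse supplies a fallback when the right side had no matching key
      println(id + "\t" + name + "\t" + salaryOpt.getOrElse(0))
    }

    /*
    1    Jed  8000
    2    Tom  6000
    3    Bob  5000
    4    Tony 0
    */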
    
    

    (3) cogroup: groups the values that share the same key across multiple RDDs

    val data1 = sc.parallelize(List((1, "Good"), (2, "Morning")))
    val data2 = sc.parallelize(List((1, "How"), (2, "Are"), (3, "You")))
    val data3 = sc.parallelize(List((1, "I"), (2, "Love"), (3, "U")))
    
    val result = data1.cogroup(data2, data3)
    
    result.foreach(println)
    
    /*
    (1,(CompactBuffer(Good),CompactBuffer(How),CompactBuffer(I)))
    (2,(CompactBuffer(Morning),CompactBuffer(Are),CompactBuffer(Love)))
    (3,(CompactBuffer(),CompactBuffer(You),CompactBuffer(U)))
     */
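
    cogroup returns an RDD[(K, (Iterable[V], Iterable[W], Iterable[X]))], so the grouped values can be post-processed with ordinary collection operations. A minimal sketch that concatenates the grouped words per key, reusing result from above:

    result.mapValues { case (a, b, c) =>
      // flatten the three Iterables into a single string per key
      (a ++ b ++ c).mkString(" ")
    }.foreach(println)

    /*
    (1,Good How I)
    (2,Morning Are Love)
    (3,You U)
    */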
    
    

    (4) cartesian: computes the Cartesian product of two RDDs

    val rdd1 = sc.makeRDD(Array(1,2,3))
    val rdd2 = sc.makeRDD(Array(4,5,6))
    rdd1.cartesian(rdd2).foreach(println)
    
    /*
    (1,4)
    (1,5)
    (1,6)
    (2,4)
    (2,5)
    (2,6)
    (3,4)
    (3,5)
    (3,6)
     */
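

    (5) pipe: pipes the elements of each partition through an external command

    A minimal sketch, assuming a Unix-like environment where the cat command is available. Each element is written to the command's stdin as one line, and every line the command writes to stdout becomes one element of the resulting RDD[String]:

    val rdd = sc.parallelize(List("hello", "world"), 2)
    // one external process is launched per partition;
    // stdin: one element per line, stdout: one result element per line
    rdd.pipe("cat").foreach(println)

    /*
    hello
    world
    */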
    
    

    (6) repartitionAndSortWithinPartitions: repartitions the RDD and sorts the records within each new partition by key

    val arr = Array((1,"Tom"),(18,"Tony"),(23,"Ted"),
            (3,"Harry"),(56,"Bob"),(45,"Jack"),
            (22,"Jed"),(2,"Kobe"),(4,"Kate"),
            (23,"Mary"),(32,"Tracy"),(6,"Allen"),
            (7,"Caleb"),(19,"Alexande"),(9,"Nathan"))
    
    val rdd = sc.makeRDD(arr,2)
    
    rdd.foreachPartition(x => {
      println("=============")
      while(x.hasNext) {
        println(x.next())
      }
    })
    /*
    =============
    (1,Tom)
    (18,Tony)
    (23,Ted)
    (3,Harry)
    (56,Bob)
    (45,Jack)
    (22,Jed)
    =============
    (2,Kobe)
    (4,Kate)
    (23,Mary)
    (32,Tracy)
    (6,Allen)
    (7,Caleb)
    (19,Alexande)
    (9,Nathan)
    */
    
    import org.apache.spark.HashPartitioner

    // repartition into 4 partitions; records are sorted by key within each new partition
    rdd.repartitionAndSortWithinPartitions(new HashPartitioner(4))
      .foreachPartition(x => {
        println("=============")
        while(x.hasNext) {
          println(x.next())
        }
      })
    /*
    =============
    (4,Kate)
    (32,Tracy)
    (56,Bob)
    =============
    (1,Tom)
    (9,Nathan)
    (45,Jack)
    =============
    (2,Kobe)
    (6,Allen)
    (18,Tony)
    (22,Jed)
    =============
    (3,Harry)
    (7,Caleb)
    (19,Alexande)
    (23,Ted)
    (23,Mary)
    */
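
    repartitionAndSortWithinPartitions is more efficient than repartitioning and then sorting inside each partition, because the sort is pushed into the shuffle machinery. The sort order comes from the implicit Ordering on the key type, so putting a reversed Ordering in scope flips it to descending; a minimal sketch (the implicit val name is arbitrary):

    import org.apache.spark.HashPartitioner

    // a local implicit Ordering[Int] takes precedence over the default
    // ascending one, so each partition is sorted in descending key order
    implicit val descendingKeys: Ordering[Int] = Ordering.Int.reverse
    rdd.repartitionAndSortWithinPartitions(new HashPartitioner(4))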
    
    

    (7) glom: wraps the elements of each partition into an array, yielding an RDD[Array[T]] with one element per partition

    val rdd = sc.parallelize(1 to 10,2)
    
    val glomRDD = rdd.glom()
    
    glomRDD.foreach(x => {
      println("============")
      x.foreach(println)
    })
    println("glomRDD中的元素个数为:" + glomRDD.count())
    
    /*
    ============
    1
    2
    3
    4
    5
    ============
    6
    7
    8
    9
    10
    number of elements in glomRDD: 2
    */
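
    A typical use of glom is computing a per-partition aggregate without writing a mapPartitions function by hand. A minimal sketch that finds the maximum of each partition of the RDD above:

    // one array per partition -> one maximum per partition
    val maxima = rdd.glom().map(_.max).collect()
    println(maxima.mkString(","))   // 5,10
    // the global maximum is the maximum of the per-partition maxima
    println(maxima.max)             // 10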
    
    

    (8) randomSplit: splits an RDD into several RDDs by weighted random sampling

    val rdd = sc.parallelize(1 to 10)
    // split the original RDD into 4 RDDs with weights in the ratio 1:2:3:4
    rdd.randomSplit(Array(0.1,0.2,0.3,0.4)).foreach(x => {println(x.count)})
    
    Expected counts (in theory):
    1
    2
    3
    4
    With a dataset this small, the actual counts will usually deviate from the exact ratios.
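
    randomSplit also takes an optional seed; fixing it makes the split reproducible across runs. A minimal sketch:

    // the same seed always produces the same split of the same RDD
    val parts = rdd.randomSplit(Array(0.1, 0.2, 0.3, 0.4), seed = 11L)
    parts.foreach(p => println(p.count()))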
    
    

    (9) zip, zipWithIndex, zipWithUniqueId

    package com.aura.transformations
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Author: Jed
      * Description: zip / zipWithIndex / zipWithUniqueId examples
      * Date: Created on 2018/1/11
      */
    object ZipTest {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("MapTest").setMaster("local")
        val sc = new SparkContext(conf)
    
        val arr = Array(1,2,3,4,5)
        val arr2 = Array("Tom","Jed","Tony","Terry","Kate")
    
        val rdd1 = sc.makeRDD(arr)
        val rdd2 = sc.makeRDD(arr2)
    
        rdd1.zip(rdd2).foreach(println)
        /*
        (1,Tom)
        (2,Jed)
        (3,Tony)
        (4,Terry)
        (5,Kate)
        */
    
        rdd2.zipWithIndex().foreach(println)
        /*
        (Tom,0)
        (Jed,1)
        (Tony,2)
        (Terry,3)
        (Kate,4)
        */
    
        rdd1.zipWithUniqueId().foreach(println)
        /*
        (1,0)
        (2,1)
        (3,2)
        (4,3)
        (5,4)
        */
    
      }
    
    }
    
    

    How it works:

    zip pairs the two RDDs element by element and requires that both have the same number of partitions and the same number of elements in every partition. zipWithIndex assigns consecutive indices 0, 1, 2, ... across partitions; with more than one partition this needs an extra Spark job to compute each partition's starting offset. zipWithUniqueId assigns the k-th element of partition p the id k * numPartitions + p, so the ids are unique but not necessarily consecutive, and no extra job is needed.
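
    To see the difference, a minimal sketch running zipWithUniqueId on the same arr split into 2 partitions (partition 0 holds 1 and 2, partition 1 holds 3, 4 and 5; output order may vary):

    // ids follow k * numPartitions + partitionIndex
    sc.makeRDD(arr, 2).zipWithUniqueId().foreach(println)

    /*
    (1,0)
    (2,2)
    (3,1)
    (4,3)
    (5,5)
    */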
