Common Spark Operators and Code Examples

Author: coderrrrrrrrr | Published 2020-04-02 17:39

    sc.parallelize and sc.makeRDD

    Source of parallelize()

    def parallelize[T: ClassTag](  
          seq: Seq[T],  
          numSlices: Int = defaultParallelism): RDD[T] = withScope {  
        assertNotStopped()  
        new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())  
      }  
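
    As a quick illustration (a minimal sketch, assuming an existing SparkContext named sc), numSlices controls how many partitions the resulting RDD gets:

    val rdd = sc.parallelize(1 to 10, 4)  // explicitly request 4 slices
    println(rdd.partitions.size)
    //4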
    

    makeRDD() has two overloaded variants

    /** Distribute a local Scala collection to form an RDD.  
       *  
       * This method is identical to `parallelize`.  
       */  
      def makeRDD[T: ClassTag](  
          seq: Seq[T],  
          numSlices: Int = defaultParallelism): RDD[T] = withScope {  
        parallelize(seq, numSlices)  
      }  
    
    /**  
       * Distribute a local Scala collection to form an RDD, with one or more  
       * location preferences (hostnames of Spark nodes) for each object.  
       * Create a new partition for each collection item.  
       */  
      def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {  
        assertNotStopped()  
        val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap  
        new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)  
      }  
    

    The doc comment means: distribute a local Scala collection to form an RDD, with one or more location preferences (hostnames of Spark nodes) for each object, creating a new partition for each collection item.

    Usage example

    import org.apache.spark.{SparkConf, SparkContext} // also needed by the examples below

    object MyTask2 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("rdd maker").setMaster("local")
        val sc = new SparkContext(conf)
        // (element, preferred locations) pairs
        val list = List(("A",List("a1","a2","a3")),("B",List("b1","b2","b3")),("C",List("c1","c2","c3")))
        val rddmaker = sc.makeRDD(list)
        val rddP = sc.parallelize(list)

        println("rddmaker partitions size:",rddmaker.partitions.size)
        println("rddP partitions size:",rddP.partitions.size)
      }
    }
    //(rddmaker partitions size:,3)
    //(rddP partitions size:,1)
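
    Because list is a Seq[(String, Seq[String])], makeRDD resolves to the location-preference overload and creates one partition per element, while parallelize treats each tuple as an ordinary element and falls back to defaultParallelism (1 in local mode with a single core). The attached preferences can be inspected with RDD.preferredLocations; a minimal sketch reusing rddmaker (the "hostnames" here are just placeholder strings):

    rddmaker.partitions.foreach { p =>
      println(p.index, rddmaker.preferredLocations(p))
    }
    //(0,List(a1, a2, a3))
    //(1,List(b1, b2, b3))
    //(2,List(c1, c2, c3))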
    

    distinct

    Code

    object MyTask3 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("task3"))
        val rdd = sc.parallelize(List("a","b","c","b","b","a"))
        println("rdd partitions size is ",rdd.partitions.size)
        rdd.distinct().collect().foreach(print(_))
      }
    }
    //(rdd partitions size is ,1)
    //abc
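
    distinct deduplicates through a shuffle, and an overload takes the number of result partitions; a minimal sketch reusing rdd from above:

    println(rdd.distinct(2).partitions.size)
    //2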
    

    union

    Code

    object MyTask4 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task4"))
        val rddLeft = sc.parallelize(List("2","3","4","5"))
        val rddRight = sc.parallelize(List("1","3","5","7"))
        val rddUnion = rddLeft.union(rddRight)
        rddUnion.collect().foreach(item => print(item + ","))
      }
    }
    //2,3,4,5,1,3,5,7,
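
    Note that union keeps duplicates (3 and 5 each appear twice above): it simply concatenates the inputs' partitions without a shuffle. For set-style union semantics, chain distinct; a minimal sketch reusing rddUnion:

    rddUnion.distinct().collect().foreach(item => print(item + ","))
    // prints each of 1,2,3,4,5,7 exactly once, in shuffle-dependent order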
    

    intersection: returns the elements common to two RDDs

    object MyTask5 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task5"))
        val rddLeft = sc.parallelize(List("2","3","4","5"))
        val rddRight = sc.parallelize(List("1","3","5","7"))
        val rddIntersec = rddLeft.intersection(rddRight)
        rddIntersec.collect().foreach(item => print(item + ","))
      }
    }
    //5,3,
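
    intersection also deduplicates: a value that repeats inside one input still appears at most once in the result. A minimal sketch, assuming the same sc:

    sc.parallelize(List("3","3","5")).intersection(sc.parallelize(List("5","7")))
      .collect().foreach(item => print(item + ","))
    //5,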
    

    subtract: removes from one RDD the elements that also appear in another RDD

    Code

    object MyTask6 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task6"))
        val rddLeft = sc.parallelize(List("2","3","4","5"))
        val rddRight = sc.parallelize(List("1","3","5","7"))
        val rddSubtract = rddLeft.subtract(rddRight)
        rddSubtract.collect().foreach(item => print(item + ","))
      }
    }
    //2,4,
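
    Unlike intersection, subtract does not deduplicate the left side: every copy of an element survives as long as the value is absent from the right side. A minimal sketch, assuming the same sc:

    sc.parallelize(List("2","2","4")).subtract(sc.parallelize(List("4")))
      .collect().foreach(item => print(item + ","))
    //2,2,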
    

    cartesian: Cartesian product

    Code

    object MyTask7 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task7"))
        val rddLeft = sc.parallelize(List("2","3","4","5"))
        val rddRight = sc.parallelize(List("1","3","5","7"))
        val rddCartesian = rddLeft.cartesian(rddRight)
        rddCartesian.collect().foreach(item => print(item + ","))
      }
    }
    //(2,1),(2,3),(2,5),(2,7),(3,1),(3,3),(3,5),(3,7),(4,1),(4,3),(4,5),(4,7),(5,1),(5,3),(5,5),(5,7),
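
    The result has |left| × |right| elements, so cartesian grows multiplicatively and should be used with care on large inputs; a quick check reusing rddCartesian:

    println(rddCartesian.count())
    //16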
    

    countByValue: counts how many times each value occurs

    Code

    object MyTask8 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task8"))
        val rddLeft = sc.parallelize(List("2","3","4","5"))
        val rddRight = sc.parallelize(List("1","3","5","7"))
        val rddUnion = rddLeft.union(rddRight)
        val rddCountByValue:scala.collection.Map[String, scala.Long] = rddUnion.countByValue
        rddCountByValue.foreach(item => println(item._1 + "," + item._2))
      }
    }
    /*
    4,1
    5,2
    1,1
    2,1
    7,1
    3,2
    */
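
    countByValue is an action: it ships the complete value-to-count map back to the driver, so it is only safe when the number of distinct values is small. For large key spaces, the usual alternative keeps the counts distributed; a minimal sketch reusing rddUnion:

    rddUnion.map(v => (v, 1L)).reduceByKey(_ + _)  // RDD[(String, Long)], stays on the cluster
      .collect().foreach(item => println(item._1 + "," + item._2))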
    

    reduce: aggregates all elements in parallel with the given function

    Code

    object MyTask9 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task9"))
        val rdd = sc.parallelize(1 to 11)
        val result = rdd.reduce((x,y) => x+y)
        println(result)
      }
    }
    //66
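
    The reduce function must be commutative and associative, and reduce has no zero value, so it fails on an empty RDD; that gap is what fold (next) fills. A minimal sketch, assuming the same sc:

    sc.parallelize(Seq.empty[Int]).fold(0)(_ + _)  // returns 0
    sc.parallelize(Seq.empty[Int]).reduce(_ + _)   // throws UnsupportedOperationException: empty collection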
    

    fold

    Code

    object MyTask10 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task10"))
        val rdd = sc.parallelize(1 to 11,2)
        val result = rdd.fold(10)(_+_)
        println(result)
      }
    }
    //96
    

    Explanation: like reduce, but with a zero value. The zero value is applied once inside each partition and once more when the partition results are merged, so with 2 partitions the result is 66 + 10 × (2 + 1) = 96, not 76.
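
    This makes the result depend on the partition count; a minimal sketch, assuming the same sc:

    println(sc.parallelize(1 to 11, 1).fold(10)(_ + _)) // 66 + 10 * (1 + 1) = 86
    println(sc.parallelize(1 to 11, 2).fold(10)(_ + _)) // 66 + 10 * (2 + 1) = 96
    println(sc.parallelize(1 to 11, 4).fold(10)(_ + _)) // 66 + 10 * (4 + 1) = 116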

    aggregate

    Function signature

    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
    

    Explanation:
    aggregate first aggregates the elements within each partition and then aggregates the per-partition results, using the given functions together with the initial value zeroValue. It can return a type U different from the RDD's element type T, so it needs one function to merge a T into a U and another to merge two Us. Both functions are allowed to modify and return their first argument instead of allocating a new U, to avoid reallocating memory.
    Parameter zeroValue: the initial value of the accumulated result within each partition (for seqOp) and also of the combined result across partitions (for combOp); typically the identity element (e.g. Nil for list concatenation, or 0 for summation).
    Parameter seqOp: the function that accumulates results within each partition.
    Parameter combOp: an associative operator that combines the results of different partitions.

    Code

    object MyTask11 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task11"))
        val rdd = sc.parallelize(1 to 4,3)
        val result = rdd.aggregate((0,0,0))(
          (acc,number) => (acc._1+number,acc._1,acc._3+1),
          (x,y) => (x._1 + y._1,x._2 + y._2,x._3+y._3)
        )
        println(result)
      }
    }
    //(10,3,4)
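
    To trace the (10,3,4): parallelize(1 to 4, 3) splits into [1], [2], [3,4]; seqOp turns these into (1,0,1), (2,0,1), (7,3,2) (the second slot stores acc._1, the sum before the last element, rather than a running value); combOp then adds them slot-wise to (10,3,4). A more typical use is computing an average with a (sum, count) accumulator; a minimal sketch reusing rdd:

    val (sum, count) = rdd.aggregate((0, 0))(
      (acc, n) => (acc._1 + n, acc._2 + 1),  // seqOp: fold an element into (sum, count)
      (x, y) => (x._1 + y._1, x._2 + y._2)   // combOp: merge two (sum, count) pairs
    )
    println(sum.toDouble / count)
    //2.5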
    
