Spark RDD API

Author: an anonymous Li | Published 2019-07-04 14:35

    1.aggregate: aggregates locally within each partition first (seqOp), then combines the partition results globally (combOp); see the averaging sketch at the end of this entry.

     val z = sc.parallelize(List(1,2,3,4,5,6), 2)

     def myfunc[T](index: Int, iter: Iterator[T]): Iterator[String] = iter.map(x => "[partID:" + index + ", val: " + x + "]")

     z.mapPartitionsWithIndex(myfunc).collect

            res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])

    z.aggregate(0)(math.max(_, _), _ + _)

    res40: Int = 9

    // This example returns 16 since the initial value is 5

    // reduce of partition 0 will be max(5, 1, 2, 3) = 5

    // reduce of partition 1 will be max(5, 4, 5, 6) = 6

    // final reduce across partitions will be 5 + 5 + 6 = 16

    // note that the final reduce includes the initial value

    z.aggregate(5)(math.max(_, _), _ + _)

    res29: Int = 16

    val z = sc.parallelize(List("a","b","c","d","e","f"),2)

    z.mapPartitionsWithIndex(myfunc).collect

    res31: Array[String] = Array([partID:0, val: a], [partID:0, val: b], [partID:0, val: c], [partID:1, val: d], [partID:1, val: e], [partID:1, val: f])

    z.aggregate("")(_ + _, _+_)

    res115: String = abcdef    

    z.aggregate("x")(_ + _, _+_)

    res116: String = xxdefxabc
    // note: this result is not deterministic; depending on which partition result is combined first it can also be "xxabcxdef"

    val z = sc.parallelize(List("12","23","345","4567"),2)

    z.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)

    res141: String = 42

    z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

    res142: String = 11

    val z = sc.parallelize(List("12","23","345",""),2)

    z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

    res143: String = 10
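    One more aggregate pattern worth noting (a sketch, not from the original post; the RDD name nums is illustrative): because the result type can differ from the element type, a sum and a count can be carried together to compute an average in one pass.

    val nums = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    val (sum, cnt) = nums.aggregate((0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),     // seqOp: runs inside each partition
      (a, b) => (a._1 + b._1, a._2 + b._2))     // combOp: merges the partition results
    sum.toDouble / cnt
    // res: Double = 3.5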

    2.aggregateByKey: aggregates the values of each key: seqOp combines values with the zero value within each partition, then combOp merges the per-partition results for each key (see the sketch at the end of this entry).

    val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

    // let's have a look at what is in the partitions

    def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {

      iter.map(x => "[partID:" +  index + ", val: " + x + "]")

    }

    pairRDD.mapPartitionsWithIndex(myfunc).collect

    res2: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])

    pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect

    res3: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))

    pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect

    res4: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
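    The zero value can also be a collection. As a further illustration (a sketch, not from the original post), here the values of each key are gathered into a list:

    // zero value is the empty list; seqOp prepends within a partition, combOp concatenates the per-partition lists
    pairRDD.aggregateByKey(List.empty[Int])((acc, v) => v :: acc, _ ::: _).collect
    // e.g. Array((dog,List(12)), (cat,List(12, 5, 2)), (mouse,List(2, 4)))  -- order inside each list may vary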

    3.cartesian: pairs every element of one RDD with every element of the other, returning an RDD of tuples (the Cartesian product).

    val x = sc.parallelize(List(1,2,3,4,5))

    val y = sc.parallelize(List(6,7,8,9,10))

    x.cartesian(y).collect

    res0: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))

    4.checkpoint: marks the RDD for checkpointing; when the next action runs, it is materialized and saved to the checkpoint directory (set with sc.setCheckpointDir) and its lineage is truncated.

    sc.setCheckpointDir("my_directory_name")

    val a = sc.parallelize(1 to 4)

    a.checkpoint

    a.count

    14/02/25 18:13:53 INFO SparkContext: Starting job: count at :15

    ...

    14/02/25 18:13:53 INFO MemoryStore: Block broadcast_5 stored as values to memory (estimated size 115.7 KB, free 296.3 MB)

    14/02/25 18:13:53 INFO RDDCheckpointData: Done checkpointing RDD 11 to file:/home/cloudera/Documents/spark-0.9.0-incubating-bin-cdh4/bin/my_directory_name/65407913-fdc6-4ec1-82c9-48a1656b95d6/rdd-11, new parent is RDD 12

    res23: Long = 4

    5.coalesce, repartition: change the number of partitions. coalesce takes the target number of partitions and a shuffle flag (default false): increasing the partition count requires shuffle = true, while decreasing works without a shuffle. repartition(n) is simply coalesce(n, shuffle = true); see the sketch after this example.

    val y = sc.parallelize(1 to 10, 10)

    val z = y.coalesce(2, false)

    val s = y.repartition(2)

    z.partitions.length

    res9: Int = 2

    s.partitions.length

    res10: Int = 2
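    A quick check of the shuffle rule above (a sketch assuming the same spark-shell session and the 10-partition RDD y):

    // without a shuffle, coalesce cannot increase the partition count
    y.coalesce(20, false).partitions.length
    // res: Int = 10
    y.repartition(20).partitions.length
    // res: Int = 20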

    6.cogroup [Pair], groupWith [Pair]: for each key, groups the values from the participating RDDs into a tuple of Iterables.

    val a = sc.parallelize(List(1, 2, 1, 3), 1)

    val b = a.map((_, "b"))

    val c = a.map((_, "c"))

    b.cogroup(c).collect

    res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array(

    (2,(ArrayBuffer(b),ArrayBuffer(c))),

    (3,(ArrayBuffer(b),ArrayBuffer(c))),

    (1,(ArrayBuffer(b, b),ArrayBuffer(c, c)))

    )

    val d = a.map((_, "d"))

    b.cogroup(c, d).collect

    res9: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array(

    (2,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),

    (3,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),

    (1,(ArrayBuffer(b, b),ArrayBuffer(c, c),ArrayBuffer(d, d)))

    )

    val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")), 2)

    val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")), 2)

    x.cogroup(y).collect

    res23: Array[(Int, (Iterable[String], Iterable[String]))] = Array(

    (4,(ArrayBuffer(kiwi),ArrayBuffer(iPad))), 

    (2,(ArrayBuffer(banana),ArrayBuffer())), 

    (3,(ArrayBuffer(orange),ArrayBuffer())),

    (1,(ArrayBuffer(apple),ArrayBuffer(laptop, desktop))),

    (5,(ArrayBuffer(),ArrayBuffer(computer))))

    6.collect, toArray: returns all elements of the dataset to the driver program as an array.

    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)

    c.collect

    res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

    7.collectAsMap [Pair]: returns the key-value pairs to the driver as a Map; if a key appears more than once, only one of its values is kept.

    val a = sc.parallelize(List(1, 2, 1, 3), 1)

    val b = a.zip(a)

    b.collectAsMap

    res1: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)

    8.combineByKey [Pair]: combines the values of each key using three functions: createCombiner, mergeValue (within a partition) and mergeCombiners (across partitions), e.g. rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n); see the averaging sketch at the end of this entry.

    val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)

    val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)

    val c = b.zip(a)

    val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)

    d.collect

    res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
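    Another common use (a sketch, not from the original post; the RDD name scores is illustrative) is computing a per-key average by carrying a (sum, count) combiner:

    val scores = sc.parallelize(List(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)), 2)
    val sumCount = scores.combineByKey(
      (v: Int) => (v, 1),                                            // createCombiner: first value seen for a key in a partition
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),         // mergeValue: fold further values into the combiner
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))  // mergeCombiners: merge the per-partition combiners
    sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }.collect
    // e.g. Array((dog,12.0), (cat,6.333...), (mouse,3.0))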

    9.context, sparkContext: returns the SparkContext that was used to create the RDD.

    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)

    c.context

    res8: org.apache.spark.SparkContext = org.apache.spark.SparkContext@58c1c2f1

    10.count: returns the number of elements in the RDD.

    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)

    c.count

    res2: Long = 4

    11.countApproxDistinct(relativeSD): returns an approximate count of the distinct elements in the RDD; the smaller the relativeSD parameter, the more accurate (and more expensive) the estimate.

    val a = sc.parallelize(1 to 10000, 20)

    val b = a++a++a++a++a

    b.countApproxDistinct(0.1)

    res14: Long = 8224

    b.countApproxDistinct(0.05)

    res15: Long = 9750

    b.countApproxDistinct(0.01)

    res16: Long = 9947

    b.countApproxDistinct(0.001)

    res0: Long = 10000

    12.countApproxDistinctByKey(relativeSD): approximates the number of distinct values for each key, with the given relative accuracy.

      val a = sc.parallelize(List("wang","li","cao","zou"),2);

        val b = sc.parallelize(a.takeSample(true,1000,0))  // randomly draw 1000 samples (with replacement)

        val c = sc.parallelize(1 to b.count.toInt)

        val d = b.zip(c)

        Test 1:

        d.countApproxDistinctByKey(0.1).collect // approximate number of distinct values per key

     Output:

         Array[(String, Long)] = Array((cao,286), (li,253), (zou,280), (wang,193))

     Test 2:

         d.countApproxDistinctByKey(0.2).collect // approximate number of distinct values per key

     Output:

         Array[(String, Long)] = Array((cao,291), (li,308), (zou,214), (wang,220))

    13.countByKey: counts the number of elements for each key, returning the result to the driver as a Map.

    val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)

    c.countByKey

    res3: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)

    14.countByValue: counts the occurrences of each distinct value in the RDD and returns the result as a Map.

    val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))

    b.countByValue

    res27: scala.collection.Map[Int,Long] = Map(5 -> 1, 8 -> 1, 3 -> 1, 6 -> 1, 1 -> 6, 2 -> 3, 4 -> 2, 7 -> 1)

    15.dependencies: returns the dependencies this RDD has on its parent RDDs (see the dependency-class sketch after the example below).

           RDD dependencies are divided into two types: narrow dependencies and wide (shuffle) dependencies.

                Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD.

               Wide dependency: a partition of the parent RDD is used by multiple partitions of the child RDD, which requires a shuffle.

    val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))

    b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:12

    b.dependencies.length

    Int = 0

    b.map(a => a).dependencies.length

    res40: Int = 1

    b.cartesian(a).dependencies.length

    res41: Int = 2

    b.cartesian(a).dependencies

    res42: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.rdd.CartesianRDD$$anon$1@576ddaaa, org.apache.spark.rdd.CartesianRDD$$anon$2@6d2efbbd)
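    The dependency classes make the distinction visible directly (a sketch assuming the same spark-shell session; the printed object addresses will differ):

    b.map(x => x).dependencies
    // List(org.apache.spark.OneToOneDependency@...)   <- narrow dependency
    b.map(x => (x, 1)).groupByKey().dependencies
    // List(org.apache.spark.ShuffleDependency@...)    <- wide dependency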

    16.distinct: returns a new RDD with duplicate elements removed.

    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)

    c.distinct.collect

    res6: Array[String] = Array(Dog, Gnu, Cat, Rat)

    val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))

    a.distinct(2).partitions.length

    res16: Int = 2

    a.distinct(3).partitions.length

    res17: Int = 3

    17.first: returns the first element of the RDD.

    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)

    c.first

    res1: String = Gnu

    18.filter: returns a new RDD consisting of the elements for which the predicate func returns true.

    val a = sc.parallelize(1 to 10, 3)

    val b = a.filter(_ % 2 == 0)

     b.collect

    res3: Array[Int] = Array(2, 4, 6, 8, 10)

    19.filterByRange [Ordered]: on an RDD sorted by key, keeps only the elements whose keys lie within the given range (bounds inclusive).

    val randRDD = sc.parallelize(List( (2,"cat"), (6, "mouse"),(7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)

    val sortedRDD = randRDD.sortByKey()

    sortedRDD.filterByRange(1, 3).collect

    res66: Array[(Int, String)] = Array((1,screen), (2,cat), (3,book))

    20.filterWith (deprecated): a variant of filter that first derives a value from each partition index and then passes it, together with the element, to the predicate.

    val a = sc.parallelize(1 to 9, 3)

    val b = a.filterWith(i => i)((x,i) => x % 2 == 0 || i % 2 == 0)

    b.collect

    res37: Array[Int] = Array(1, 2, 3, 4, 6, 7, 8, 9)

    val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 5)

    a.filterWith(x=> x)((a, b) =>  b == 0).collect

    res30: Array[Int] = Array(1, 2)

    a.filterWith(x=> x)((a, b) =>  a % (b+1) == 0).collect

    res33: Array[Int] = Array(1, 2, 4, 6, 8, 10)

    a.filterWith(x=> x.toString)((a, b) =>  b == "2").collect

    res34: Array[Int] = Array(5, 6)

    20.flatMap: applies a function to every element and flattens the resulting collections into a single RDD.

    val a = sc.parallelize(1 to 10, 5)

    a.flatMap(1 to _).collect

    res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect

    res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

    // The program below generates a random number of copies (from 0 up to 9) of each item in the list.

    val x  = sc.parallelize(1 to 10, 3)

    x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

    res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)

    21.flatMapValues: applies a function to the value of each key-value pair and flattens the result, keeping the original key.

    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

    val b = a.map(x => (x.length, x))

    b.flatMapValues("x" + _ + "x").collect

    res6: Array[(Int, Char)] = Array((3,x), (3,d), (3,o), (3,g), (3,x), (5,x), (5,t), (5,i), (5,g), (5,e), (5,r), (5,x), (4,x), (4,l), (4,i), (4,o), (4,n), (4,x), (3,x), (3,c), (3,a), (3,t), (3,x), (7,x), (7,p), (7,a), (7,n), (7,t), (7,h), (7,e), (7,r), (7,x), (5,x), (5,e), (5,a), (5,g), (5,l), (5,e), (5,x))

    22.fold(zeroValue)(op): first applies op within each partition, folding zeroValue into the computation, and then applies op again to combine the per-partition results (similar to aggregate, but with a single function); see the sketch after the example.

    val a = sc.parallelize(List(1,2,3), 3)

    a.fold(0)(_ + _)

    res59: Int = 6
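    Because the zero value is folded in once per partition and once more in the final merge, a non-neutral zero value inflates the result (a sketch using the same 3-partition RDD a):

    a.fold(1)(_ + _)
    // res: Int = 10   (1+2+3 = 6, plus 1 for each of the 3 partitions and 1 for the final merge)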

    23.foldByKey [Pair]: folds the values of each key using the given function and zero value.

    val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)

    val b = a.map(x => (x.length, x))

    b.foldByKey("")(_ + _).collect

    res84: Array[(Int, String)] = Array((3,dogcatowlgnuant))

    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

    val b = a.map(x => (x.length, x))

    b.foldByKey("")(_ + _).collect

    res85: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))

    24.foreach: runs the function func on every element of the dataset (executed on the executors, typically for side effects).

    val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)

    c.foreach(x => println(x + "s are yummy"))

    lions are yummy

    gnus are yummy

    crocodiles are yummy

    ants are yummy

    whales are yummy

    dolphins are yummy

    spiders are yummy

    25.foreachPartition: runs the function once per partition, receiving that partition's iterator (here printing the sum of each partition).

    val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)

    b.foreachPartition(x => println(x.reduce(_ + _)))

    6

    15

    24

    26.glom: collects the elements of each partition into an Array[T], so the resulting RDD has one array element per partition.

    val a = sc.parallelize(1 to 100, 3)

    a.glom.collect

    res8: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100))

    27.groupBy: groups the elements of the RDD by the result of the supplied function.

    val a = sc.parallelize(1 to 9, 3)

    a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect

    res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9)))

    val a = sc.parallelize(1 to 9, 3)

    def myfunc(a: Int) : Int =

    {

    a % 2

    }

    a.groupBy(myfunc).collect

    // grouped by the remainder modulo 2

    res3: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))

    val a = sc.parallelize(1 to 9, 3)

    def myfunc(a: Int) : Int =

    {

    a % 2

    }

    a.groupBy(x => myfunc(x), 3).collect

    a.groupBy(myfunc(_), 1).collect

    res7: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))

    import org.apache.spark.Partitioner

    class MyPartitioner extends Partitioner {

    def numPartitions: Int = 2

    def getPartition(key: Any): Int =

    {

    key match

    {

    case null     => 0

    case key: Int => key          % numPartitions

    case _        => key.hashCode % numPartitions

    }

    }

    override def equals(other: Any): Boolean =

    {

    other match

    {

    case h: MyPartitioner => true

    case _                => false

    }

    }

    }

    val a = sc.parallelize(1 to 9, 3)

    val p = new MyPartitioner()

    val b = a.groupBy((x:Int) => { x }, p)

    val c = b.mapWith(i => i)((a, b) => (b, a))

    c.collect

    res42: Array[(Int, (Int, Seq[Int]))] = Array((0,(4,ArrayBuffer(4))), (0,(2,ArrayBuffer(2))), (0,(6,ArrayBuffer(6))), (0,(8,ArrayBuffer(8))), (1,(9,ArrayBuffer(9))), (1,(3,ArrayBuffer(3))), (1,(1,ArrayBuffer(1))), (1,(7,ArrayBuffer(7))), (1,(5,ArrayBuffer(5))))

    28.groupByKey: called on an RDD of (K, V) pairs, returns an RDD of (K, Iterable[V]).

    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)

    val b = a.keyBy(_.length)

    b.groupByKey.collect

    res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))

    29.id: the unique identifier assigned to the RDD within its SparkContext.

    val y = sc.parallelize(1 to 10, 10)

    y.id

    res16: Int = 19

    30.join: called on RDDs of type (K, V) and (K, W), returns an RDD of (K, (V, W)) containing all pairs of elements that share a key; equivalent to an inner join.

    val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

    val b = a.keyBy(_.length)

    val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)

    val d = c.keyBy(_.length)

    b.join(d).collect

    res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))

    31.keyBy: builds an RDD of (K, V) pairs by applying a function to each element to compute its key.

    val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

    val b = a.keyBy(_.length)

    b.collect

    res26: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))

    32.keys: returns an RDD containing just the keys.

    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

    val b = a.map(x => (x.length, x))

    b.keys.collect

    res2: Array[Int] = Array(3, 5, 4, 3, 7, 5)

    33.leftOuterJoin: left outer join of two key-value RDDs; keys with no match in the other RDD yield None.

        def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

        def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]

        def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

    val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

    val b = a.keyBy(_.length)

    val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)

    val d = c.keyBy(_.length)

    b.leftOuterJoin(d).collect

    res1: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None)))

    34.lookup: returns all values associated with the given key in a key-value RDD.

    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

    val b = a.map(x => (x.length, x))

    b.lookup(5)

    res0: Seq[String] = WrappedArray(tiger, eagle)

    35.map: returns a new RDD formed by passing each element through the function func.

    val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

    val b = a.map(_.length)

    val c = a.zip(b)

    c.collect

    res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))

    36.mapPartitions: similar to map, but runs independently on each partition; on an RDD of type T the function must have type Iterator[T] => Iterator[U].

    (1)

    val a = sc.parallelize(1 to 9, 3)

    def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {

      var res = List[(T, T)]()

      var pre = iter.next

      while (iter.hasNext)

      {

        val cur = iter.next;

        res .::= (pre, cur)

        pre = cur;

      }

      res.iterator

    }

    a.mapPartitions(myfunc).collect

    res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

    (2)

    val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)

    def myfunc(iter: Iterator[Int]) : Iterator[Int] = {

    var res = List[Int]()

    while (iter.hasNext) {

    val cur = iter.next;

    res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)

    }

    res.iterator

    }

    x.mapPartitions(myfunc).collect

    // some of the numbers are not output at all, because the random number generated for them was zero.

    res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)

    37.mapPartitionsWithContext: like mapPartitions, but also passes the TaskContext of the running task to the function (a developer API that has been removed in newer Spark versions; see the sketch after the example).

    val a = sc.parallelize(1 to 9, 3)

    import org.apache.spark.TaskContext

    def myfunc(tc: TaskContext, iter: Iterator[Int]) : Iterator[Int] = {

      tc.addOnCompleteCallback(() => println(

        "Partition: "     + tc.partitionId +

        ", AttemptID: "   + tc.attemptId ))

      iter.toList.filter(_ % 2 == 0).iterator

    }

    a.mapPartitionsWithContext(myfunc).collect

    14/04/01 23:05:48 INFO SparkContext: Starting job: collect at <console>:20

    ...

    14/04/01 23:05:48 INFO Executor: Running task ID 0

    Partition: 0, AttemptID: 0, Interrupted: false

    ...

    14/04/01 23:05:48 INFO Executor: Running task ID 1

    14/04/01 23:05:48 INFO TaskSetManager: Finished TID 0 in 470 ms on localhost (progress: 0/3)

    ...

    14/04/01 23:05:48 INFO Executor: Running task ID 2

    14/04/01 23:05:48 INFO TaskSetManager: Finished TID 1 in 23 ms on localhost (progress: 1/3)

    14/04/01 23:05:48 INFO DAGScheduler: Completed ResultTask(0, 1)


    res0: Array[Int] = Array(2, 6, 4, 8)
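    In current Spark releases mapPartitionsWithContext is no longer available; the same information can be obtained inside an ordinary mapPartitions via TaskContext.get() (a sketch, not from the original post):

    import org.apache.spark.TaskContext
    a.mapPartitions { iter =>
      val tc = TaskContext.get()
      println("Partition: " + tc.partitionId)   // printed in the executor logs
      iter.filter(_ % 2 == 0)
    }.collect
    // res: Array[Int] = Array(2, 4, 6, 8)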

    38.mapPartitionsWithIndex: similar to mapPartitions, but the function also receives the partition index, so on an RDD of type T the function must have type (Int, Iterator[T]) => Iterator[U].

    val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)

    def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {

    iter.map(x => index + "," + x)

    }

    x.mapPartitionsWithIndex(myfunc).collect()

    res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)

    39.mapValues: applies a function to the value of each key-value pair, leaving the key unchanged.

    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

    val b = a.map(x => (x.length, x))

    b.mapValues("x" + _ + "x").collect

    res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))

    40.max: returns the largest element.

    val y = sc.parallelize(10 to 30)

    y.max

    res75: Int = 30

    val a = sc.parallelize(List((10, "dog"), (3, "tiger"), (9, "lion"), (18, "cat")))

    a.max

    res6: (Int, String) = (18,cat)

    41.min: returns the smallest element.

    val y = sc.parallelize(10 to 30)

    y.min

    res75: Int = 10

    val a = sc.parallelize(List((10, "dog"), (3, "tiger"), (9, "lion"), (8, "cat")))

    a.min

    res4: (Int, String) = (3,tiger)

    42.name, setName: gets or sets a human-readable name for the RDD (shown in the web UI).

    val y = sc.parallelize(1 to 10, 10)

    y.name

    res13: String = null

    y.setName("Fancy RDD Name")

    y.name

    res15: String = Fancy RDD Name

    43.partitions: returns the array of Partition objects that make up the RDD.

    val b = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)

    b.partitions

    res48: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@18aa, org.apache.spark.rdd.ParallelCollectionPartition@18ab)

    44.persist, cache: cache the RDD so it does not have to be recomputed, saving time. The difference: cache simply calls persist with the default MEMORY_ONLY storage level, while persist lets you choose a storage level (see the sketch at the end of this entry).

    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)

    c.getStorageLevel

    res0: org.apache.spark.storage.StorageLevel = StorageLevel(false, false, false, false, 1)

    c.cache

    c.getStorageLevel

    res2: org.apache.spark.storage.StorageLevel = StorageLevel(false, true, false, true, 1)
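    Choosing a different storage level with persist (a sketch, not from the original post; the RDD name p is illustrative and the exact toString of the level depends on the Spark version):

    import org.apache.spark.storage.StorageLevel
    val p = sc.parallelize(1 to 100)
    p.persist(StorageLevel.MEMORY_AND_DISK)
    p.getStorageLevel
    // res: org.apache.spark.storage.StorageLevel = StorageLevel(true, true, false, true, 1)  -- disk and memory enabled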

    45.reduce: aggregates all elements of the RDD with the function func, which must be commutative and associative.

    val a = sc.parallelize(1 to 100, 3)

    a.reduce(_ + _)

    res41: Int = 5050

    46.reduceByKey: called on an RDD of (K, V) pairs, returns an RDD of (K, V) in which the values of each key are aggregated with the given reduce function; as with groupByKey, the number of reduce tasks can be set with an optional second argument.

    val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)

    val b = a.map(x => (x.length, x))

    b.reduceByKey(_ + _).collect

    res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

    val b = a.map(x => (x.length, x))

    b.reduceByKey(_ + _).collect

    res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))

    47.repartition: returns a new RDD with exactly the specified number of partitions (always shuffles).

    val rdd = sc.parallelize(List(1, 2, 10, 4, 5, 2, 1, 1, 1), 3)

    rdd.partitions.length

    res2: Int = 3

    val rdd2  = rdd.repartition(5)

    rdd2.partitions.length

    res6: Int = 5

    48.repartitionAndSortWithinPartitions: repartitions the RDD according to the given partitioner and sorts the records by key within each partition (more efficient than repartitioning and then sorting).

        def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

    // first we will do range partitioning which is not sorted

    val randRDD = sc.parallelize(List( (2,"cat"), (6, "mouse"),(7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)

    val rPartitioner = new org.apache.spark.RangePartitioner(3, randRDD)

    val partitioned = randRDD.partitionBy(rPartitioner)

    def myfunc(index: Int, iter: Iterator[(Int, String)]) : Iterator[String] = {

    iter.map(x => "[partID:" +  index + ", val: " + x + "]")

    }

    partitioned.mapPartitionsWithIndex(myfunc).collect

    res0: Array[String] = Array([partID:0, val: (2,cat)], [partID:0, val: (3,book)], [partID:0, val: (1,screen)], [partID:1, val: (4,tv)], [partID:1, val: (5,heater)], [partID:2, val: (6,mouse)], [partID:2, val: (7,cup)])

    // now let's repartition, but this time have it sorted

    val partitioned = randRDD.repartitionAndSortWithinPartitions(rPartitioner)

    def myfunc(index: Int, iter: Iterator[(Int, String)]) : Iterator[String] = {

    iter.map(x => "[partID:" +  index + ", val: " + x + "]")

    }

    partitioned.mapPartitionsWithIndex(myfunc).collect

    res1: Array[String] = Array([partID:0, val: (1,screen)], [partID:0, val: (2,cat)], [partID:0, val: (3,book)], [partID:1, val: (4,tv)], [partID:1, val: (5,heater)], [partID:2, val: (6,mouse)], [partID:2, val: (7,cup)])

    49.rightOuterJoin: equivalent to a SQL right outer join; keys missing from the left RDD yield None.

    val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

    val b = a.keyBy(_.length)

    val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)

    val d = c.keyBy(_.length)

    b.rightOuterJoin(d).collect

    res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear)))

    50.saveAsObjectFile: saves the RDD as a file of serialized objects at the given path; it can be read back with sc.objectFile.

    val x = sc.parallelize(1 to 100, 3)

    x.saveAsObjectFile("objFile")

    val y = sc.objectFile[Int]("objFile")

    y.collect

    res52: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

    51.saveAsSequenceFile: saves the elements of the dataset as a Hadoop SequenceFile in the given directory, which can be on HDFS or any other Hadoop-supported file system.

    val v = sc.parallelize(Array(("owl",3), ("gnu",4), ("dog",1), ("cat",2), ("ant",5)), 2)

    v.saveAsSequenceFile("hd_seq_file")

    14/04/19 05:45:43 INFO FileOutputCommitter: Saved output of task 'attempt_201404190545_0000_m_000001_191' to file:/home/cloudera/hd_seq_file

    [cloudera@localhost ~]$ ll ~/hd_seq_file

    total 8

    -rwxr-xr-x 1 cloudera cloudera 117 Apr 19 05:45 part-00000

    -rwxr-xr-x 1 cloudera cloudera 133 Apr 19 05:45 part-00001

    -rwxr-xr-x 1 cloudera cloudera   0 Apr 19 05:45 _SUCCESS

    52.saveAsTextFile: saves the elements of the dataset as text files on HDFS or any other supported file system; Spark calls toString on each element to convert it to a line of text.

    val a = sc.parallelize(1 to 10000, 3)

    a.saveAsTextFile("mydata_a")

    14/04/03 21:11:36 INFO FileOutputCommitter: Saved output of task 'attempt_201404032111_0000_m_000002_71' to file:/home/cloudera/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a

    [cloudera@localhost ~]$ head -n 5 ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a/part-00000

    1

    2

    3

    4

    5

    // Produces 3 output files since we created the RDD a with 3 partitions

    [cloudera@localhost ~]$ ll ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a/

    -rwxr-xr-x 1 cloudera cloudera 15558 Apr  3 21:11 part-00000

    -rwxr-xr-x 1 cloudera cloudera 16665 Apr  3 21:11 part-00001

    -rwxr-xr-x 1 cloudera cloudera 16671 Apr  3 21:11 part-00002

    53.sortBy: sorts the RDD by the value computed by the given function; the second argument chooses ascending (true) or descending (false) order.

    val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1))

    y.sortBy(c => c, true).collect

    res101: Array[Int] = Array(1, 1, 2, 3, 5, 7)

    y.sortBy(c => c, false).collect

    res102: Array[Int] = Array(7, 5, 3, 2, 1, 1)

    val z = sc.parallelize(Array(("H", 10), ("A", 26), ("Z", 1), ("L", 5)))

    z.sortBy(c => c._1, true).collect

    res109: Array[(String, Int)] = Array((A,26), (H,10), (L,5), (Z,1))

    z.sortBy(c => c._2, true).collect

    res108: Array[(String, Int)] = Array((Z,1), (L,5), (H,10), (A,26))

    54.sortByKey: called on an RDD of (K, V) pairs where K implements Ordered, returns an RDD of (K, V) sorted by key.

    val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)

    val b = sc.parallelize(1 to a.count.toInt, 2)

    val c = a.zip(b)

    c.sortByKey(true).collect

    res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))

    c.sortByKey(false).collect

    res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))

    val a = sc.parallelize(1 to 100, 5)

    val b = a.cartesian(a)

    val c = sc.parallelize(b.takeSample(true, 5, 13), 2)

    val d = c.sortByKey(false)

    d.collect

    res56: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4))

    55.subtract: returns the elements of the first RDD that do not appear in the second RDD.

    val a = sc.parallelize(1 to 9, 3)

    val b = sc.parallelize(1 to 3, 3)

    val c = a.subtract(b)

    c.collect

    res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)

    56.subtractByKey: returns the key-value pairs of the first RDD whose keys do not appear in the second RDD.

    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)

    val b = a.keyBy(_.length)

    val c = sc.parallelize(List("ant", "falcon", "squid"), 2)

    val d = c.keyBy(_.length)

    b.subtractByKey(d).collect

    res15: Array[(Int, String)] = Array((4,lion))

    57.sum: returns the sum of the elements (for RDDs of numeric values).

    val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)

    x.sum

    res17: Double = 101.39999999999999

    58.take: returns the first n elements of the RDD.

    val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)

    b.take(2)

    res18: Array[String] = Array(dog, cat)

    val b = sc.parallelize(1 to 10000, 5000)

    b.take(100)

    res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

    59.takeOrdered: returns the n smallest elements according to their natural ordering (or an implicit Ordering).

    val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)

    b.takeOrdered(2)

    res19: Array[String] = Array(ape, cat)

    60.top: returns the n largest elements according to their natural ordering.

    val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)

    c.top(2)

    res28: Array[Int] = Array(9, 8)

    61.union: returns a new RDD containing the union of the source RDD and the argument RDD (duplicates are kept; ++ is an alias).

    val a = sc.parallelize(1 to 3, 1)

    val b = sc.parallelize(5 to 7, 1)

    (a ++ b).collect

    res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)

    62.values: returns an RDD containing just the values.

    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

    val b = a.map(x => (x.length, x))

    b.values.collect

    res3: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)

    63.zip: pairs the elements of one RDD with the elements of another, returning an RDD of tuples; both RDDs must have the same number of partitions and the same number of elements in each partition.

    val a = sc.parallelize(1 to 100, 3)

    val b = sc.parallelize(101 to 200, 3)

    a.zip(b).collect

    res1: Array[(Int, Int)] = Array((1,101), (2,102), (3,103), (4,104), (5,105), (6,106), (7,107), (8,108), (9,109), (10,110), (11,111), (12,112), (13,113), (14,114), (15,115), (16,116), (17,117), (18,118), (19,119), (20,120), (21,121), (22,122), (23,123), (24,124), (25,125), (26,126), (27,127), (28,128), (29,129), (30,130), (31,131), (32,132), (33,133), (34,134), (35,135), (36,136), (37,137), (38,138), (39,139), (40,140), (41,141), (42,142), (43,143), (44,144), (45,145), (46,146), (47,147), (48,148), (49,149), (50,150), (51,151), (52,152), (53,153), (54,154), (55,155), (56,156), (57,157), (58,158), (59,159), (60,160), (61,161), (62,162), (63,163), (64,164), (65,165), (66,166), (67,167), (68,168), (69,169), (70,170), (71,171), (72,172), (73,173), (74,174), (75,175), (76,176), (77,177), (78,...

    val a = sc.parallelize(1 to 100, 3)

    val b = sc.parallelize(101 to 200, 3)

    val c = sc.parallelize(201 to 300, 3)

    a.zip(b).zip(c).map((x) => (x._1._1, x._1._2, x._2 )).collect

    res12: Array[(Int, Int, Int)] = Array((1,101,201), (2,102,202), (3,103,203), (4,104,204), (5,105,205), (6,106,206), (7,107,207), (8,108,208), (9,109,209), (10,110,210), (11,111,211), (12,112,212), (13,113,213), (14,114,214), (15,115,215), (16,116,216), (17,117,217), (18,118,218), (19,119,219), (20,120,220), (21,121,221), (22,122,222), (23,123,223), (24,124,224), (25,125,225), (26,126,226), (27,127,227), (28,128,228), (29,129,229), (30,130,230), (31,131,231), (32,132,232), (33,133,233), (34,134,234), (35,135,235), (36,136,236), (37,137,237), (38,138,238), (39,139,239), (40,140,240), (41,141,241), (42,142,242), (43,143,243), (44,144,244), (45,145,245), (46,146,246), (47,147,247), (48,148,248), (49,149,249), (50,150,250), (51,151,251), (52,152,252), (53,153,253), (54,154,254), (55,155,255)...

    64.zipPartitions: zips the partitions of several RDDs (which must have the same number of partitions) and applies a function to the combined iterators.

    val a = sc.parallelize(0 to 9, 3)

    val b = sc.parallelize(10 to 19, 3)

    val c = sc.parallelize(100 to 109, 3)

    def myfunc(aiter: Iterator[Int], biter: Iterator[Int], citer: Iterator[Int]): Iterator[String] =

    {

    var res = List[String]()

    while (aiter.hasNext && biter.hasNext && citer.hasNext)

    {

    val x = aiter.next + " " + biter.next + " " + citer.next

    res ::= x

    }

    res.iterator

    }

    a.zipPartitions(b, c)(myfunc).collect

    res50: Array[String] = Array(2 12 102, 1 11 101, 0 10 100, 5 15 105, 4 14 104, 3 13 103, 9 19 109, 8 18 108, 7 17 107, 6 16 106)

    65.zipWithIndex: pairs each element with its index (element order follows the partition order).

    val z = sc.parallelize(Array("A", "B", "C", "D"))

    val r = z.zipWithIndex

    r.collect

    res110: Array[(String, Long)] = Array((A,0), (B,1), (C,2), (D,3))

    val z = sc.parallelize(100 to 120, 5)

    val r = z.zipWithIndex

    r.collect

    res11: Array[(Int, Long)] = Array((100,0), (101,1), (102,2), (103,3), (104,4), (105,5), (106,6), (107,7), (108,8), (109,9), (110,10), (111,11), (112,12), (113,13), (114,14), (115,15), (116,16), (117,17), (118,18), (119,19), (120,20))
