Spark: Common RDD Operations

Author: 万州客 | Published 2022-05-19 08:49

    You can compare these with the corresponding operations in Flink as you learn.

    Code

    
    scala> val rdd = sc.makeRDD(1 to 10)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:23
    
    scala> rdd.first
    res3: Int = 1
    
    scala> rdd.count
    res4: Long = 10
    
    scala> rdd.reduce(_+_)
    res5: Int = 55
    
    scala> rdd.reduce(_*_)
    res6: Int = 3628800
    
    scala> rdd.collect
    res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    scala> rdd.take(3)
    res8: Array[Int] = Array(1, 2, 3)
    
    scala> rdd.top(3)
    res9: Array[Int] = Array(10, 9, 8)
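
    The calls above (first, count, reduce, collect, take, top) are actions: they trigger the actual computation and return results to the driver. A few more commonly used actions, shown as a minimal sketch on the same rdd (the output path in the last line is only a placeholder):

    rdd.takeOrdered(3)                   // Array(1, 2, 3), the smallest three (opposite of top)
    rdd.fold(0)(_ + _)                   // 55, like reduce but with a zero value
    rdd.aggregate(0)(_ + _, _ + _)       // 55, separate per-partition and merge functions
    rdd.countByValue()                   // Map(1 -> 1, 2 -> 1, ..., 10 -> 1)
    rdd.saveAsTextFile("/tmp/rdd-out")   // placeholder path; writes one part file per partition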
    
    Transformation operations
    scala> val rdd = sc.parallelize(1 to 9, 3)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:23
    
    scala> val rdd1 = rdd.map(x => x * 2)
    rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at map at <console>:23
    
    scala> rdd.collect
    res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    
    scala> rdd1.collect
    res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
    
    scala> val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"))
    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:23
    
    scala> val rdd2 = rdd1.map(x => (x.length, x))
    rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[9] at map at <console>:23
    
    scala> rdd2.collect
    res12: Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (7,panther), (5,eagle))
    
    scala> val rdd3 = rdd2.mapValue("x" + _ + "x")
    <console>:23: error: value mapValue is not a member of org.apache.spark.rdd.RDD[(Int, String)]
           val rdd3 = rdd2.mapValue("x" + _ + "x")
                           ^
    
    scala> val rdd3 = rdd2.mapValues("x" + _ + "x")
    rdd3: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at mapValues at <console>:23
    
    scala> rdd3.collect
    res13: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))
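
    mapValues only touches the values, so the keys (and the partitioner, if any) are preserved. The keys and values methods are handy companions on pair RDDs; a quick sketch on the rdd2 above:

    rdd2.keys.collect     // Array(3, 5, 4, 3, 7, 5)
    rdd2.values.collect   // Array(dog, tiger, lion, cat, panther, eagle)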
    
    scala> val a = sc.parallelize(1 to 9, 3)
    a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:23
    
    scala> def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
         |   var res = List[(T, T)]()
         |   var pre = iter.next
         |   while (iter.hasNext) {
         |     val cur = iter.next
         |     res ::= (pre, cur)   // prepend each (previous, current) pair
         |     pre = cur
         |   }
         |   res.iterator
         | }
    myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
    
    scala> a.mapPartitions(myfunc).collect
    res15: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
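
    mapPartitions hands the function one whole partition (an Iterator) per call, which is why myfunc can pair up neighbouring elements and why no pair crosses a partition boundary ((3,4) and (6,7) are missing above). It is also the natural place to pay a one-off setup cost per partition. A minimal sketch, with made-up CSV-style data:

    val lines = sc.parallelize(Seq("1,dog", "2,cat", "3,lion"), 2)
    val parsed = lines.mapPartitions { iter =>
      // build an expensive parser or open a connection here, once per partition
      iter.map { line =>
        val Array(id, name) = line.split(",")
        (id.toInt, name)
      }
    }
    parsed.collect   // Array((1,dog), (2,cat), (3,lion))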
    
    scala> val rdd1 = sc.parallelize(1 to 4, 2)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:23
    
    scala> rdd1.flatMap(x => 1 to x).collect
    res16: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)
    
    scala> val rdd1 = sc.parallelize(List((1, 2), (3, 4)))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:23
    
    scala> val rdd2 = rdd1.flatMapValues(x => x.to(5))
    rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[16] at flatMapValues at <console>:23
    
    scala> rdd2.collect
    res17: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))
    
    
    scala> val rdd = sc.parallelize(List(("A", 2), ("B", 4), ("C", 6), ("A", 3), ("C", 7)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[17] at parallelize at <console>:23
    
    scala> rdd.groupByKey().collect
    res18: Array[(String, Iterable[Int])] = Array((A,CompactBuffer(2, 3)), (B,CompactBuffer(4)), (C,CompactBuffer(6, 7)))
    
    scala> rdd.sortByKey().collect
    res19: Array[(String, Int)] = Array((A,2), (A,3), (B,4), (C,6), (C,7))
    
    scala> rdd.sortByKey(false).collect
    res20: Array[(String, Int)] = Array((C,6), (C,7), (B,4), (A,2), (A,3))
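
    groupByKey ships every value across the network before anything is combined. When the goal is a per-key aggregate, reduceByKey (shown next) is usually preferred, because it combines values inside each partition before the shuffle. A quick sketch on the same rdd:

    rdd.groupByKey().mapValues(_.sum).collect   // Array((A,5), (B,4), (C,13)), order may vary
    rdd.reduceByKey(_ + _).collect              // same result, less shuffle traffic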
    
    
    scala> val rdd = sc.parallelize(List(("A", 2), ("A", 1), ("B", 4), ("B", 6), ("C", 7)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:23
    
    scala> rdd.reduceByKey((x, y) => x + y).collect
    res21: Array[(String, Int)] = Array((A,3), (B,10), (C,7))
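
    flatMap, map and reduceByKey together give the classic word count. A self-contained sketch:

    val words = sc.parallelize(Seq("spark flink spark", "rdd spark"))
    val counts = words.flatMap(_.split(" "))
                      .map(w => (w, 1))
                      .reduceByKey(_ + _)
    counts.collect   // Array((flink,1), (rdd,1), (spark,3)), order may vary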
    
    scala> val rdd = sc.makeRDD(1 to 10).filter(_ % 3 == 0)
    rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at filter at <console>:23
    
    scala> rdd.collect
    res22: Array[Int] = Array(3, 6, 9)
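
    Other transformations that thin out an RDD include distinct and sample; the elements sample returns are random, so the comment below is only indicative:

    sc.makeRDD(List(1, 1, 2, 3, 3)).distinct.collect     // Array(1, 2, 3), order may vary
    sc.makeRDD(1 to 10).sample(false, 0.3).collect       // without replacement, roughly 3 random elements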
    
    scala> val rdd1 = sc.makeRDD(1 to 3, 1)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at makeRDD at <console>:23
    
    scala> val rdd2 = sc.makeRDD(2 to 4, 1)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[30] at makeRDD at <console>:23
    
    scala> val unionRDD = rdd1.union(rdd2)
    unionRDD: org.apache.spark.rdd.RDD[Int] = UnionRDD[31] at union at <console>:24
    
    scala> unionRDD.collect
    res23: Array[Int] = Array(1, 2, 3, 2, 3, 4)
    
    scala> val intersectionRDD = rdd1.intersection(rdd2)
    intersectionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at intersection at <console>:24
    
    scala> intersectionRDD.collect
    res24: Array[Int] = Array(3, 2)
    
    scala> val substractRDD = rdd1.subtract(rdd2)
    substractRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at subtract at <console>:24
    
    scala> substractRDD.collect
    res25: Array[Int] = Array(1)
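
    Note that union keeps duplicates (2 and 3 appear twice above) while intersection deduplicates its result. A few related transformations on the same rdd1 and rdd2, as a sketch:

    rdd1.union(rdd2).distinct.collect   // Array(1, 2, 3, 4), order may vary
    rdd1.cartesian(rdd2).count          // 9, every (x, y) pair from 3 x 3 elements
    rdd1.zip(rdd2).collect              // Array((1,2), (2,3), (3,4)); needs equal partition and element counts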
    
    scala> val l = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)), 1)
    l: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:23
    
    scala> val r = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')), 1)
    r: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[43] at parallelize at <console>:23
    
    scala> val joinrdd1 = l.join(r).collect
    joinrdd1: Array[(Int, (Int, Char))] = Array((1,(1,x)), (1,(2,x)), (2,(1,y)), (2,(1,z)))
    
    scala> val joinrdd2 = l.leftOuterJoin(r).collect
    joinrdd2: Array[(Int, (Int, Option[Char]))] = Array((1,(1,Some(x))), (1,(2,Some(x))), (3,(1,None)), (2,(1,Some(y))), (2,(1,Some(z))))
    
    scala> val joinrdd3 = l.rightOuterJoin(r).collect
    joinrdd3: Array[(Int, (Option[Int], Char))] = Array((4,(None,w)), (1,(Some(1),x)), (1,(Some(2),x)), (2,(Some(1),y)), (2,(Some(1),z)))
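
    fullOuterJoin and cogroup complete the join family. A sketch on the same l and r (the order of the collected elements may vary):

    l.fullOuterJoin(r).collect
    // Array((4,(None,Some(w))), (1,(Some(1),Some(x))), (1,(Some(2),Some(x))),
    //       (3,(Some(1),None)), (2,(Some(1),Some(y))), (2,(Some(1),Some(z))))
    l.cogroup(r).collect
    // one entry per key, with all values from l and from r gathered into two
    // Iterables, e.g. (2,(CompactBuffer(1),CompactBuffer(y, z)))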
    

    Result

    (screenshot: 2022-05-16 15_50_16 Spark大数据分析技术与实战.pdf - SumatraPDF.png)
