Spark RDD Api使用指南

作者: digger30 | 来源:发表于2019-06-08 12:09 被阅读2次
    img

    ​ 在Spark快速入门-RDD文章中学了spark的RDD。spark包含转换行动操作。在进行spark程序开发的过程中,不可避免的要与spark rdd的转换和行动操作打交道。本文会介绍spark rdd常用的转换和行动操作。

    转换操作

    对一个RDD进行转换操作

    函数名 描述
    filter() 返回一个由通过传给filter()的函数的元素组成的RDD
    flatMap() 将函数应用于RDD的每个元素,将返回的迭代器的所有内容构成新的RDD
    map() 将函数应用于RDD中的每个元素,将返回值构成新的RDD
    distinct() 去重
    sample(withReplacement,<br />fraction, [seed]) 对RDD进行采样,以及是否替换
    val words = sc.parallelize(List("hello world", "spark", "hadoop", "spark sql", "spark streaming"))
    
    // flatMap
    val rdd1 = words.flatMap(x => x.split(" "))
    
    // filter
    val rdd2 = rdd1.filter(_.contains("spark"))
    
    // sample
    val rdd3 = rdd1.sample()
    
    // distinct
    val rdd4 = rdd1.distinct()
    
    // map
    val rdd5 = rdd2.map((_, 1))
    

    结果:

    scala> a.collect
    res12: Array[Int] = Array(1, 2, 3)
    
    scala> b.collect
    res13: Array[Int] = Array(3, 4, 5)
    
    scala> c.collect
    res14: Array[Int] = Array(1, 2, 3, 3, 4, 5)
    
    scala> d.collect
    res15: Array[Int] = Array(3)
    
    scala> e.collect
    res16: Array[Int] = Array(1, 2)
    
    scala> f.collect
    res17: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5))
    

    对两个RDD进行转换操作

    函数名 描述
    union() 生成一个包含两个RDD中所有元素的RDD
    intersection() 求两个RDD共同元素的RDD
    substract() 移除一个RDD中的内容
    cartesian() 与另一个RDD进行笛卡尔积

    实战:

    val a = sc.parallelize(List(1, 2, 3))
    val b = sc.parallelize(List(3, 4, 5))
    
    // union:生成一个包含两个RDD中所有元素的RDD
    val c = a.union(b)
    
    // intersection:求两个RDD共同的元素的RDD
    val d = a.intersection(b)
    
    // subtract:移除一个RDD中的内容
    val e = a.subtract(b)
    
    // cartesian:与另一个RDD的笛卡尔积
    val f = a.cartesian(b)
    

    结果:

    scala> a.collect
    res12: Array[Int] = Array(1, 2, 3)
    
    scala> b.collect
    res13: Array[Int] = Array(3, 4, 5)
    
    scala> c.collect
    res14: Array[Int] = Array(1, 2, 3, 3, 4, 5)
    
    scala> d.collect
    res15: Array[Int] = Array(3)
    
    scala> e.collect
    res16: Array[Int] = Array(1, 2)
    
    scala> f.collect
    res17: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5))
    

    行动操作

    对一个RDD进行行动操作

    函数名 描述
    collect() 返回RDD中的所有元素
    count() 返回RDD中的元素个数
    countByValue() 各元素在RDD中出现的次数
    take(num) 从RDD中返回num个元素
    top(num) 从RDD中返回最前面的num个元素
    takeOrdered(num)(ordering) 从RDD中按照顺序返回最前面的num个元素
    takeSample(withReplacement,<br /> num, [seed]) 从RDD中返回任意一些元素
    reduce(func) 并行整合RDD中所有数据
    flod(zero)(func) 和reduce一样,但是需要提供初始值
    aggregate(zeroValue)(seqOp, comOp) 和reduce一样,但是通常返回不同类型的函数
    foreach(func) 对RDD中的每个元素使用给定的函数

    实战:

    val rdd = sc.parallelize(List(1, 2, 3, 3))
    
    // collect:返回RDD中的所有元素
    rdd.collect()
    
    // count:返回RDD中的元素个数
    rdd.count()
    
    // countByValue:返回个元素在RDD中出现的次数
    rdd.countByValue()
    
    // take:从RDD中返回2个元素
    rdd.take(2)
    
    // top:从RDD中返回最前面的2个元素
    val x = rdd.top(2)
    
    // takeOrdered:从RDD中按照提供的顺序返回最前面的2个元素
    rdd.takeOrdered(2)
    
    object Ord extends Ordering[Int] {
        override def compare(x: Int, y: Int): Int = {
            if (x < y) 1 else -1;
        }
    }
    val pa = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
    pa.takeOrdered(3)(Ord)
    
    // foreach(func):对RDD中的每个元素使用给定的函数
    rdd.foreach(println)
    

    结果:

    scala> val rdd = sc.parallelize(List(1, 2, 3, 3))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:24
    
    scala> rdd.collect()
    res18: Array[Int] = Array(1, 2, 3, 3)
    
    scala> rdd.count()
    res19: Long = 4
    
    scala> rdd.countByValue()
    res20: scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)
    
    scala> rdd.take(2)
    res21: Array[Int] = Array(1, 2)
    
    scala> val x = rdd.top(2)
    x: Array[Int] = Array(3, 3)
    
    scala> rdd.takeOrdered(2)
    res22: Array[Int] = Array(1, 2)
    
    scala> object Ord extends Ordering[Int] {
         |     override def compare(x: Int, y: Int): Int = {
         |         if (x < y) 1 else -1;
         |     }
         | }
    defined object Ord
    
    scala> val pa = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
    pa: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[43] at parallelize at <console>:24
    
    scala> pa.takeOrdered(3)(Ord)
    res23: Array[Int] = Array(6, 5, 4)
    
    scala> rdd.foreach(println)
    2
    1
    3
    3
    

    对一个Pair RDD进行转化操作

    函数名 描述
    reduceByKey(func) 合并具有相同键的值
    groupByKey() 对具有相同键的值进行分组
    combineByKey(createCombiner, <br />mergeValue, mergeCombiners, <br />numPartitions) 使用不同的返回类型合并具有相同键的值
    mapValues(func) 对Pair RDD中的每个值应用一个函数而不改变建
    flatMapValues(func) 对Pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生辰给一个对应原键的键值对记录
    keys() 返回一个仅包含键的RDD
    values() 返回一个仅包含值的RDD
    sortByKey() 返回一个根据键排序的RDD

    实战:

    val pairRDD = sc.parallelize(List((1, 2), (3, 4), (3, 6)))
    val pairRDD1 = sc.parallelize(List((3, 5)))
    
    // reduceByKey: 合并具有相同键的RDD
    val rdd6 = pairRDD.reduceByKey((x, y) => x + y)
    
    // groupByKey: 对具有相同键的值进行分组
    val rdd7 = pairRDD.groupByKey()
    
    //    pairRDD.combineByKey()
    val rdd8 = pairRDD.combineByKey(
        (v) => (v, 1),
        (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
        (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    )
    
    // keys: 返回一个仅包含键的RDD
    val rdd9 = pairRDD.keys
    
    // values: 返回一个仅包含值的RDD
    var rdd10 = pairRDD.values
    
    // sortByKey: 返回一个根据键排序的RDD
    val rdd11 = pairRDD.sortByKey()
    
    // subtract: 删除RDD中键与pairRDD1中键相同的元素
    var rdd12 = pairRDD.subtract(pairRDD1)
    

    结果:

    scala> rdd6.collect
    res25: Array[(Int, Int)] = Array((1,2), (3,10))
    
    scala> rdd7.collect
    res26: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(2)), (3,CompactBuffer(4, 6)))
    
    scala> rdd8.collect
    res27: Array[(Int, (Int, Int))] = Array((1,(2,1)), (3,(10,2)))
    
    scala> rdd9.collect
    res28: Array[Int] = Array(1, 3, 3)
    
    scala> rdd10.collect
    res29: Array[Int] = Array(2, 4, 6)
    
    scala> rdd11.collect
    res30: Array[(Int, Int)] = Array((1,2), (3,4), (3,6))
    
    scala> rdd12.collect
    res31: Array[(Int, Int)] = Array((3,6), (3,4), (1,2))
    

    对两个Pair RDD进行转换操作

    函数 描述
    subtractByKey 删除RDD中键与other中的键相同的元素
    join 对两个rdd进行内连接
    rightOuterJoin 对两个RDD进行连接操作,确保第一个RDD的键必须存在
    leftOuterJoin 对两个RDD进行连接操作,确保第二个RDD的键必须存在
    cogroup 将两个RDD中拥有相同键的数据分组到一起
    // join: 对两个RDD进行内连接
    var rdd13 = pairRDD.join(pairRDD1)
    
    // rightOuterJoin: 对两个RDD进行连接操作,确保第一个RDD的键必须存在
    var rdd14 = pairRDD.rightOuterJoin(pairRDD1)
    
    // leftOuterJoin: 对两个RDD进行连接操作,确保第二个RDD的键必须存在
    var rdd15 = pairRDD.leftOuterJoin(pairRDD1)
    
    // cogroup: 将两个RDD中拥有相同键的数据分组到一起
    var rdd16 = pairRDD.cogroup(pairRDD1)
    

    结果:

    scala> rdd13.collect
    res32: Array[(Int, (Int, Int))] = Array((3,(4,5)), (3,(6,5)))
    
    scala> rdd14.collect
    res33: Array[(Int, (Option[Int], Int))] = Array((3,(Some(4),5)), (3,(Some(6),5)))
    
    scala> rdd15.collect
    res34: Array[(Int, (Int, Option[Int]))] = Array((1,(2,None)), (3,(4,Some(5))), (3,(6,Some(5))))
    
    scala> rdd16.collect
    res35: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((1,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(4, 6),CompactBuffer(5))))
    

    Pair RDD行动操作

    函数名 描述
    countByKey() 对每个键对应的元素分别计数
    collectAsMap() 将结果以映射表的形式返回,以便查询
    lookup(key) 返回给定键对应的所有值

    实战:

    // countByValue: 对每个键对应的元素分别计数
    pairRDD.countByValue()
    
    // collectAsMap: 将结果以映射表的形式返回,以便查询
    pairRDD.collectAsMap()
    
    // lookup: 返回指定键对应的所有值
    pairRDD.lookup(3)
    

    结果:

    scala> pairRDD.countByValue()
    res36: scala.collection.Map[(Int, Int),Long] = Map((3,6) -> 1, (3,4) -> 1, (1,2) -> 1)
    
    scala> pairRDD.collectAsMap()
    res37: scala.collection.Map[Int,Int] = Map(1 -> 2, 3 -> 6)
    
    scala> pairRDD.lookup(3)
    res38: Seq[Int] = WrappedArray(4, 6)
    

    点击查看源码

    image

    阅读过本文的同学还看了以下:

    相关文章

      网友评论

        本文标题:Spark RDD Api使用指南

        本文链接:https://www.haomeiwen.com/subject/iqhixctx.html