美文网首页
RDD基本操作

RDD基本操作

作者: wong小尧 | 来源:发表于2018-07-19 16:18 被阅读0次

    弹性分布式数据集(Resilient Distributed Dataset-RDD)
    我使用spark比较晚,所以我使用dataframe比较多,听说rdd这块以后spark也停止更新了,但是目前dataframe还是不如rdd灵活,而且spark SQL一些方法不大稳定,有一些rdd的技巧还是要继续使用。
    下面是整理的常用操作,一部分来自书里,一部分是自己整理的。

    1.创建RDD

    两种创建RDD的方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化。

    1.Scala 中的parallelize()方法,直接把程序中一个已有的集合传给这个方法就可以生成一个RDD。
    val lines = sc.parallelize(List("pandas","i like pandas"))
    val lines = sc.parallelize(Array(1,2,3,4,5,6))
    用case class来规范数据类型

    case class UserData(userID:String,userName:String,userAddress:String)
    val lines = sc.parallelize(
                                          List("1234","pandas","chengdu"),
                                          List("4321","tiger","jiling"),
                                          List("1111","bird","china")
    )
    

    2.更常用的方式是从外部存储中读取数据来创建RDD。
    val lines = sc.textFile("D://wenben.txt")
    3.使用makeRDD来构造(不建议使用)
    val rdd01 = sc.makeRDD(List((1,2,null,null),(1,2,3,4),(4,3,null,null)))

    2.RDD操作

    RDD支持两种操作:转化操作和行动操作
    转化操作返回的是RDD,而行动操作返回的是其他的数据类型。

    2.1.转化操作(lazy模式):

    最常用的转化操作是map()和filter(),inputRDD{1,2,3,4}
    inputRDD.map(x => x * x)
    Mapped RDD{1,4,9,16}
    把列类型转化为字符串,map中通常使用v(0),索引从0开始
    inputRDD.map(v => (v(0).toString,v(1).toString))

    inputRDD.filter(x => x! = 1)
    Filtered RDD{2,3,4}
    filter筛选条件为,第一列字符串长度大于10,第二列字符串长度大于5。这里索引从1开始。
    inputRDD.filter(v => v._1.length > 10 && v._2.length > 5)

    flatMap和map操作

    flatMap()相当于把map的作用的数组拆散再合并。

    import org.apache.spark.{SparkConf, SparkContext}
    
    object MapAndFlatMap {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("map_flatMap_demo").setMaster("local"))
        val arrayRDD =sc.parallelize(Array("a_b","c_d","e_f"))
        arrayRDD.foreach(println)
    /*
    结果 :
    a_b
    c_d
    e_f
    */
    
        arrayRDD.map(string=>{
          string.split("_")
        }).foreach(x=>{
          println(x.mkString(",")) 
        })
    /*
    结果:
    a,b
    c,d
    e,f
    所以map得到的RDD结果是 Array(Array("a","b"),Array("c","d"),Array("e","f"))
    */
        arrayRDD.flatMap(string=>{
          string.split("_")
        }).foreach(x=>{
          println(x.mkString(","))//打印结果3
        })
    /*
    结果:
    a
    b
    c
    d
    e
    f
    所以flatMap得到的RDD结果是Array("a","b","c","d","e","f")
    */
      }
    }
    

    1.对一个数据为{1,2,3,3}的RDD进行基本的RDD转化操作

    函数名 目的 示例 结果
    map() 函数应用于rdd中每个元素 rdd.map(x => x+1) {2,3,4,4}
    flatMap() 将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来切分单词,执行扁平化操作。 rdd.flatMap(x => x.to(3)) {1,2,3,2,3,3,3}
    filter() 返回一个由通过传给filter()的函数的元素组成的RDD rdd.filter(x => x!=1) {2,3,3}
    distinct() 去重 rdd.distinct() {1,2,3}
    sample(withReplacement,fraction,[seed]) 对RDD采样,以及是否替换,抽取比例等等 rdd.sample(false,0.5) {1,2,3} 非确定的

    sample操作

    sample操作有两个参数,第一个参数是代表采样是有放回还是无放回,第二个参数代表抽样的比例。

    map和mapPartitions

    (1)、使用map(func())遍历
    map相当于遍历,遍历1000行的表,就要调用func一千次。

    (2)、使用mapPartitions(func())遍历
    mapPartition中func()仅仅被调用分区数量的次数,例如10个分区,仅仅调用10次。假如函数内部存在分词词库导入关闭,数据库链接等等,使用map每调用func()一次都要跑一遍这些操作,严重影响性能。
    这时候就需要把map改写成mapPartitions
    改写方式也很简单:
    例如可以用一下map,几乎不做任何操作:

        var data_rdd = df.rdd.map{x => {
          val (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type,stat_date) = (x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9))
          (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type)
          }}
    

    改写成mapPatitions需要使用一下迭代器:

       val data_rdd = df.rdd.repartition(200).mapPartitions(iter => for (x <- iter) yield {
          val (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type,stat_date) = (x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9))
          (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type)
        })
    

    2.对数据分别为{1,2,3}和{3,4,5}的RDD进行针对两个RDD的转化操作

    函数名 目的 示例 结果
    union() 生成一个包含两个RDD中所有元素的RDD (不去重的并集) rdd.union(other) {1,2,3,3,4,5}
    intersection() 求两个RDD共同的元素的RDD (交集) rdd.intersection(other) {3}
    subtract() 求移除一个RDD中的内容 (差集) rdd.subtract(other) {1,2}
    cartesian() 笛卡尔积 rdd.cartesian(other) {(1,3),(1,4),...,(3,5)}

    3.行动操作:

    函数名 目的 示例 结果
    collect() 返回RDD中的所有元素 rdd.collect() {1,2,3,3}
    count() RDD中元素的个数 rdd.count() 4
    countByValue() 各元素在RDD中出现的次数 rdd.countByValue() {(1,1),(2,1),(3,2)}
    take(num) 从RDD中返回num个元素 rdd.take(2) {1,2}
    top(num) 从RDD中返回最前面的num个元素 rdd.top(2) {3,3}
    takeOrdered(num) 从RDD中按照提供的顺序返回最前面的num个元素 rdd.takeOrdered(2)(myOrdering) {3,3}
    takeSampe(withReplacement,num,[seed]) 从RDD中返回任意一些元素 rdd.takeSample(false,1) 非确定的
    reduce(func) 并行整合RDD中所有数据 rdd.reduce((x,y) => x+y) 9
    fold(zero)(func) 和reduce类似,但是需要提供初始值 rdd.fold(0)((x,y) => x+y) 9
    aggregate(zeroValue)(seqOp,comOp) 和reduce相似,但是通常返回不同类型的函数 rdd.aggregate((0,0)) ((x,y)=> (x._1+x,x._2+1) ,(x,y) => (x._1+y._1,x._2+y._2)) (9,4)
    foreach(func) 对RDD中的每个元素使用给定的函数 rdd.foreach(func) 无 //和map进行对比,map也是对RDD中的每个元素进行操作,但是允许有返回值

    first操作

    返回RDD中第一个元素

    collect操作

    以集合形式返回RDD的元素(常用语小的数据集)
    比如某个文件数据

    中国
    美国
    加拿大
    ……
    

    读取的时候希望把文件中的元素放到列表里

    import scala.collection.mutable.ArrayBuffer
    val stat_array = new ArrayBuffer[String]()
    rdd.collect.foreach(v => (stat_array += v))
    

    take操作

    take(num:Int):将RDD作为集合,返回集合中[0,num-1]下标的元素

    reduce操作

    reduce(f:(T,T)=>T):对RDD中元素进行二元计算,返回计算结果

    top操作

    top(num:Int):按照默认的或者是指定的排序规则,返回前num个元素

    takeOrdered操作

    takeOrdered(num:Int):和top相反,返回前num个元素

    foreach和map的区别:

    foreach是action操作,它没有返回值,通常用于print或者写入文件等操作。
    map是transform操作(lazy),生成一个新的rdd

    sortBy函数(在org.apache.spark.rdd.RDD类中)

    共三个参数:
      第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;
      第二个参数是ascending,该参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;
      第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等

    例如:
    data: (item_num, item_id, item_name)
    val data_sorted = data.sortBy(_._1, ascending = false) 根据item_num从大到小排列(降序)。

    4.键值对操作

    4.1.partitionBy,mapValues,flatMapValues

    partitionBy,mapValues,flatMapValues和基本转换操作中的repatition,map和flatMap功能类似。
    partitionBy接口根据partitioner函数生成新的ShuffledRDD,将原RDD重新分区(在repartition中也是先将RDD[T]转化成RDD[K,V],这里V是null,然后使用RDD[K,V]作为函数生成ShuffledRDD)。mapValues和flatMapValues针对[K,V]中的V值进行map操作和flatMap操作。
    使用partitionBy,mapValues,flatMapValues不会破坏原数据的partition的结构&信息,使用repatition,map和flatMap后还需要做Shuffle,数据就不带有原先的partition信息。所以键值对操作尽量使用partitionBy,mapValues,flatMapValues,少用repatition,map和flatMap。



    4.2.combineByKey,foldByKey,reduceBykey,groupByKey

    四种键值对转换操作都是针对RDD[K,V]本身,不涉及与其他RDD的组合操作,四种操作类型最终都会归结为堆combinByKey的调用。combineByKey接口是将RDD[K,V]转化成返回类型RDD[K,C],这里V类型与C类型可以相同也可以不同。
    groupbykey效率很低,尽量使用reducebykey和combinebykey来代替groupbykey

    reduceByKey函数

    reduceByKey(_+_) 相当于 reduceByKey((a,b)=>(a+b))
    reduceByKey(_++_) 相当于 reduceByKey((a,b)=>(a++b))这里a和b 是一个list,++用于合并列表

    combineByKey函数

    def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
    
    def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
    
    def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
    

    其中的参数:

    createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C
    mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C
    mergeCombiners:合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C
    numPartitions:结果RDD分区数,默认保持原有的分区数
    partitioner:分区函数,默认为HashPartitioner
    mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true

    举例理解:

    假设我们要将一堆的各类水果给榨果汁,并且要求果汁只能是纯的,不能有其他品种的水果。那么我们需要一下几步:

    1 定义我们需要什么样的果汁。
    2 定义一个榨果汁机,即给定水果,就能给出我们定义的果汁。--相当于hadoop中的local combiner
    3 定义一个果汁混合器,即能将相同类型的水果果汁给混合起来。--相当于全局进行combiner

    那么对比上述三步,combineByKey的三个函数也就是这三个功能
    1 createCombiner就是定义了v如何转换为c
    2 mergeValue 就是定义了如何给定一个V将其与原来的C合并成新的C
    3 就是定义了如何将相同key下的C给合并成一个C

    var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
    
    rdd1.combineByKey(
    (v : Int) => List(v),             --将1 转换成 list(1)
    (c : List[Int], v : Int) => v :: c,       --将list(1)和2进行组合从而转换成list(1,2)
    (c1 : List[Int], c2 : List[Int]) => c1 ::: c2  --将全局相同的key的value进行组合
    ).collect
    res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))
    

    简要介绍
    def combineByKey[C](createCombiner: (V) => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RD
    createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就
    和之前的某个元素的键相同。如果这是一个新的元素, combineByKey() 会使用一个叫作 createCombiner() 的函数来创建
    那个键对应的累加器的初始值
    mergeValue: 如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并
    mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更
    多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各
    个分区的结果进行合并。

    创建一个学生成绩说明的类
    case class ScoreDetail(studentName: String, subject: String, score: Float)
    下面是一些测试数据,加载测试数据集合 key = Students name and value = ScoreDetail instance

        val scores = List(
          ScoreDetail("xiaoming", "Math", 98),
          ScoreDetail("xiaoming", "English", 88),
          ScoreDetail("wangwu", "Math", 75),
          ScoreDetail("wangwu", "English", 78),
          ScoreDetail("lihua", "Math", 90),
          ScoreDetail("lihua", "English", 80),
          ScoreDetail("zhangsan", "Math", 91),
          ScoreDetail("zhangsan", "English", 80))
    

    换成二元组, 也可以理解成转换成一个map, 利用了for 和 yield的组合

    val scoresWithKey = for { i <- scores } yield (i.studentName, i)
    val scoresWithKeyRDD = sc.parallelize(scoresWithKey).partitionBy(new org.apache.spark.HashPartitioner(3)).cache
    

    聚合求平均值让后打印

          val avgScoresRDD = scoresWithKeyRDD.combineByKey(
          (x: ScoreDetail) => (x.score, 1)                     /*createCombiner*/,
          (acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1) /*mergeValue*/,
          (acc1: (Float, Int), acc2: (Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) /*mergeCombiners*/
          // calculate the average
        ).map( { case(key, value) => (key, value._1/value._2) })
     
        avgScoresRDD.collect.foreach(println)
    /*输出:
    (zhangsan,85.5)
    (lihua,85.0)
    (xiaoming,93.0)
    (wangwu,76.5)
    */
    

    解释一下scoresWithKeyRDD.combineByKey
    createCombiner: (x: ScoreDetail) => (x.score, 1)
    这是第一次遇到zhangsan,创建一个函数,把map中的value转成另外一个类型 ,这里是把(zhangsan,(ScoreDetail类))转换成(zhangsan,(91,1))
    mergeValue: (acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1) 再次碰到张三, 就把这两个合并, 这里是将(zhangsan,(91,1)) 这种类型 和 (zhangsan,(ScoreDetail类))这种类型合并,合并成了(zhangsan,(171,2))
    mergeCombiners (acc1: (Float, Int), acc2: (Float, Int)) 这个是将多个分区中的zhangsan的数据进行合并, 我们这里zhansan在同一个分区,这个地方就没有用上


    contRdd.combineByKey(
    (score:(String,Long)) => Map(score._1 -> score._2),
    (c:Map[String,Long],score) => (c ++ Map(score._1 -> score._2)),
    (c1:Map[String,Long],c2:Map[String,Long]) => (c1 ++ c2) )

    topk问题:
    https://www.twblogs.net/a/5c602891bd9eee06ef371458/zh-cn

    5 控制操作

    5.1. cache,persist
    cache底层是调用了persist。
    5.2. checkpoint

    6 例子:

    6.1 join或者其他消耗较大的处理前先进行聚合操作

    join通常是你在使用Spark时最昂贵的操作,需要在join之前应尽可能的先缩小你的数据。

    假设,你有一个RDD存着(熊猫id,分数),另外一个RDD存着(熊猫id,邮箱地址)。若你想给每只可爱的熊猫的邮箱发送她所得的最高的分数,你可以将RDD根据id进行join,然后计算最高的分数,如下:

    def joinScoresWithAddress1( scoreRDD : RDD[(Long, Double)],
    addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {
        val joinedRDD = scoreRDD.join(addressRDD)
        joinedRDD.reduceByKey( (x, y) => if(x._1 > y._1) x else y )
    }
    

    然而,这可能不会比先减少分数数据的方案快。先计算最高的分数,那么每个熊猫的分数数据就只有一行,接下来再join地址数据:

    def joinScoresWithAddress2( scoreRDD : RDD[(Long, Double)],
    addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {
        val bestScoreData = scoreRDD.reduceByKey((x, y) => if(x > y) x else y)
        bestScoreData.join(addressRDD)
    }
    

    若每个熊猫有1000个不同的分数,那么这种做法的shuffle量就比上一种的小了1000倍。

    Reference:

    https://blog.csdn.net/weixin_42181200/article/details/81201864
    【Spark快速大数据分析】
    【Spark SQL入门与实践指南】

    相关文章

      网友评论

          本文标题:RDD基本操作

          本文链接:https://www.haomeiwen.com/subject/nbazuftx.html