RDD

Author: 木山手札 | Published: 2019-11-17 12:12

    transformations

    Basic operations

    • def map[U: ClassTag](f: T => U): RDD[U]
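    • map() applies f to every element of the RDD and returns a new RDD containing the results
        // All snippets assume `import org.apache.spark.rdd.RDD` and an existing SparkContext named sc
        val rdd1: RDD[Int] = sc.parallelize(1 to 10)
        // Apply f to each element of rdd1, producing a new RDD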
        val rdd2: RDD[Int] = rdd1.map(_ * 2)
        println(rdd2.collect().toList)
    
    • def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
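    • f is called once per partition: it receives the partition's elements as an Iterator (one partition is one Iterator) and must return an Iterator
    • preservesPartitioning controls whether the partitioner of the source RDD is kept; it defaults to false
        import scala.collection.mutable.ArrayBuffer
        val rdd1: RDD[Int] = sc.parallelize(1 to 10,2)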
        val rdd2: RDD[Int] = rdd1.mapPartitions(iter => {
          val arr = ArrayBuffer[Int]()
          while (iter.hasNext) {
            val item: Int = iter.next()
            arr+=item*2
          }
          arr.iterator
        })
        println(rdd2.collect().toList)
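
    The example above copies each partition into an ArrayBuffer before returning it. Because f only has to turn one Iterator into another, it can also be written lazily. The following is a minimal sketch in plain Scala with the two partitions simulated as ordinary sequences; PartitionFn and doubleAll are hypothetical names used only for illustration.

```scala
object PartitionFn {
  // Lazy per-partition function: elements are transformed as they are
  // consumed, so no intermediate buffer is built.
  def doubleAll(iter: Iterator[Int]): Iterator[Int] = iter.map(_ * 2)

  def main(args: Array[String]): Unit = {
    // Stand-in for the two partitions of sc.parallelize(1 to 10, 2).
    val partitions: Seq[Seq[Int]] = Seq(1 to 5, 6 to 10)
    // Apply the function once per partition, then concatenate the results.
    val result = partitions.flatMap(p => doubleAll(p.iterator)).toList
    println(result)
  }
}
```

    With a real SparkContext, the same function could presumably be passed as rdd1.mapPartitions(PartitionFn.doubleAll).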
    
    • def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
    • Unlike mapPartitions, f: (Int, Iterator[T]) takes an additional Int parameter, which is the index of the partition being processed
        val rdd1: RDD[Int] = sc.parallelize(1 to 10,2)
        val rdd2: RDD[String] = rdd1.mapPartitionsWithIndex((index,iter) => {
          val arr = ArrayBuffer[String]()
          while (iter.hasNext) {
            val item: Int = iter.next()
            arr += s"${item}_$index"
          }
          arr.iterator
        })
        println(rdd2.collect().toList)
    
    • def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
    • f returns a collection (TraversableOnce) for each element; flatMap flattens these collections so that every item in them becomes an element of the new RDD
        val rdd1: RDD[String] = sc.parallelize(Array("a,b,c","A,B,C","D,e,F"))
        val rdd2: RDD[String] = rdd1.flatMap(_.split(","))
        println(rdd2.collect().toList)
    
    • def filter(f: T => Boolean): RDD[T]
    • Applies f to every element and keeps only the elements for which f returns true
        val rdd1: RDD[Int] = sc.parallelize(1 to 10)
        val rdd2: RDD[Int] = rdd1.filter(n=>n%2==0)
        println(rdd2.collect().toList)
    
    • def distinct(): RDD[T]
    • def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
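    • Removes duplicate elements and returns the result as a new RDD
    • By default, distinct() produces the same number of partitions as the source RDD
    • Essentially a wrapper around map + reduceByKey: map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)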
        val rdd1: RDD[Int] = sc.parallelize(Array(1,2,3,4,2,2,2,2,3))
        val rdd2: RDD[Int] = rdd1.distinct()
        println(rdd2.collect().toList)
    
    • def union(other: RDD[T]): RDD[T]
    • Union: all elements of both RDDs
    • Duplicates are not removed
        val rdd1: RDD[Int] = sc.parallelize(1 to 10)
        val rdd2:RDD[Int] = sc.parallelize(1 to 20)
        // List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
        println(rdd1.union(rdd2).collect().toList)
    
    • def intersection(other: RDD[T]): RDD[T]
    • Intersection: the elements that appear in both RDDs
    • Duplicates are removed
        val rdd1: RDD[Int] = sc.parallelize(1 to 10)
        val rdd2:RDD[Int] = sc.parallelize(1 to 20)
        // List(8, 1, 9, 10, 2, 3, 4, 5, 6, 7) (order is not guaranteed)
        println(rdd1.intersection(rdd2).collect().toList)
    
    • def subtract(other: RDD[T]): RDD[T]
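    • Difference: the elements of the RDD the method is called on that do not appear in other
    • Duplicates are not removed
        val rdd1: RDD[Int] = sc.parallelize(1 to 10)
        val rdd2:RDD[Int] = sc.parallelize(1 to 20)
        // List(16, 17, 18, 19, 11, 20, 12, 13, 14, 15) (order is not guaranteed)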
        println(rdd2.subtract(rdd1).collect().toList)
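
    The duplicate handling of union, intersection and subtract is easier to see on inputs that contain repeated values. The sketch below models the three operations with plain Scala collections; it is not Spark code, the helper names are hypothetical, and unlike an RDD the results here have a fixed order.

```scala
object SetOpsModel {
  // union: simple concatenation, duplicates are kept.
  def union(a: Seq[Int], b: Seq[Int]): Seq[Int] = a ++ b

  // intersection: values present on both sides, each reported once.
  def intersection(a: Seq[Int], b: Seq[Int]): Set[Int] =
    a.toSet.intersect(b.toSet)

  // subtract: values of a that never occur in b; duplicates in a are kept.
  def subtract(a: Seq[Int], b: Seq[Int]): Seq[Int] = {
    val drop = b.toSet
    a.filterNot(drop)
  }

  def main(args: Array[String]): Unit = {
    val a = Seq(1, 1, 2, 2, 3)
    val b = Seq(1, 3, 3, 4)
    println(union(a, b))                      // 9 elements, repeats included
    println(intersection(a, b).toList.sorted) // List(1, 3)
    println(subtract(a, b))                   // List(2, 2)
  }
}
```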
    
    • def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null): RDD[T]
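    • Converts an RDD with many partitions into an RDD with fewer partitions
    • With shuffle: Boolean = false, asking for more partitions than the RDD currently has does nothing
    • Merging many partitions into fewer does not require a shuffle; expanding few partitions into more does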
        val rdd1: RDD[Int] = sc.parallelize(1 to 10,5)
        val rdd2: RDD[Int] = rdd1.coalesce(2)
        val rdd3: RDD[Int] = rdd1.coalesce(10) // no effect without shuffle: still 5 partitions
        println(rdd1.partitions.size) // 5
        println(rdd2.partitions.size) // 2
        println(rdd3.partitions.size) // 5
    
    • def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
    • Implemented as coalesce(numPartitions, shuffle = true), so it always performs a shuffle
        val rdd1: RDD[Int] = sc.parallelize(1 to 10,5)
        val rdd2: RDD[Int] = rdd1.repartition(10)
        println(rdd1.partitions.size)
        println(rdd2.partitions.size)
    
    • def randomSplit(weights: Array[Double],seed: Long = Utils.random.nextLong): Array[RDD[T]]
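    • Splits one RDD into several according to weights. For example, Array(1,4,5) produces 3 RDDs whose sizes are roughly in the ratio 1:4:5. Weights that do not sum to 1 are normalized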
        val rdd1: RDD[Int] = sc.parallelize(1 to 10)
        val rdd2: Array[RDD[Int]] = rdd1.randomSplit(Array(1,8,1))
        println(rdd2(0).collect().toList)
        println(rdd2(1).collect().toList)
        println(rdd2(2).collect().toList)
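
    To make the weight handling concrete, here is a minimal sketch in plain Scala of how weights such as Array(1,8,1) can be normalized and turned into sampling ranges on [0, 1). It follows my understanding of the approach Spark takes internally, but WeightRanges and ranges are hypothetical names and this is not the library's code.

```scala
object WeightRanges {
  // Normalize the weights so they sum to 1, then build cumulative
  // [lower, upper) ranges; each range corresponds to one output split.
  def ranges(weights: Array[Double]): List[(Double, Double)] = {
    val sum = weights.sum
    val cumulative = weights.map(_ / sum).scanLeft(0.0)(_ + _)
    cumulative.sliding(2).map(w => (w(0), w(1))).toList
  }

  def main(args: Array[String]): Unit = {
    // Roughly (0.0, 0.1), (0.1, 0.9), (0.9, 1.0), up to rounding.
    println(ranges(Array(1.0, 8.0, 1.0)))
  }
}
```

    An element would then go to the split whose range contains a random number drawn for it, which is why the resulting sizes only approximate the requested ratio.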
    
    • def glom(): RDD[Array[T]]
    • Turns each partition of the RDD into an array and puts those arrays into a new RDD
        val rdd1: RDD[Int] = sc.parallelize(1 to 10,5)
        val rdd2: RDD[Array[Int]] = rdd1.glom()
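        // Convert each Array to a List so the contents are printed instead of array references
        println(rdd2.collect().map(_.toList).toList)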
    
    • def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
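    • Pairs the elements of two RDDs by position into key-value tuples: the first element of this RDD becomes the key and the first element of other the value, and so on
    • Both RDDs must have the same number of partitions and the same number of elements in each partition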
        val rdd1: RDD[Int] = sc.parallelize(1 to 5,2)
        val rdd2: RDD[String] = sc.parallelize(Array("A","B","C","D","E"),2)
        val rdd3: RDD[(Int, String)] = rdd1.zip(rdd2)
        println(rdd3.collect().toList)
    
    • def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B]) (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
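    • f: (Iterator[T], Iterator[B]) defines how the elements of each pair of corresponding partitions are zipped
    • The partition counts must match; the element counts and element types may differ, and it is up to f to handle those differences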
        val rdd1: RDD[Int] = sc.parallelize(1 to 5,2)
        val rdd2: RDD[String] = sc.parallelize(Array("A","B","C","D","E"),2)
        // Iterator.zip pairs elements in order and stops at the shorter side,
        // so the original order within each partition is kept
        val rdd3: RDD[(Int, String)] = rdd1.zipPartitions(rdd2)((r1, r2) => r1.zip(r2))
        println(rdd3.collect().toList)
    
    • def zipWithIndex(): RDD[(T, Long)]
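    • Pairs each element with its index in the RDD
    • def zipWithUniqueId(): RDD[(T, Long)]
    • Pairs each element with a generated unique id
    • Id rule: x*n + k, where n = total number of partitions, k = index of the partition holding the element, and x = position of the element within its partition, counting up from 0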
        val rdd1: RDD[String] = sc.parallelize(Array("A","B","C","D"),2)
        val rdd2: RDD[(String, Long)] = rdd1.zipWithIndex()
        val rdd3: RDD[(String, Long)] = rdd1.zipWithUniqueId()
        println(rdd2.collect().toList)
        println(rdd3.collect().toList)
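
    The id rule x*n + k can be worked through by hand. The sketch below reproduces it in plain Scala, with the two partitions of the example simulated as ordinary sequences (assuming the elements are split as [A, B] and [C, D]); UniqueIdSketch and uniqueIds are hypothetical names, not part of Spark.

```scala
object UniqueIdSketch {
  // For every element: id = x * n + k, where n is the number of partitions,
  // k the partition index and x the element's position inside its partition.
  def uniqueIds[T](partitions: Seq[Seq[T]]): Seq[(T, Long)] = {
    val n = partitions.length
    for {
      (part, k) <- partitions.zipWithIndex
      (item, x) <- part.zipWithIndex
    } yield (item, x.toLong * n + k)
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq("A", "B"), Seq("C", "D"))
    // A: 0*2+0 = 0, B: 1*2+0 = 2, C: 0*2+1 = 1, D: 1*2+1 = 3
    println(uniqueIds(partitions).toList) // List((A,0), (B,2), (C,1), (D,3))
  }
}
```

    The ids are unique but not contiguous in element order, which is the difference from zipWithIndex.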
    
