美文网首页Spark
黑猴子的家:Spark RDD 转换算子 Transformat

黑猴子的家:Spark RDD 转换算子 Transformat

作者: 黑猴子的家 | 来源:发表于2019-06-08 08:12 被阅读0次

    RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行

    常用的RDD 转换算子Transformation

    1、Map (一对一)

    (1)原理
    返回一个新的RDD,该RDD由每一个输入元素,经过f函数转换后组成,对于每一条数据都进行单值的转换。

    (2)源代码

    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    }
    

    (3)案列

    scala> val rddmap = sc.makeRDD(List((1,(1,2)),(2,(2,3)),(3,(3,4)),(4,(6,7))),4)
    rddmap: org.apache.spark.rdd.RDD[(Int, (Int, Int))] 
                           = ParallelCollectionRDD[0] at makeRDD at <console>:24
    
    scala> rddmap.map((a) => (a._2._2,(a._2._1),a._1)).collect
    res0: Array[(Int, Int, Int)] = Array((2,1,1), (3,2,2), (4,3,3), (7,6,4))        
    
    scala> rddmap.map{ case (a,(b,c)) => (c,(b,a)) }
    res1: org.apache.spark.rdd.RDD[(Int, (Int, Int))] 
                           = MapPartitionsRDD[2] at map at <console>:27
    
    scala> rddmap.map{ case (a,(b,c)) => (c,(b,a)) }.collect
    res2: Array[(Int, (Int, Int))] = Array((2,(1,1)), (3,(2,2)), (4,(3,3)), (7,(6,4)))
    
    scala> val rddmap2 = sc.parallelize(1 to 10)
    rddmap2: org.apache.spark.rdd.RDD[Int] 
                           = ParallelCollectionRDD[4] at parallelize at <console>:24
    
    scala> rddmap2.collect
    res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    scala> val rddmapt = rddmap2.map(_ * 2)
    rddmapt: org.apache.spark.rdd.RDD[Int] 
                           = MapPartitionsRDD[5] at map at <console>:26
    
    scala> rddmapt.collect
    res4: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
    

    2、filter 过滤

    (1)原理
    返回一个新的RDD,该RDD由经过f函数计算后返回值为true的输入元素组成,过滤数据集

    (2)源代码

    def filter(f: T => Boolean): RDD[T] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[T, T](
        this,
        (context, pid, iter) => iter.filter(cleanF),
        preservesPartitioning = true)
    }
    

    (3)案列

    scala> val rddList = sc.makeRDD(List((1,(11,12)),(2,(22,23)),(3,(33,34)),(4,(46,48))),4)
    rddList: org.apache.spark.rdd.RDD[(Int, (Int, Int))] 
                       = ParallelCollectionRDD[6] at makeRDD at <console>:24
    
    scala> rddList.filter{ case (a,(b,c)) => c%2 != 0 }.collect
    res9: Array[(Int, (Int, Int))] = Array((2,(22,23)))
    
    scala> rddList.collect{ case (a,(b,c)) if c%2 !=0  => (a,(b,c)) }.collect
    res10: Array[(Int, (Int, Int))] = Array((2,(22,23)))
    
    scala> val rddfilter = sc.parallelize(Array("xiaoming","xiaohong","xiaohe","hadoop"))
    rddfilter: org.apache.spark.rdd.RDD[String] 
                       = ParallelCollectionRDD[12] at parallelize at <console>:24
    
    scala> val rddf = rddfilter.filter(_ contains "xiao")
    rddf: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:26
    
    scala> rddfilter.collect
    res11: Array[String] = Array(xiaoming, xiaohong, xiaohe, hadoop)
    
    scala> rddf.collect
    res12 Array[String] = Array(xiaoming, xiaohong, xiaohe)
    

    3、flatMap 压扁压平

    (1)原理
    对于每一条数据进行单值到多值的转换,类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以f函数应该返回一个序列,而不是单一元素)

    (2)源代码

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
    }
    

    对每一条数据进行,单值到多值的转换
    flatMap:flat即压扁,压平,扁平化
    效果就是将集合中的每个元素的子元素映射到某个函数并返回新的集合

    (3)案列

    //flatmap 压扁压平, 一对多
    scala> val rddflatmap = sc.makeRDD(List((1,(11,12)),(2,(22,23)),(3,(33,34)),(4,(46,48))),4)
    rddflatmap: org.apache.spark.rdd.RDD[(Int, (Int, Int))] 
                     = ParallelCollectionRDD[20] at makeRDD at <console>:24
    
    scala> rddflatmap.flatMap{ case (a,(b,c)) => List(a,b,c) }.collect
    res16: Array[Int] = Array(1, 11, 12, 2, 22, 23, 3, 33, 34, 4, 46, 48)
    
    scala> val sourcefm = sc.makeRDD(1 to 5)
    sourcefm: org.apache.spark.rdd.RDD[Int] 
                     = ParallelCollectionRDD[22] at makeRDD at <console>:24
    
    scala> sourcefm.collect
    res17: Array[Int] = Array(1, 2, 3, 4, 5)
    
    scala> val flatmaprdd = sourcefm.flatMap(x => (1 to x))
    flatmaprdd: org.apache.spark.rdd.RDD[Int] 
                     = MapPartitionsRDD[23] at flatMap at <console>:26
    
    scala> flatmaprdd.collect
    res18: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
    

    4、mapPartitions

    (1)原理
    对于每一个分区的数据进行转换,注意函数形式,每一个分区运行一次函数。类似于map,但独立在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区

    (2)源代码

    def mapPartitions[U: ClassTag](
        f: Iterator[T] => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U] = withScope {
        val cleanedF = sc.clean(f)
        new MapPartitionsRDD(
        this,
        (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
        preservesPartitioning)
    }
    

    mapPartitions 是每个分区运行,相对来说,当面对大数据集的时候,比 map 速度快很多
    对于每一个分区的数据进行转换,注意函数的形式,每一个分区,运行一次函数

    (3)案列

    scala> val smprdd = sc.makeRDD(List((1,(11,12)),(2,(22,23)),(3,(33,34)),(4,(46,48))),4)
    smprdd: org.apache.spark.rdd.RDD[(Int, (Int, Int))] 
                         = ParallelCollectionRDD[0] at makeRDD at <console>:24
    
    scala> smprdd.mapPartitions( items => items.map{ case (a,(b,c)) => (c,(b,a)) } ).collect
    res0: Array[(Int, (Int, Int))] 
                         = Array((12,(11,1)), (23,(22,2)), (34,(33,3)), (48,(46,4)))
    
    scala> val smprdd2 = sc.parallelize(List( \
    ("kpop","female"),("zorro","male"), \
    ("mobin","male"),("lucy","female")),2)
    smprdd2: org.apache.spark.rdd.RDD[(String, String)] 
                         = ParallelCollectionRDD[2] at parallelize at <console>:24
    
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    def partitionsFun(iter : Iterator[(String,String)]) : Iterator[String] = {
      var woman = List[String]()
      while (iter.hasNext){
        val next = iter.next()
        next match {
           case (_,"female") => woman = next._1 :: woman
           case _ =>
        }
      }
      woman.iterator
    }
    
    // Exiting paste mode, now interpreting. //Ctrl+D 退出
    
    partitionsFun: (iter: Iterator[(String, String)])Iterator[String]
    
    scala> val result = smprdd2.mapPartitions(partitionsFun)
    result: org.apache.spark.rdd.RDD[String] 
                         = MapPartitionsRDD[3] at mapPartitions at <console>:28
    
    scala> result.collect
    res1: Array[String] = Array(kpop, lucy)
    

    5、mapPartitionsWithIndex (分区+分区索引)

    (1)原理
    对于每一个分区的数据进行转换,注意函数形式,多了一个分区的索引,每一个分区运行一次函数,类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]

    (2)源代码

    def mapPartitionsWithIndex[U: ClassTag](
        f: (Int, Iterator[T]) => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U] = withScope {
        val cleanedF = sc.clean(f)
        new MapPartitionsRDD(
        this,
        (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
        preservesPartitioning)
    }
    

    (3)案列

    scala> val rdd = sc.makeRDD(List((1,(11,12)),(2,(22,23)),(3,(33,34)),(4,(46,48))),4)
    rdd: org.apache.spark.rdd.RDD[(Int, (Int, Int))] 
                    = ParallelCollectionRDD[4] at makeRDD at <console>:24
    
    scala> rdd.repartition(2)
    res2: org.apache.spark.rdd.RDD[(Int, (Int, Int))] 
                    = MapPartitionsRDD[8] at repartition at <console>:27
    
    scala> rdd.partitioner
    res3: Option[org.apache.spark.Partitioner] = None
    
    scala> rdd.mapPartitionsWithIndex( \
    (a,items) => Iterator(a+":" +items.map( \
    t => (" "+ t._1 + "("+t._2._1 +","+t._2._2+"))")).mkString("|") ) ).collect
    res5: Array[String] = Array(0: 1(11,12)), 1: 2(22,23)), 2: 3(33,34)), 3: 4(46,48)))
    
    scala> val rdd = sc.parallelize(List( \
    ("kpop","female"),("zorro","male"),\
    ("mobin","male"),("lucy","female")))
    rdd: org.apache.spark.rdd.RDD[(String, String)] 
                    = ParallelCollectionRDD[12] at parallelize at <console>:24
    
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    def partitionsFun(index : Int, iter : Iterator[(String,String)]) : Iterator[String] = {
      var woman = List[String]()
      while (iter.hasNext){
        val next = iter.next()
        next match {
           case (_,"female") => woman = "["+index+"]"+next._1 :: woman
           case _ =>
        }
      }
      woman.iterator
    }
    
    // Exiting paste mode, now interpreting.
    
    partitionsFun: (index: Int, iter: Iterator[(String, String)])Iterator[String]
    
    scala> val result = rdd.mapPartitionsWithIndex(partitionsFun)
    result: org.apache.spark.rdd.RDD[String] 
                    = MapPartitionsRDD[13] at mapPartitionsWithIndex at <console>:28
    
    scala> result.collect
    res7: Array[String] = Array([0]kpop, [1]lucy)
    

    6、sample (随机抽样)

    (1)原理
    对RDD数据进行抽样,返回一个RDD,以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。例子从RDD中随机且有放回的抽出50%的数据,随机种子值为3(即可能以1 2 3的其中一个起始值)

    (2)源代码

    def sample(
        withReplacement: Boolean,
        fraction: Double,
        seed: Long = Utils.random.nextLong): RDD[T] = {
        require(fraction >= 0,
        s"Fraction must be nonnegative, but got ${fraction}")
    
      withScope {
        require(fraction >= 0.0, "Negative fraction value: " + fraction)
        if (withReplacement) {
          new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
        } else {
          new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
        }
      }
    }   
    

    (3)案列

    scala> val rdd = sc.parallelize(1 to 10)
    rdd: org.apache.spark.rdd.RDD[Int] 
                        = ParallelCollectionRDD[14] at parallelize at <console>:24
    
    scala> rdd.collect()
    res8: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)                         
    
    scala> var sample1 = rdd.sample(true,0.4,2)
    sample1: org.apache.spark.rdd.RDD[Int] 
                        = PartitionwiseSampledRDD[15] at sample at <console>:26
    
    scala> sample1.collect()
    res9: Array[Int] = Array(1, 2, 2)
    
    scala> var sample2 = rdd.sample(false,0.2,3)
    sample2: org.apache.spark.rdd.RDD[Int] 
                        = PartitionwiseSampledRDD[16] at sample at <console>:26
    
    scala> sample2.collect()
    res11: Array[Int] = Array(1, 9)
    
    scala> val smrdd = sc.makeRDD(0 to 100)
    smrdd: org.apache.spark.rdd.RDD[Int] 
                        = ParallelCollectionRDD[17] at makeRDD at <console>:24
    
    scala> smrdd.sample(false,0.3,2).collect
    res12: Array[Int] = Array(0, 2, 6, 7, 9, 10, 11, 15,
     16, 17, 18, 23, 24, 27, 28, 31, 32, 45, 49, 59,
     63, 64, 68, 69, 73, 75, 76, 79, 80, 83, 86, 
    89, 90, 94, 97)
    
    scala> smrdd.collect
    res13: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 
    12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26,
     27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41,
     42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56,
     57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71,
     72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86,
     87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
    

    7、union (合并)

    (1)原理
    一个RDD和另一个RDD进行数据合并,注意类型要相同,对源RDD和参数RDD求并集后返回一个新的RDD

    (2)源代码

    def union(other: RDD[T]): RDD[T] = withScope {
      sc.union(this, other)
    }
    

    (3)案列

    scala> val ardd = sc.makeRDD(0 to 10)
    ardd: org.apache.spark.rdd.RDD[Int] 
                     = ParallelCollectionRDD[19] at makeRDD at <console>:24
    
    scala> val brdd = sc.makeRDD(11 to 20)
    brdd: org.apache.spark.rdd.RDD[Int] 
                     = ParallelCollectionRDD[20] at makeRDD at <console>:24
    
    scala> (ardd union brdd) collect
    warning: there was one feature warning; re-run with -feature for details
    res14: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 
    10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    

    8、intersection (求交集)

    (1)原理
    求一个RDD和另外一个RDD的交集,注意类型要一样,对源RDD和参数RDD求交集后返回一个新的RDD

    (2)源代码

    def intersection(other: RDD[T]): RDD[T] = withScope {
     this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
      .filter {case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty}
      .keys
    }
    

    (3)案列

    scala> val rdd1 = sc.parallelize(1 to 7)
    rdd1: org.apache.spark.rdd.RDD[Int] 
                = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> val rdd2 = sc.makeRDD(4 to 10)
    rdd2: org.apache.spark.rdd.RDD[Int] 
                = ParallelCollectionRDD[1] at makeRDD at <console>:24
    
    scala> val rdd3 = rdd1 intersection rdd2
    rdd3: org.apache.spark.rdd.RDD[Int] 
                = MapPartitionsRDD[7] at intersection at <console>:28
    
    scala> rdd3 collect
    warning: there was one feature warning; re-run with -feature for details
    res0: Array[Int] = Array(4, 6, 7, 5)
    

    9、distinct (数据去重)

    (1)原理
    对源RDD进行去重后返回一个新的RDD. 默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。

    (2)源代码

    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
    }
    

    (3)案列

    scala> val disrdd = sc.parallelize(List(1,1,2,3,3,4,5,5,6,7,8))
    disrdd: org.apache.spark.rdd.RDD[Int] 
                    = ParallelCollectionRDD[8] at parallelize at <console>:24
    
    scala> val urdd = disrdd.distinct
    urdd: org.apache.spark.rdd.RDD[Int] 
                    = MapPartitionsRDD[14] at distinct at <console>:26
    
    scala> urdd.collect()
    res1: Array[Int] = Array(4, 6, 8, 2, 1, 3, 7, 5)
    
    scala> val urdd2 = disrdd distinct 5
    urdd2: org.apache.spark.rdd.RDD[Int] 
                    = MapPartitionsRDD[20] at distinct at <console>:26
    
    scala> urdd2.collect
    res3: Array[Int] = Array(5, 1, 6, 7, 2, 3, 8, 4)
    

    10、partitionBy (重新分区)

    (1)原理
    对一个RDD进行重新分区,对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区,
    否则会生成ShuffleRDD.

    (2)源代码

    def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
      if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
      if (self.partitioner == Some(partitioner)) {
        self
      } else {
        new ShuffledRDD[K, V, V](self, partitioner)
      }
    }
    

    (3)案列一

    scala> val prdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
    prdd: org.apache.spark.rdd.RDD[(Int, String)] 
                       = ParallelCollectionRDD[21] at parallelize at <console>:24
    
    scala> prdd.partitions.size
    res4: Int = 4
    
    scala> import org.apache.spark._
    import org.apache.spark._
    
    scala> var rdd2 = prdd.partitionBy(new HashPartitioner(2))
    rdd2: org.apache.spark.rdd.RDD[(Int, String)] 
                       = ShuffledRDD[22] at partitionBy at <console>:29
    
    scala> rdd2.partitions.size
    res5: Int = 2
    

    (4)案列二

    scala> val a = sc.makeRDD(1 to 100,5)
    a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at makeRDD at <console>:24
    
    scala> a.partitioner
    res57: Option[org.apache.spark.Partitioner] = None
    
    scala> a.partitions.size
    res61: Int = 5
    
    scala> a.map((_,3)).partitionBy(new org.apache.spark.HashPartitioner(2))
    res62: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[58] at partitionBy at <console>:27
    
    scala> a.partitions.size
    res63: Int = 5
    
    scala> res62.partitions.size
    res64: Int = 2
    

    11、reduceByKey (聚合操作)

    (1)原理
    对于相同key的数据集进行规约操作。在map阶段预聚合,减少shuffle数量,在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置

    (2)源代码

    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
      reduceByKey(new HashPartitioner(numPartitions), func)
    }
    

    (3)案列

    scala> val rdd = sc.makeRDD(List(("female",5),("male",6),("male",7),("female",8)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] 
                     = ParallelCollectionRDD[23] at makeRDD at <console>:27
    
    scala> val reducerdd = rdd.reduceByKey
    reduceByKey   reduceByKeyLocally
    
    scala> val reducerdd = rdd.reduceByKey( (x,y) => x + y )
    reducerdd: org.apache.spark.rdd.RDD[(String, Int)] 
                     = ShuffledRDD[24] at reduceByKey at <console>:29
    
    scala> reducerdd.collect
    res7: Array[(String, Int)] = Array((female,13), (male,13))
    

    12、groupByKey (分组聚合)

    (1)原理
    对于相同key进行聚集,groupByKey也是对每个key进行操作,但只生成一个sequence

    (2)源代码

    def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
      groupByKey(new HashPartitioner(numPartitions))
    }
    

    (3)案列

    scala> val words = Array("one","two","two","three","three","three")
    words: Array[String] = Array(one, two, two, three, three, three)
    
    scala> val wprdd = sc.makeRDD(words).map( w => (w,1) )
    wprdd: org.apache.spark.rdd.RDD[(String, Int)] 
                             = MapPartitionsRDD[26] at map at <console>:29
    
    scala> wprdd.collect
    res9: Array[(String, Int)] 
             = Array((one,1), (two,1), (two,1), (three,1), (three,1), (three,1))
    
    scala> val grouprdd = wprdd.groupByKey
    grouprdd: org.apache.spark.rdd.RDD[(String, Iterable[Int])] 
                             = ShuffledRDD[28] at groupByKey at <console>:31
    
    scala> grouprdd.collect
    res11: Array[(String, Iterable[Int])] 
      = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
    
    scala> val maprdd = grouprdd.map( t => (t._1,t._2.sum))
    maprdd: org.apache.spark.rdd.RDD[(String, Int)] 
                             = MapPartitionsRDD[29] at map at <console>:33
    
    scala> maprdd.collect
    res12: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
    

    13、combineByKey

    (1)原理
    对相同K,把V合并成一个集合.
    createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就 和之前的某个元素的键相同。如果这是一个新的元素,combineByKey() 会使用一个叫作 createCombiner() 的函数来创建
    那个键对应的累加器的初始值
    mergeValue: 如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并
    mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。

    (2)源代码

    def combineByKey[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
          combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
    }
    

    (3)案列

    scala> val scores = Array( \
    ("Fred", 88), ("Fred", 95), ("Fred", 91), \
    ("Wilma", 93), ("Wilma", 95), ("Wilma", 98))
    scores: Array[(String, Int)] 
            = Array((Fred,88), (Fred,95), (Fred,91), (Wilma,93), (Wilma,95), (Wilma,98))
    
    scala> val input = sc.parallelize(scores)
    input: org.apache.spark.rdd.RDD[(String, Int)] 
                   = ParallelCollectionRDD[31] at parallelize at <console>:29
    
    scala> val combine = input.combineByKey( \
    (v)=>(v,1), \
    (acc:(Int,Int),v)=>(acc._1+v,acc._2+1), \
    (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2))
    combine: org.apache.spark.rdd.RDD[(String, (Int, Int))] 
                          = ShuffledRDD[32] at combineByKey at <console>:31
    
    scala> combine.collect
    res14: Array[(String, (Int, Int))] = Array((Wilma,(286,3)), (Fred,(274,3)))
    
    scala> val result = combine.map{case (key,value) => (key,value._1/value._2.toDouble)}
    result: org.apache.spark.rdd.RDD[(String, Double)] 
                            = MapPartitionsRDD[33] at map at <console>:33
    
    scala> result.collect
    res15: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))
    

    14、aggregateByKey

    (1)原理
    在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。

    seqOp函数用于在每一个分区中用初始值逐步迭代value,combOp函数用于合并每个分区中的结果


    (2)源代码

    def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
        combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
      aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
    }
    

    (3)案列

    scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
    rdd: org.apache.spark.rdd.RDD[(Int, Int)] 
                    = ParallelCollectionRDD[39] at parallelize at <console>:27
    
    scala> val aggrdd = rdd.aggregateByKey(0)(math.max(_,_),_+_)
    aggrdd: org.apache.spark.rdd.RDD[(Int, Int)] 
                    = ShuffledRDD[40] at aggregateByKey at <console>:29
    
    scala> aggrdd.collect
    res19: Array[(Int, Int)] = Array((3,8), (1,7), (2,3))
    
    scala> aggrdd.partitions.size
    res20: Int = 3
    
    scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),1)
    rdd: org.apache.spark.rdd.RDD[(Int, Int)] 
                    = ParallelCollectionRDD[41] at parallelize at <console>:27
    
    scala> val aggrdd = rdd.aggregateByKey(0)(math.max(_,_),_+_)
    aggrdd: org.apache.spark.rdd.RDD[(Int, Int)] 
                    = ShuffledRDD[42] at aggregateByKey at <console>:29
    
    scala> aggrdd.collect
    res21: Array[(Int, Int)] = Array((1,4), (3,8), (2,3))
    

    15、foldByKey(折叠)

    (1)原理
    foldByKey 是aggregateByKey的简化操作,seqop和combop相同

    (2)源代码

    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
      foldByKey(zeroValue, defaultPartitioner(self))(func)
    }
    

    (3)案列

    scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
    rdd: org.apache.spark.rdd.RDD[(Int, Int)] 
                   = ParallelCollectionRDD[43] at parallelize at <console>:27
    
    scala> val frdd = rdd.foldByKey(0)(_+_)
    frdd: org.apache.spark.rdd.RDD[(Int, Int)] 
                   = ShuffledRDD[44] at foldByKey at <console>:29
    
    scala> frdd.collect
    res22: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))
    

    16、sortByKey (根据k排序)

    (1)原理
    根据K来进行排序,在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

    (2)源代码

    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
        : RDD[(K, V)] = self.withScope {
          val part = new RangePartitioner(numPartitions, self, ascending)
          new ShuffledRDD[K, V, V](self, part)
          .setKeyOrdering(if (ascending) ordering else ordering.reverse)
    }
    

    (3)案列

    scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] 
              = ParallelCollectionRDD[45] at parallelize at <console>:27
    
    scala> rdd.sortByKey(true).collect()
    res23: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))
    
    scala> rdd.sortByKey(false).collect()
    res24: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))
    

    17、sortBy (通过function 函数排序)

    (1)原理
    能够通过f来产生K,这个k必须要有Ordering隐式参数。与sortByKey类似,但是更灵活,可以用func先对数据进行处理,按照处理后的数据比较结果排序。

    (2)源代码

    def sortBy[K](
        f: (T) => K,
        ascending: Boolean = true,
        numPartitions: Int = this.partitions.length)
        (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
           this.keyBy[K](f)
          .sortByKey(ascending, numPartitions)
          .values
    }
    

    (3)案列

    scala> val rdd = sc.parallelize(List(6,9,7,1,2,4,3,5))
    rdd: org.apache.spark.rdd.RDD[Int] 
              = ParallelCollectionRDD[64] at parallelize at <console>:27
    
    scala> rdd.sortBy(x => x).collect
    res27: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 9)
    
    scala> rdd.sortBy(x => x%3).collect
    res28: Array[Int] = Array(6, 9, 3, 7, 1, 4, 2, 5)
    

    18、join (key的交集,仅 join 相同的)

    (1)原理
    将两个RDD进行JOIN,只连接相同的Key,在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

    (2)源代码

    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
      join(other, defaultPartitioner(self, other))
    }
    

    (3)案列

    scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] 
             = ParallelCollectionRDD[75] at parallelize at <console>:27
    
    scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] 
               = ParallelCollectionRDD[76] at parallelize at <console>:27
    
    scala> rdd.join(rdd1).collect
    res29: Array[(Int, (String, Int))] = Array((2,(b,5)), (1,(a,4)), (3,(c,6)))
    

    19、cogroup (将两个RDD groupbykey之后合并(分组聚合))

    (1)原理
    将两个RDD groupbykey之后合并,在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD

    (2)源代码

    def cogroup[W](
        other: RDD[(K, W)],
        numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
        cogroup(other, new HashPartitioner(numPartitions))
    }
    

    (3)案列

    scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] 
                 = ParallelCollectionRDD[80] at parallelize at <console>:27
    
    scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] 
                 = ParallelCollectionRDD[81] at parallelize at <console>:27
    
    scala> rdd.cogroup(rdd1).collect()
    res30: Array[(Int, (Iterable[String], Iterable[Int]))] 
                 = Array(
                 (2,(CompactBuffer(b),CompactBuffer(5))), 
                 (1,(CompactBuffer(a),CompactBuffer(4))), 
                 (3,(CompactBuffer(c),CompactBuffer(6))))
    
    scala> val rdd2 = sc.parallelize(Array((4,4),(2,5),(3,6)))
    rdd2: org.apache.spark.rdd.RDD[(Int, Int)] 
                 = ParallelCollectionRDD[84] at parallelize at <console>:27
    
    scala> rdd.cogroup(rdd2).collect()
    res31: Array[(Int, (Iterable[String], Iterable[Int]))] 
                 = Array(
                 (4,(CompactBuffer(),CompactBuffer(4))),
                 (2,(CompactBuffer(b),CompactBuffer(5))),
                 (1,(CompactBuffer(a),CompactBuffer())), 
                 (3,(CompactBuffer(c),CompactBuffer(6))))
    
    scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
    rdd3: org.apache.spark.rdd.RDD[(Int, String)] 
                 = ParallelCollectionRDD[87] at parallelize at <console>:27
    
    scala> rdd3.cogroup(rdd2).collect()
    res32: Array[(Int, (Iterable[String], Iterable[Int]))] 
                 = Array(
                 (4,(CompactBuffer(),CompactBuffer(4))), 
                 (2,(CompactBuffer(b),CompactBuffer(5))), 
                 (1,(CompactBuffer(a, d),CompactBuffer())), 
                 (3,(CompactBuffer(c),CompactBuffer(6))))
    

    20、cartesian (笛卡尔积)

    (1)原理
    笛卡尔积

    (2)源代码

    def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
      new CartesianRDD(sc, this, other)
    }
    

    (3)案列

    scala> val rdd1 = sc.parallelize(1 to 3)
    rdd1: org.apache.spark.rdd.RDD[Int] 
               = ParallelCollectionRDD[90] at parallelize at <console>:27
    
    scala> val rdd2 = sc.parallelize(2 to 5)
    rdd2: org.apache.spark.rdd.RDD[Int] 
               = ParallelCollectionRDD[91] at parallelize at <console>:27
    
    scala> rdd1.cartesian(rdd2).collect()
    res33: Array[(Int, Int)] = Array(
               (1,2), (1,3), (1,4), (1,5), (2,2), (2,3), 
               (3,2), (3,3), (2,4), (2,5), (3,4), (3,5))
    

    21、pipe ( RDD每个分区都执行shell脚本)

    (1)原理
    对于每个分区,都执行一个perl或者shell脚本,脚本需要每个节点都能够访问返回输出的RDD

    (2)源代码

    def pipe(command: String): RDD[String] = withScope {
      // Similar to Runtime.exec(), if we are given a single string, split it into words
      // using a standard StringTokenizer (i.e. by spaces)
      pipe(PipedRDD.tokenize(command))
    }
    

    (3)脚本

    [root@hadoop102 ~]# cd /opt/module/spark
    [root@hadoop102 spark]# touch pipe.sh
    [root@hadoop102 spark]# chmod 755 pipe.sh
    [root@hadoop102 spark]# vim pipe.sh
    #!/bin/sh
    echo "AA"
    while read LINE; do
       echo ">>>"${LINE}
    done
    

    (4)案列

    scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1)
    rdd: org.apache.spark.rdd.RDD[String] 
           = ParallelCollectionRDD[94] at parallelize at <console>:27
    
    scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
    res35: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)
    
    scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2)
    rdd: org.apache.spark.rdd.RDD[String] 
              = ParallelCollectionRDD[96] at parallelize at <console>:27
    
    scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
    res37: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)
    

    22、coalesce (缩减分区数)

    (1)原理
    缩减分区数,用于大数据集过滤后,提高小数据集的执行效率

    (2)源代码

    def coalesce(numPartitions: Int, shuffle: Boolean = false,
                 partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
                (implicit ord: Ordering[T] = null)
        : RDD[T] = withScope {
      require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
      if (shuffle) {
        /** Distributes elements evenly across output partitions, starting from a random partition. */
        val distributePartition = (index: Int, items: Iterator[T]) => {
          var position = (new Random(index)).nextInt(numPartitions)
          items.map { t =>
            // Note that the hash code of the key will just be the key itself. The HashPartitioner
            // will mod it with the number of total partitions.
            position = position + 1
            (position, t)
          }
        } : Iterator[(Int, T)]
    
        // include a shuffle step so that our upstream tasks are still distributed
        new CoalescedRDD(
          new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
          new HashPartitioner(numPartitions)),
          numPartitions,
          partitionCoalescer).values
      } else {
        new CoalescedRDD(this, numPartitions, partitionCoalescer)
      }
    }
    

    (3)案列

    scala> val rdd = sc.parallelize(1 to 16,4)
    rdd: org.apache.spark.rdd.RDD[Int] 
              = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> rdd.partitions.size
    res1: Int = 4
    
    scala> val csrdd = rdd.coalesce(3)
    csrdd: org.apache.spark.rdd.RDD[Int] 
                  = CoalescedRDD[1] at coalesce at <console>:26
    
    scala> rdd.partitions.size
    res2: Int = 4
    
    scala> csrdd.partitions.size
    res3: Int = 3
    

    23、repartition (重新分区,所有数据重新洗牌)

    (1)原理
    重新分区,根据分区数,重新通过网络随机洗牌所有数据。

    (2)源代码

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      coalesce(numPartitions, shuffle = true)
    }
    

    (3)案列

    scala> val rdd = sc.parallelize(1 to 16,4)
    rdd: org.apache.spark.rdd.RDD[Int] 
                 = ParallelCollectionRDD[2] at parallelize at <console>:24
    
    scala> rdd.partitions.size
    res4: Int = 4
    
    scala> val rerdd = rdd.repartition(2)
    rerdd: org.apache.spark.rdd.RDD[Int] 
                 = MapPartitionsRDD[6] at repartition at <console>:26
    
    scala> rerdd.partitions.size
    res5: Int = 2
    
    scala> val rerdd = rdd.repartition(4)
    rerdd: org.apache.spark.rdd.RDD[Int] 
                 = MapPartitionsRDD[10] at repartition at <console>:26
    
    scala> rerdd.partitions.size
    res6: Int = 4
    
    scala> rerdd.collect
    res7: Array[Int] = Array(2, 6, 10, 14, 3, 7, 11, 15, 4, 8, 12, 16, 1, 5, 9, 13)
    

    24、repartitionAndSortWithinPartitions (重新分区+ 排序)

    (1)原理
    重新分区和排序,从源码中可以看出,该方法依据partitioner对RDD进行分区,并且在每个结果分区中按key进行排序;通过对比sortByKey发现,这种方式比先分区,然后在每个分区中进行排序效率高,这是因为它可以将排序融入到shuffle,repartitionAndSortWithinPartitions函数是repartition函数的变种,与repartition函数不同的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序,性能比repartition要高。

    (2)源代码

    def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] 
                                                                       = self.withScope {
      new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
    }
    

    (3)案列

    public class Test {
        public static void main(String[] args) {
            SparkConf wordcount = new SparkConf().setAppName("wordcount").setMaster("local[*]");
            JavaSparkContext javaSparkContext = new JavaSparkContext(wordcount);
            List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
            JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
            final Random random = new Random();
            JavaPairRDD<Integer,Integer> javaPairRDD 
                  = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
                @Override
                public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
                    return new Tuple2<Integer, Integer>(integer,random.nextInt(10));
                }
            });
            JavaPairRDD<Integer,Integer> RepartitionAndSortWithPartitionsRDD 
                       = javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
                @Override
                public int numPartitions() {   return 2; }
                @Override
                public int getPartition(Object key) { 
                         return key.toString().hashCode() % numPartitions();
                }
            });
            System.out.println(RepartitionAndSortWithPartitionsRDD.collect());
        }
    }
    

    25、glom (RDD 每个分区变数组)

    (1)原理
    将RDD的每一个分区的数据组成组数,返回新的RDD值,每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]

    (2)源代码

    def glom(): RDD[Array[T]] = withScope {
      new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
    }
    

    (3)案列

    scala> val rdd = sc.parallelize(1 to 16,4)
    rdd: org.apache.spark.rdd.RDD[Int] 
                   = ParallelCollectionRDD[1] at parallelize at <console>:24
    
    scala> rdd.glom().collect()
    res0: Array[Array[Int]] = Array(
    Array(1, 2, 3, 4), Array(5, 6, 7, 8), 
    Array(9, 10, 11, 12), Array(13, 14, 15, 16))
    

    26、mapValues (针对kv结构,只操作V)

    (1)原理
    操作KV结构RDD的value,或者说针对于(K,V)形式的类型,只对V进行操作

    (2)源代码

    def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
      val cleanF = self.context.clean(f)
      new MapPartitionsRDD[(K, U), (K, V)](self,
        (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
        preservesPartitioning = true)
    }
    

    (3)案列

    scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
    rdd3: org.apache.spark.rdd.RDD[(Int, String)] 
               = ParallelCollectionRDD[3] at parallelize at <console>:24
    
    scala> rdd3.mapValues(_+"|||").collect()
    res1: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))
    

    27、subtract (基于rdd1计算RDD差集的函数)

    (1)原理
    输出前面RDD的差集,计算差的一种函数去除两个RDD中相同的元素,不同的RDD将保留下来

    (2)源代码

    def subtract(other: RDD[T]): RDD[T] = withScope {
      subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
    }
    

    (3)案列

    scala> val rdd = sc.parallelize(3 to 8)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
    
    scala> val rdd1 = sc.parallelize(1 to 5)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
    
    scala> rdd.subtract(rdd1).collect()
    res2: Array[Int] = Array(6, 8, 7)
    
    scala> rdd1.subtract(rdd).collect()
    res3: Array[Int] = Array(2, 1)
    

    相关文章

      网友评论

        本文标题:黑猴子的家:Spark RDD 转换算子 Transformat

        本文链接:https://www.haomeiwen.com/subject/vemlxctx.html