[Spark] Spark RDD & Operator Summary

Author: 帅可儿妞 | Published 2019-09-19 00:44

    I. RDD Overview

    1. What is an RDD
      • RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark. It represents an immutable, partitioned collection whose elements can be computed in parallel. RDDs have the characteristics of a dataflow model: automatic fault tolerance, locality-aware scheduling, and scalability. RDDs let users explicitly cache a working set in memory across multiple queries, so later queries can reuse it, which greatly speeds them up.
    2. RDD Properties


      1. A list of partitions, the basic units that make up the dataset. Each partition is processed by one task, and the number of partitions determines the degree of parallelism. The user can specify the number of partitions when creating the RDD; if not specified, a default is used, namely the number of CPU cores allocated to the program.
      2. A function for computing each partition. Spark computes RDDs partition by partition, and every RDD implements a compute function for this purpose. compute composes the iterators of the transformations, so intermediate results do not need to be materialized.
      3. The dependencies on other RDDs. Every transformation produces a new RDD, so RDDs form pipeline-like dependencies. When some partitions lose their data, Spark can recompute just those partitions from the dependencies instead of recomputing every partition of the RDD.
      4. A Partitioner, i.e. the RDD's partitioning function. Spark currently ships two partitioners: the hash-based HashPartitioner and the range-based RangePartitioner. Only key-value RDDs have a Partitioner; for non key-value RDDs the partitioner is None. The Partitioner determines both the number of partitions of the RDD itself and the number of partitions of the parent RDD's shuffle output.
      5. A list of preferred locations for each Partition. For an HDFS file, this list holds the locations of the blocks backing each partition. Following the principle that "moving computation is cheaper than moving data", Spark tries to schedule each task onto a node that stores the data it processes. (A short inspection sketch follows this list.)
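      Most of these properties can be inspected directly in spark-shell; a minimal sketch (the compute function of property 2 is internal and is not called by user code):
        val rdd = sc.parallelize(1 to 100, 4).map(x => (x % 10, x)).reduceByKey(_ + _)
        rdd.partitions.length                        // number of partitions (property 1)
        rdd.dependencies                             // lineage back to the parent RDD (property 3)
        rdd.partitioner                              // Some(HashPartitioner) for this key-value RDD (property 4)
        rdd.preferredLocations(rdd.partitions(0))    // preferred locations of one partition (property 5)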
    3. Creating an RDD
      1. From an existing Scala collection.
        val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
      2. From a dataset in an external storage system: the local file system or any data source supported by Hadoop, such as HDFS, Cassandra, or HBase.
        val rdd2 = sc.textFile("hdfs://hadoop141:8020/words.txt")
        
      3. Check an RDD's partition count with rdd1.partitions.length; the default is the number of CPU cores allocated to the program. The count can also be specified at creation time: val rdd1 = sc.parallelize(Array(1,2,3,4), 3)

    II. RDD Programming API: Two Kinds of Operators

    1. Transformation
      All transformations on an RDD are lazily evaluated: they do not compute their results immediately. Instead, they merely record the transformations applied to the base dataset (for example, a file). The transformations actually run only when an action requires a result to be returned to the driver. This design lets Spark run more efficiently; the sketch below illustrates the point.
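      A minimal sketch of lazy evaluation, reusing the HDFS file from above (no job is launched until the action at the end):
        val lines   = sc.textFile("hdfs://hadoop141:8020/words.txt")   // transformation: nothing runs yet
        val lengths = lines.map(_.length)                              // transformation: still nothing runs
        val total   = lengths.reduce(_ + _)                            // reduce is an action: the job runs here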
    2. Common Transformation operations:
      • map(func): returns a new RDD formed by passing every input element through the function func.
      • filter(func): returns a new RDD consisting of the input elements for which func returns true.
      • sortBy(func, [ascending], [numTasks]): returns a new RDD sorted by the result of applying func to each element. (ascending defaults to true, i.e. ascending order; pass false to sort in descending order.)
        val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
        val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)
        val rdd3 = rdd2.filter(_>10)
        val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+"",true)
        val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true)
        
      • flatMap(func): similar to map, but each input element can be mapped to zero or more output elements (so func should return a sequence rather than a single element). Equivalent to a map followed by a flatten.
        val rdd4 = sc.parallelize(Array("a b c", "d e f", "h i j"))
        rdd4.flatMap(_.split(' ')).collect
        ------------------------------------------------------------------
        val rdd5 = sc.parallelize(List(List("a b c", "a b b"),List("e f g", "a f g"), List("h i j", "a a b")))
        rdd5.flatMap(_.flatMap(_.split(" "))).collect
        
      • union: union of two RDDs; note that the element types must match
      • intersection: intersection of two RDDs
      • distinct: removes duplicates
        val rdd6 = sc.parallelize(List(5,6,4,7))
        val rdd7 = sc.parallelize(List(1,2,3,4))
        val rdd8 = rdd6.union(rdd7)
        rdd8.distinct.sortBy(x=>x).collect
        --------------------------------------------
        val rdd9 = rdd6.intersection(rdd7)
        
      • join, leftOuterJoin, rightOuterJoin
        val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
        val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7)))
        --------------------------------------------------------------------------
        val rdd3 = rdd1.join(rdd2).collect
        rdd3: Array[(String, (Int, Int))] = Array((tom,(1,8)), (jerry,(2,9)))
        ---------------------------------------------------------------------------
        val rdd3 = rdd1.leftOuterJoin(rdd2).collect
        rdd3: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (jerry,(2,Some(9))), (kitty,(3,None)))
        ---------------------------------------------------------------------------
        val rdd3 = rdd1.rightOuterJoin(rdd2).collect
        rdd3: Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),8)), (jerry,(Some(2),9)), (shuke,(None,7)))
        
      • groupByKey([numTasks]): called on an RDD of (K, V) pairs, returns an RDD of (K, Iterable[V]); it applies only to RDDs whose elements are key-value 2-tuples
        val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
        val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7)))
        val rdd3 = rdd1 union rdd2
        val rdd4 = rdd3.groupByKey.collect
        rdd4: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(8, 1)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(9, 2)))
        -----------------------------------------------------------------------------------
        val rdd5 = rdd4.map(x=>(x._1,x._2.sum))
        rdd5: Array[(String, Int)] = Array((tom,9), (shuke,7), (kitty,3), (jerry,11))
        
      • groupBy: takes a one-argument function whose return value becomes the key, and returns a new RDD[(K, Iterable[T])] in which the value is an iterable of all input elements that mapped to the same key. Source:
        /**
          * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
          * mapping to that key. The ordering of elements within each group is not guaranteed, and
          * may even differ each time the resulting RDD is evaluated.
          *
          * @note This operation may be very expensive. If you are grouping in order to perform an
          * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
          * or `PairRDDFunctions.reduceByKey` will provide much better performance.
          */
        def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
          groupBy[K](f, defaultPartitioner(this))
        }
        
        Example:
        scala> val rdd1=sc.parallelize(List(("a",1,2),("b",1,1),("a",4,5)))
        rdd1: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[47] at parallelize at <console>:24
        scala> rdd1.groupBy(_._1).collect
        res18: Array[(String, Iterable[(String, Int, Int)])] = Array((a,CompactBuffer((a,1,2), (a,4,5))), (b,CompactBuffer((b,1,1))))
        
      • reduceByKey(func, [numTasks]): called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs in which the values for each key are aggregated using the given reduce function. As with groupByKey, the number of reduce tasks can be set with the optional second parameter.
        val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
        val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7)))
        val rdd3 = rdd1 union rdd2
        val rdd6 = rdd3.reduceByKey(_+_).collect
        rdd6: Array[(String, Int)] = Array((tom,9), (shuke,7), (kitty,3), (jerry,11))
        
      • cogroup(otherDataset, [numTasks]): called on RDDs of types (K, V) and (K, W), returns an RDD of type (K, (Iterable<V>, Iterable<W>))
        val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
        val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
        val rdd3 = rdd1.cogroup(rdd2).collect
        rdd3: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((tom,(CompactBuffer(2, 1),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
        ----------------------------------------------------------------------------------------
        val rdd4 = rdd3.map(x=>(x._1,x._2._1.sum+x._2._2.sum))
        rdd4: Array[(String, Int)] = Array((tom,4), (jerry,5), (shuke,2), (kitty,2))
        
      • cartesian(otherDataset): Cartesian product
        val rdd1 = sc.parallelize(List("tom", "jerry"))
        val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
        val rdd3 = rdd1.cartesian(rdd2).collect
        rdd3: Array[(String, String)] = Array((tom,tom), (tom,kitty), (tom,shuke), (jerry,tom), (jerry,kitty), (jerry,shuke))
        
    3. Action


      (figure: common Action operators)
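      The original figure is not reproduced here; as a minimal sketch, a few common actions in spark-shell (the HDFS output path is hypothetical):
        val rdd = sc.parallelize(List(1, 2, 3, 4, 5), 2)
        rdd.collect            // Array(1, 2, 3, 4, 5) -- brings all elements back to the driver
        rdd.count              // 5
        rdd.reduce(_ + _)      // 15
        rdd.first              // 1
        rdd.take(3)            // Array(1, 2, 3)
        rdd.saveAsTextFile("hdfs://hadoop141:8020/out")   // writes one output file per partition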

    III. RDD Programming: Advanced API

    1. mapPartitions: operates on each partition as a whole. As the source below shows, the function it takes receives an Iterator and must return an Iterator; a usage sketch follows the source.
      /**
        * Return a new RDD by applying a function to each partition of this RDD.
        *
        * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
        * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
        */
      def mapPartitions[U: ClassTag](
          f: Iterator[T] => Iterator[U],
          preservesPartitioning: Boolean = false): RDD[U] = withScope {
        val cleanedF = sc.clean(f)
        new MapPartitionsRDD(
          this,
          (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
          preservesPartitioning)
      }
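      
      A minimal usage sketch of mapPartitions (the identifiers are illustrative). Because the function sees each partition's iterator exactly once, per-partition setup such as opening a database connection can be done once per partition rather than once per element:
        val rdd1 = sc.parallelize(1 to 9, 2)
        val sums = rdd1.mapPartitions(iter => Iterator(iter.sum))
        sums.collect   // Array(10, 35): partition 0 holds 1..4, partition 1 holds 5..9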
      
      mapPartitionsWithIndex: operates on each partition, exposing the partition index together with the values in that partition. It is a Transformation.
      Source:
      /**
      * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
      * of the original partition.
      *
      * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
      * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
      * (Author's note: preservesPartitioning indicates whether the returned RDD keeps the partitioner. It may be set to true only for a key-value RDD whose keys are not modified by the function. Non key-value RDDs generally have no partitioner, and once keys are modified the elements no longer satisfy the partitioner's placement, so in those cases it must stay false, meaning the returned RDD has not been partitioned by a partitioner.)
      */
      def mapPartitionsWithIndex[U: ClassTag](          // takes a function
          f: (Int, Iterator[T]) => Iterator[U],         // the function receives two parameters: the partition index and that partition's iterator
          preservesPartitioning: Boolean = false): RDD[U] = withScope {
        val cleanedF = sc.clean(f)
        new MapPartitionsRDD(
          this,
          (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
          preservesPartitioning)
      }
      
      Example:
      1. First define a function that matches the signature mapPartitionsWithIndex expects
      scala> val func = (index : Int,iter : Iterator[Int]) => {
          | iter.toList.map(x=>"[PartID:" + index + ",val:" + x + "]").iterator
          | }
      func: (Int, Iterator[Int]) => Iterator[String] = <function2>
      2. Create an RDD with 2 partitions
      scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
      rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
      3. Call the method, passing in the custom function
      scala> rdd1.mapPartitionsWithIndex(func).collect
      res0: Array[String] = Array([PartID:0,val:1], [PartID:0,val:2], [PartID:0,val:3], [PartID:0,val:4], [PartID:1,val:5], [PartID:1,val:6], [PartID:1,val:7], [PartID:1,val:8], [PartID:1,val:9])
      
    2. aggregate: an aggregation operation; it is an Action
      • Source
        /**
        * Aggregate the elements of each partition, and then the results for all the partitions, using
        * given combine functions and a neutral "zero value". This function can return a different result
        * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
        * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
        * allowed to modify and return their first argument instead of creating a new U to avoid memory
        * allocation.
        * (Author's note: aggregate collapses the RDD's elements and requires a zero value, since accumulation needs a starting point. Elements are first aggregated within each partition with seqOp, folding elements of type T into a per-partition result of type U; the per-partition results are then combined with combOp, and that final result is returned.)
        *
        * @param zeroValue the initial value for the accumulated result of each partition for the
        *                  `seqOp` operator, and also the initial value for the combine results from
        *                  different partitions for the `combOp` operator - this will typically be the
        *                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
        * @param seqOp an operator used to accumulate results within a partition
        * @param combOp an associative operator used to combine results from different partitions
        * (Author's note: the first argument is the initial value; the second argument list takes two functions, each with two parameters. The first function merges elements within each partition; the second merges the per-partition results. Each returns a single value.)
        */
        def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
          // Clone the zero value since we will also be serializing it as part of tasks
          var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
          val cleanSeqOp = sc.clean(seqOp)
          val cleanCombOp = sc.clean(combOp)
          val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
          val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
          sc.runJob(this, aggregatePartition, mergeResult)
          jobResult
        }
        
      • Examples:
        scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
        rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
        // first each of the two partitions is summed separately, then the two partition sums are added to give the final result
        scala> rdd1.aggregate(0)(_+_,_+_)
        res0: Int = 45                                                                 
        // first find the maximum within each partition, then add the per-partition maxima to get the final result
        scala> rdd1.aggregate(0)(math.max(_,_),_+_)
        res1: Int = 13
        // note that the initial value participates in every step. Partition 1 holds 1,2,3,4; with initial value 5 its maximum is 5. Partition 2 holds 5,6,7,8,9; with initial value 5 its maximum is 9. When combining, the initial value participates again: 5 + (5 + 9) = 19
        scala> rdd1.aggregate(5)(math.max(_,_),_+_)
        res0: Int = 19
        -----------------------------------------------------------------------------------------------
        scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
        rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
        // note: the partitions are computed in parallel, so their results can arrive in either order, giving two possible outputs:
        scala> rdd2.aggregate("")(_+_,_+_)
        res0: String = defabc                                                                                                                    
        scala> rdd2.aggregate("")(_+_,_+_)
        res2: String = abcdef
        // this example shows even more clearly that the initial value participates in the computation: the "=" appears three times in the result
        scala> rdd2.aggregate("=")(_+_,_+_)
        res0: String = ==def=abc
        --------------------------------------------------------------------------------------
        scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
        rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24
        scala> rdd3.aggregate("")((x,y)=>math.max(x.length,y.length).toString,_+_)
        res1: String = 42                                                               
        scala> rdd3.aggregate("")((x,y)=>math.max(x.length,y.length).toString,_+_)
        res3: String = 24
        -------------------------------------------------------------------------------------------
        scala> val rdd4 = sc.parallelize(List("12","23","345",""),2)
        rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
        // note: in the first partition the elements together with the initial value are "", "12", "23"; folding them pairwise ends with a minimum length of 1. In the second partition they are "", "345", ""; folding ends with a minimum length of 0. The two partition results are then concatenated in either order
        scala> rdd4.aggregate("")((x,y)=>math.min(x.length,y.length).toString,_+_)
        res4: String = 10                                                               
        scala> rdd4.aggregate("")((x,y)=>math.min(x.length,y.length).toString,_+_)
        res9: String = 01                                                               
        ------------------------------------------------------------------------------------
        // note the difference from the example above: the elements are ordered differently in this RDD, so both partitions end with "1" and the result differs
        scala> val rdd5 = sc.parallelize(List("12","23","","345"),2)
        rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
        scala> rdd5.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y)
        res1: String = 11 
        
    3. aggregateByKey: aggregates values by key
      // define the RDD
      scala> val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
      pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
      // a custom function to pass into mapPartitionsWithIndex
      scala> val func=(index:Int,iter:Iterator[(String, Int)])=>{
          | iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
          | }
      func: (Int, Iterator[(String, Int)]) => Iterator[String] = <function2>
      // inspect the partition layout
      scala> pairRDD.mapPartitionsWithIndex(func).collect
      res2: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])
      // note the difference between an initial value of 0 and other initial values
      scala> pairRDD.aggregateByKey(0)(_+_,_+_).collect
      res4: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))               
      
      scala> pairRDD.aggregateByKey(10)(_+_,_+_).collect
      res5: Array[(String, Int)] = Array((dog,22), (cat,39), (mouse,26))
      // the difference among the next three calls: the first is the easiest to follow; with an initial value of 0, each partition emits the largest count seen for each animal, and those maxima are then summed across partitions
      scala> pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
      res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
      
      // the next two have a non-zero initial value that must be taken into account. The first partition holds ("cat",2), ("cat",5), ("mouse",4); with initial value 10, each key's values are compared against it, so the partition yields ("cat",10), ("mouse",10). The second partition likewise yields (dog,12), (cat,12), (mouse,10). Summing across partitions gives (dog,12), (cat,22), (mouse,20). Note that the initial value does not participate again when the per-partition results are combined
      scala> pairRDD.aggregateByKey(10)(math.max(_,_),_+_).collect
      res7: Array[(String, Int)] = Array((dog,12), (cat,22), (mouse,20))
      // this call is analogous to the one above
      scala> pairRDD.aggregateByKey(100)(math.max(_,_),_+_).collect
      res8: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
      
    4. coalesce: returns a new RDD
      Redistributes the RDD's elements across partitions.
      When the partition count is reduced moderately, e.g. 1000 -> 100, Spark treats every 10 old partitions as one new partition; the parallelism becomes 100 and no shuffle is triggered.
      When the partition count is reduced drastically, e.g. 1000 -> 1, the computation would run with parallelism 1. To avoid that loss of parallelism, set shuffle to true; the work before the shuffle and after it then run in separate stages, with parallelism 1000 and 1 respectively. (See the sketch below.)
      Increasing the partition count always requires a shuffle, so shuffle must be set to true.
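      A minimal sketch of the shuffle flag (the identifiers are illustrative):
        val big   = sc.parallelize(1 to 1000, 100)
        val fewer = big.coalesce(10)          // no shuffle: every 10 old partitions collapse into 1
        val one   = big.coalesce(1, true)     // shuffle = true: the upstream work keeps its 100-way parallelism
        fewer.partitions.length               // 10
        one.partitions.length                 // 1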
      partitionBy: partitions the RDD using the Partitioner instance passed in, either a custom partitioner or the default HashPartitioner; a usage sketch follows the source. Source:
      /**
      * Return a copy of the RDD partitioned using the specified partitioner.
      */
      def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
      if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("HashPartitioner cannot partition array keys.")
      }
      if (self.partitioner == Some(partitioner)) {
          self
      } else {
          new ShuffledRDD[K, V, V](self, partitioner)
      }
      }
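      
      A minimal usage sketch of partitionBy (the identifiers are illustrative):
        import org.apache.spark.HashPartitioner
        val pairs         = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 2)
        val repartitioned = pairs.partitionBy(new HashPartitioner(4))
        repartitioned.partitions.length       // 4
        repartitioned.partitioner             // now defined: the HashPartitioner passed in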
      
      repartition: returns a new RDD
      Repartitions the RDD into the specified number of partitions; this always involves a shuffle.
      When the target partition count is smaller than the current one, consider using coalesce instead, which can avoid the shuffle.
      scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
      rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
      
      scala> val rdd2 = rdd1.repartition(6)
      rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at repartition at <console>:26
      
      scala> rdd2.partitions.length
      res0: Int = 6
      
      scala> val rdd3 = rdd2.coalesce(2,true)
      rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at coalesce at <console>:28
      
      scala> rdd3.partitions.length
      res1: Int = 2
      
    5. collectAsMap: converts the RDD into a Map (the RDD's elements must be key-value 2-tuples)
      scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2),("c", 2),("d", 4),("e", 1)))
      rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24
      scala> rdd1.collectAsMap
      res3: scala.collection.Map[String,Int] = Map(e -> 1, b -> 2, d -> 4, a -> 1, c -> 2)
      
    6. combineByKey: has the same effect as reduceByKey; in fact reduceByKey is implemented on top of the combineByKey family
      • Source
        /**
        * Generic function to combine the elements for each key using a custom set of aggregation
        * functions. This method is here for backward compatibility. It does not provide combiner
        * classtag information to the shuffle.
        *
        * @see [[combineByKeyWithClassTag]]
        */
        def combineByKey[C](
            createCombiner: V => C,
            mergeValue: (C, V) => C,
            mergeCombiners: (C, C) => C,
            partitioner: Partitioner,
            mapSideCombine: Boolean = true,
            serializer: Serializer = null): RDD[(K, C)] = self.withScope {
        combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
            partitioner, mapSideCombine, serializer)(null)
        }
        
        /**
        * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
        * This method is here for backward compatibility. It does not provide combiner
        * classtag information to the shuffle.
        *
        * @see [[combineByKeyWithClassTag]]
        */
        def combineByKey[C](
            createCombiner: V => C,
            mergeValue: (C, V) => C,
            mergeCombiners: (C, C) => C,
            numPartitions: Int): RDD[(K, C)] = self.withScope {
              combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
        }
        
      • Parameters:
        1. createCombiner: V => C: creates the combiner; for each key, the first value encountered is turned into the target type C
        2. mergeValue: (C, V) => C: a function for the local (per-partition) computation, merging each further value into the combiner
        3. mergeCombiners: (C, C) => C: a function that merges the per-partition results
      • Example
        // first create two RDDs, then zip them into a single RDD, rdd6
        scala> val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
        rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:24
        
        scala> val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
        rdd5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24
        
        scala> val rdd6 = rdd5.zip(rdd4)
        rdd6: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[23] at zip at <console>:28
        
        scala> rdd6.collect
        res6: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
        
        // we want to group by key, putting all values with the same key into a List
        // first argument List(_): puts the first value for a key into a List
        // second argument (x:List[String],y:String)=>x :+ y: the local computation, appending each value to the List
        // third argument (m:List[String],n:List[String])=>m++n: merges the per-partition Lists
        
        scala> val rdd7 = rdd6.combineByKey(List(_),(x:List[String],y:String)=>x :+ y,(m:List[String],n:List[String])=>m++n)
        rdd7: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[24] at combineByKey at <console>:30
        
        scala> rdd7.collect
        res7: Array[(Int, List[String])] = Array((1,List(dog, cat, turkey)), (2,List(wolf, bear, bee, salmon, rabbit, gnu)))
        
        // the first argument can also be written in other ways, as in the two calls below
        scala> val rdd7 = rdd6.combineByKey(_::List(),(x:List[String],y:String)=>x :+ y,(m:List[String],n:List[String])=>m++n).collect
        rdd7: Array[(Int, List[String])] = Array((1,List(turkey, dog, cat)), (2,List(wolf, bear, bee, gnu, salmon, rabbit)))
        
        scala> val rdd7 = rdd6.combineByKey(_::Nil,(x:List[String],y:String)=>x :+ y,(m:List[String],n:List[String])=>m++n).collect
        rdd7: Array[(Int, List[String])] = Array((1,List(turkey, dog, cat)), (2,List(wolf, bear, bee, gnu, salmon, rabbit)))
        
    7. countByKey, countByValue: count occurrences by key or by value
    scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[27] at parallelize at <console>:24
    scala> rdd1.countByKey
    res8: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)          
    scala> rdd1.countByValue
    res9: scala.collection.Map[(String, Int),Long] = Map((c,2) -> 1, (a,1) -> 1, (b,2) -> 2, (c,1) -> 1)
    
    8. filterByRange
    scala> val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1),("b",6)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
    // note: the bounds passed in form an interval that is closed on both ends
    scala> val rdd2 = rdd1.filterByRange("b","d")
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[34] at filterByRange at <console>:26
    scala> rdd2.collect
    res10: Array[(String, Int)] = Array((c,3), (d,4), (c,2), (b,6))
    
    9. flatMapValues: processes the values, similar to flatMap; each value it produces is paired with the original key
    scala> val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
    rdd3: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[35] at parallelize at <console>:24
    scala> val rdd4 = rdd3.flatMapValues(_.split(" "))
    rdd4: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[36] at flatMapValues at <console>:26
    scala> rdd4.collect
    res11: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))
    

    10. mapValues: leaves the key unchanged and applies the function to the value only, similar to map. Note the difference from flatMapValues above: it does not split the key-value pair apart; it simply transforms each value with the given function.

    scala> val rdd3 = sc.parallelize(List(("a",(1,2)),("b",(2,4))))
    rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ParallelCollectionRDD[57] at parallelize at <console>:24
    scala> rdd3.mapValues(x=>x._1 + x._2).collect
    res34: Array[(String, Int)] = Array((a,3), (b,6))
    ------------------------------------------------------------------------
    With flatMapValues, by contrast, the value is flattened apart entirely (here into the characters of its string form) and each piece is paired with the key:
    scala> rdd3.flatMapValues(x=>x + "").collect
    res36: Array[(String, Char)] = Array((a,(), (a,1), (a,,), (a,2), (a,)), (b,(), (b,2), (b,,), (b,4), (b,)))
    
    11. foldByKey: groups by key and combines the values within each group
    scala> val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[37] at parallelize at <console>:24
    scala> val rdd2 = rdd1.map(x=>(x.length,x))
    rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[38] at map at <console>:26
    scala> rdd2.collect
    res12: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))
    -----------------------------------------------------------------------------
    scala> val rdd3 = rdd2.foldByKey("")(_+_)
    rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[39] at foldByKey at <console>:28
    scala> rdd3.collect
    res13: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))
    scala> val rdd3 = rdd2.foldByKey(" ")(_+_).collect
    rdd3: Array[(Int, String)] = Array((4," bear wolf"), (3," dog cat"))
    -----------------------------------------------------------------------------
    // word count with foldByKey
    val rdd = sc.textFile("hdfs://node-1.itcast.cn:9000/wc").flatMap(_.split(" ")).map((_, 1))
    rdd.foldByKey(0)(_+_)
    
    12. keyBy: uses the result of the given function as the key, producing a new RDD
    scala> val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[41] at parallelize at <console>:24
    scala> val rdd2 = rdd1.keyBy(_.length)
    rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[42] at keyBy at <console>:26
    scala> rdd2.collect
    res14: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
    
    13. keys, values: extract the keys or the values of the RDD into a new RDD
    scala> val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[43] at parallelize at <console>:24
    scala> rdd1.keys.collect
    res16: Array[String] = Array(e, c, d, c, a)
    scala> rdd1.values.collect
    res17: Array[Int] = Array(5, 3, 4, 2, 1)
    
