美文网首页
spark之transform和action

spark之transform和action

作者: aaron1993 | 来源:发表于2017-08-06 12:53 被阅读0次

    1. tranformation

    1. map
      map实现如下:
     def map[U: ClassTag](f: T => U): RDD[U] = withScope {
       val cleanF = sc.clean(f)
       new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
     }
    

    map接收一个函数f为参数,该函数接收参数类型T,然后返回类型U。当前RDD数据类型T,map使用函数f将RDD中的每一条记录转换为类型为U的数据。 比如:

    // 创建一个新的RDD oddNums,包含两个partition,只有奇数组成。
    val oddNums = sc.parallelize(List(1,3,5,7,9),2)
    // 使用函数 x => x + 1将 oddNums中的奇数转换成偶数。
    val evenNums = oddNums.map(x => x + 1)
    

    从map的实现的可以看出,函数cleanF是通过iter.map(cleanF)发挥作用的,这就意味着iter中有多少个值,cleanF就会调用多少次,后面还会介绍mapPartitions,作用和map一样,但是实现有所区别,将会在mapPartitions中提到。

    1. flatMap
      flatMap的原型如下:
    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
    }
    

    flatMap将每一个元素转换成一个集合类型,然后又将这些集合的元素拿出来展开拼在一起作为下一个RDD的数据。

    flatMap接收的参数f同样也是一个函数,这个函数接收T类型(当前RDD的数据类型),然后返回一个集合,集合的元素类型为U(TraversableOnce一般都是集合实现的特质)。

    flatMap调用new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)), iter迭代源RDD,迭代的元素类型T,it er.flatMap使用函数f将T转换成U的集合,然后返回U的集合上的迭代器。比如T1被转换成集合[U1,U2,U3], U上的迭代器迭代返回U1,U2,U3三个元素,而不是[U1,U2,U3]这个集合,也就是说集合被展开了。

    看一个例子:

    //RDD someNums包含数据1,2,3,4,5。要把它转换成1,1,2,2,3,3,4,4,5,5
    val someNums = sc.parallelize(List(1,2,3,4,5))
    val doubleSomeNums = someNums.flatMap(x => List(x,x))
    doubleSomeNums.collect
    // Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 5),上面的1,2,3,4,5首先被转换成[1,1],[2,2],[3,3],[4,4],[5,5],然后在被连接成1,1,2,2,3,3,4,4
    
    1. filter
      原型如下:
    def filter(f: T => Boolean): RDD[T] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[T, T](
        this,
        (context, pid, iter) => iter.filter(cleanF),
        preservesPartitioning = true)
    }
    

    filter接收断言f,对RDD中的数据,满足f的返回,不满足的丢弃。

    1. distinct
      原型如下:
    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
       map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
     }
    

    distinct会将RDD中重复的数据只保留一份,这是一个全局去重操作,而不是仅仅对每个分区操作去重,全局去重就意味着需要将散落在各个分区里的元素聚合到一起。

    上面代码表明的它的实现原理:

    • 使用map将单个value映射成(value,null)这样的键值对;
    • reduceByKey将相同的聚集在一起,(x,y) => x是其聚集使用的函数 ,聚合函数是作用在key相同的value上的,由于所有value都是null,所以这其实是(null,null) => null的函数。
    • map(_._1)返回key,以数据[2,2]为例:
            map(x => (x, null))   reduceByKey   map(_._1)
                     |                 |             |
    输入2,2 -> (2,null),(2,null) ->  (2, null)   ->      2
    
    1. coalesce
      coalesce用来改变RDD的分区个数,重新分区。方法原型如下:
    def coalesce(numPartitions: Int, shuffle: Boolean = false,
                partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
               (implicit ord: Ordering[T] = null)
       : RDD[T] 
    

    参数shuffle=true且是在扩大分区数(即目标rdd分区数numPartitions大于当前分区)则会导致shuffle过程。

    1. union
      union用来将多个RDD做并集,合并后的数据不会进行去重。
      其方法原型:
      def union(other: RDD[T]): RDD[T] = withScope {
      sc.union(this, other)
      }
      def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
      /*获得所有参与union的rdd的分区方法partitioner,转换成set
       这就意味着如果所有的rdd使用相同的分区方法,比如都是HashPartitioner,
      而且并且各自的partitioner相等(即equals返回true,对于HashPartitioner来说,
      equals为 true的条件是分区的个数一样,RangePartitioner要复杂一点),那么返回的set即partitioners的size为1.
       */
       val partitioners = rdds.flatMap(_.partitioner).toSet
      if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
        new PartitionerAwareUnionRDD(this, rdds)
      } else {
        new UnionRDD(this, rdds)
      }
      }
    

    上面if分支中,如果参与union的rdd都定义了partitioner(rdds.forall(_.partitioner.isDefined)返回true,一般只有ShuffledRDD有partitioner)且它们的partitioner一样,这就表示参与union的rdd都产生相同个数的分区(假设个数为p),这就好办了,union生成新的RDD:PartitionerAwareUnionRDD,新的RDD的拥有p个分区,第i个分区就有上游参与union的rdd里的第i个分区组成。所以总结一下,这种情况所有父rdd都有p个分区,那生成的新的rdd也有p个分区。

    else分支中,创建UnionRDD。假设参与合并的rdd1,rdd2的分区分别是(R1P1,R1P2)和(R2P1,R2P2),一共4个分区,新的UnionRDD也将有四个分区,也就是(R1P1,R1P2,R2P1,R2P2)。

    1. sortBy、sortByKey
      对RDD中的数据进行全局排序,下面是sortBy的原型:
    def sortBy[K](
        f: (T) => K,
        ascending: Boolean = true,
        numPartitions: Int = this.partitions.length)
        (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
      this.keyBy[K](f)
          .sortByKey(ascending, numPartitions)
          .values
    }
    

    上面sortBy实际上上调用了OrderedRDDFunctions#sortByKey(注: OrderedRDDFunctions经常出现,这里使用到了scala的隐式转换RDD隐式转换成OrderedRDDFunctions)方法。

    sortByKey的是机遇reduce会对key进行排序这一原理实现的,利用每一个reducer会对自己分区内的key进行排序的原理,但是由于reducer只会保证自己分区内的数据按key排序,分区之间的有序则需要另外的机制来保证(参考hadoop terasort的排序原理)。

    这里简单说一下原理:假设有10个分区,那我门就从数据中采样9个数,这9个数就决定了10个区间,然后shuffle时,就将每一个上游rdd中的数据都落到10个里的其中一个,这样partition之间也就有序了。

    即然有shuffle这个过程,也就需要一个paritioner来决定数据流向下游那一个reducer,这里使用到的partitioner是RangePartitioner,而这里RangePartitoner的range的划分也就是上一段里那个简单原理介绍中所说。

    注:关于shuffle的过程有兴趣的话可以参考Spark shuffle 原理
    注:关于隐式转换可以参考scala 隐式转换

    1. intersection
      求两个rdd的交集,交集的结果会去重,方法原型如下:
    def intersection(other: RDD[T]): RDD[T] = withScope {
      this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
          .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
          .keys
    }
    

    使用到了cogroup(PairRDDFunction提供的方法,PairRDDFunction中的方法只能作用于数据(key,value)形式的RDD,这里同样使用了RDD到PairRDDFunction的隐式转换)。两个rdd,分别是r1、r2,做cogroup操作,依然是按照两个rdd中相同的key做group,cogroup生成一个CoGroupedRDD类型的RDD,生成新的RDD的数据中key即源r1,r2相同的key,value是一个tuple,tuple的第一个元素是r1中key对应所有value上的iterator,第二个元素是r2中该key的所有value的iterator。

    回到intersection方法,由于cogroup只能作用于数据(key,value)这种二元组形式的RDD,所以先将RDD的value map成(value, null); 接着做cogroup,做完cogroup之后,对于相交的数据,必然二元组中两个部分都不空(也就满足filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty条件)。

    下面是一个例子展示数据变换过程

    //()表示tuple,[]表示列表, E表示空
    输入: rdd1: 1, 2 ,3;rdd2: 2,3,4
    map: rdd1 -> rdd3: (1,null),(2,null),(3,null)
    map: rdd2 -> rdd4: (2,null),(3,null),(4,null)
    cogroup:rdd3,rdd4 ->  rdd5: (1, ([null], E)), (2, ([null], [null])), (3, ([null], [null])), (4,(E, [null]))
    //value中有E表示这个key只存在于一个rdd中,去掉
    filter: (2, ([null], [null])), (3, ([null], [null]))
    keys: 2,3
    

    其他的3个或者更多个rdd参与cogroup原理是一样的。
    由于cogroup是一个比较复杂的过程,可以参考附录cogroup。

    1. glom
      方法如下:
    def glom(): RDD[Array[T]] = withScope {
        new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
      }
    

    glom将分区里的所有数据合成到一个数组。
    比如:

    // rdd r1 包含1 to 5, 分成两个分区.分区1包含1,2;分区2包含3,4,5
    scala> val r1 = sc.parallelize(1 to 5,2)
    scala> r1.collect
    res20: Array[Int] = Array(1, 2, 3, 4, 5)
    
    // glom并调用collect查看结果. 依然包含两个分区,但是分区的元素被合成数组,也就是说原来分区1包含两个数据记录,现在只有一个类型为Array的数据记录了。
    scala> r1.glom.collect
    res21: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4, 5))
    

    从glom的实现来看,使用了iter.toArray将源rdd的一个分区里的数据放到一个数据里,是一个很消耗内存的方法,分区数据很多时还是要注意使用。

    1. cartesian
      对两个rdd做笛卡尔积,方法原型如下:
    def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
      new CartesianRDD(sc, this, other)
    }
    

    假设参与笛卡尔积的两个rdd分别是r1,r2拥有分区[r1p1,r1p2]和[r2p1,r2p2],r1.cartesian(r2)生成类型为CartesianRDD的新rdd,假设是r3,r3拥有分区就是r1和r2分区的笛卡尔积, 即:[(r1p1, r2p1), (r1p1, r2p2), (r1p2, r2p1), (r1p2, r2p2)], 那么在r3上任意一个分区上计算时,假设是(r1p1, r2p1)上,只需要迭代r1p1, r2p1里的数据然后做笛卡尔积就行了。

    下面是CartesianRDD的getPartitions方法:

    override def getPartitions: Array[Partition] = {
        // array保存分区,个数就是rdd1和rdd2分区个数相乘
        val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
        //CartesianRDD拥有的分区也是rdd1和rdd2分区的笛卡尔积
        for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
          val idx = s1.index * numPartitionsInRdd2 + s2.index
          array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
        }
        array
      }
    

    下面是CartesianRDD的compute方法:

    override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
        val currSplit = split.asInstanceOf[CartesianPartition]
       // CartesianRDD每一个分区都是上游rdd1和rdd2各一个分区组成,也就是下面的s1,s2. 此处两重循环的形式完成元素的笛卡尔积计算
        for (x <- rdd1.iterator(currSplit.s1, context);
             y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
      }
    
    1. groupBy、 PairRDDFunction#groupByKey
      方法如下:
    //由于源rdd可以的数据t不是(key,value)这种二元组,因此
    它需要一个f能够把源rdd里的数据类型T转换成key的类型K。最终生成的目标rdd的数据形式是(f(t), t)这种。
    def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
        //group按key聚合涉及到shuffle,使用defaultPartitioner获得默认的partitioner是HashPartitoner
        groupBy[K](f, defaultPartitioner(this))
      }
    

    groupBy把相同的key对应的value组合在一起,可以放到一个列表中,此外它不保证value的顺序,也不保证每次调用value都按相同方式排列。 下面是一个groupBy的例子:

    val r1 = sc.parallelize(List(1,2,3,4,3,2),2)
    r1.groupBy(x=>x).collect
    //groupBy的结果,key相同的value都被放到CompactBuffer里,value仅仅是被简单的拼接。因此这是一种十分耗时且消耗存储的操作。
    // grouyBy和reduceBy底层都使用PairRDDFunctions#combineByKeyWithClassTag,只不过使用的用来聚合value的aggregator不同,groupBy的aggregator就是将value加到CompactBuffer里。
    res39: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(4)), (2,CompactBuffer(2, 2)), (1,CompactBuffer(1)), (3,CompactBuffer(3, 3)))
    

    这里一路跟到PairRDDFunctions#groupByKey的实现看看:

    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
       //CompactBuffer可以暂时理解做高效的ArrayBuffer
        val createCombiner = (v: V) => CompactBuffer(v)
      // mergeValue函数把key相同的value聚合到一起,这里的实现是直接到v添加到数组末尾
        val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
        val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
        val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
          createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
        bufs.asInstanceOf[RDD[(K, Iterable[V])]]
      }
    

    从上面聚合的方式来看,就是把value都放到数组里,这在数据很多时,是一种很好内存的操作,有可能会OOM,所以要注意,能用reduceBy的就不要用groupBy。

    1. mapPartitions
      mapPartitions功能和map类似,但还是实现上还是有区别的,下面是mapPartitions的原型:
      def mapPartitions[U: ClassTag](
         f: Iterator[T] => Iterator[U],
         preservesPartitioning: Boolean = false): RDD[U] = withScope {
       val cleanedF = sc.clean(f)
       new MapPartitionsRDD(
         this,
         (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
         preservesPartitioning)
     }
    
     // 作为比较还有map的原型:
    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
       val cleanF = sc.clean(f)
       new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
     }
    

    比较一下map和mapPartitions的不同,用户自定的函数f,在map中是通过iter.map(cleanF)调用的,这意味着每一次iter上的value迭代都会调用一次f; 而 mapPartitions中f是通过cleanedF(iter)调用的,直接作用在iter上,然后返回一个新的iter,f实际上只被调用了一次。当有些资源需要在f中创建时(比如jdbc连接),使用map会导致频繁创建,可以考虑使用mapPartitions.

    1. zip
      作用和集合上的zip一样,集合上zip会将两个集合相同index上的value合成tuple,这就要求两个集合大小一样。rdd上的zip要求两个rdd拥有相同个数的partition,每个partition又拥有相同个数的数据。

    如下例子:

    // RDD r1包含两个分区
    scala> val r1 = sc.parallelize(1 to 10,2)
    scala> r1.collect
    res53: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    //RDD r2同样两个分区,且分区里数据个数和r1一样。
    scala> val r2 = sc.parallelize(11 to 20,2)
    scala> r2.collect
    res54: Array[Int] = Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    scala> r1.zip(r2).collect
    // r1和r2相同下标的数据组合成一个元组(tuple)
    res56: Array[(Int, Int)] = Array((1,11), (2,12), (3,13), (4,14), (5,15), (6,16), (7,17), (8,18), (9,19), (10,20))
    
    1. subtract
      求两个rdd的差,调用rdd1.substract(rdd2)会返回rdd1中去掉和rdd2相同数据的剩下部分, 但是不会对剩下的部分的数据去重。subtract都会最终调用下面的subtract方法:
    def subtract(
       other: RDD[T],
       p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
     //判断源rdd(即rdd1)的partitioner是否为空,不空的话往往意味着源rdd到目地RDD会产生shuffle操作生成的。
     if (partitioner == Some(p)) {
    
      //源rdd的partitoner不空,那源RDD的数据类型T一定是(key,value)形式的,这里之所以包装成新的partitioner,跟下面的map调用有关。下面的map会把源rdd中(key,value)数据作为新生成的rdd中的key,这里新的p2需要从新生成的rdd的key中(此时key类型(key,value))提取出源rdd的key。
       val p2 = new Partitioner() {
         override def numPartitions: Int = p.numPartitions
         override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
         }
        // 再回顾上面的p2对partitioner的包装,源rdd有partitioner,则源rdd的类型是范型T实际是(key,value),此处map又把它转换成((key,value), null),所以需要包装成p2去key从(key,value)里取出来
         this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
         } else {
         this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
         }
       }
    

    下图是subtract产生的rdd依赖:

    subtract产生的rdd依赖

    subtractByKey生成新的rdd为SubtractedRDD,下面是它的getDependencies方法:

     override def getDependencies: Seq[Dependency[_]] = {
        def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
          : Dependency[_] = {
          /*由于这个方法传入的参数是上图的rdd3和rdd4都是map产生的,因此rdd.partitioner是空的,所以会走向else分支,
           else分支产生了ShuffleDependency,所以无论如何都会产生shuffle。 强迫他产生一次上图中的shuffle也是可以理解的,
           因为shuffle会使得上游rdd3,rdd4中key相同的进入到下游SubtractedRDD的同一分区上,那样做subtract就简单多了。
           */
          if (rdd.partitioner == Some(part)) {
            logDebug("Adding one-to-one dependency with " + rdd)
            new OneToOneDependency(rdd)
          } else {
            logDebug("Adding shuffle dependency with " + rdd)
            new ShuffleDependency[T1, T2, Any](rdd, part)
          }
        }
       // 这里的rdd1, rdd2对应SubtractedRDD上游依赖也就是上图的rdd3和rdd4
        Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
      }
    

    下面是完成subtract的计算在SubtractedRDD#compute方法:

    override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
        val partition = p.asInstanceOf[CoGroupPartition]
       // map是key到由key相同的value组成的Array的映射,这里所有的value都是null。
        val map = new JHashMap[K, ArrayBuffer[V]]
        // 对于key,map中有就返回对应的ArrayBuffer,没有就新建立一个
        def getSeq(k: K): ArrayBuffer[V] = {
          val seq = map.get(k)
          if (seq != null) {
            seq
          } else {
            val seq = new ArrayBuffer[V]()
            map.put(k, seq)
            seq
          }
        } 
    
       // 由于只有ShuffleDependency,所以只会走到shuffleDepency的case上。
       // 这个函数根据depNum取到上游依赖的rdd(rdd3或则rdd4,然后对每一个值作为op的参数调用)
        def integrate(depNum: Int, op: Product2[K, V] => Unit): Unit = {
          dependencies(depNum) match {
            case oneToOneDependency: OneToOneDependency[_] =>
              val dependencyPartition = partition.narrowDeps(depNum).get.split
              oneToOneDependency.rdd.iterator(dependencyPartition, context)
                .asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
    
            case shuffleDependency: ShuffleDependency[_, _, _] =>
              //shuffleManager.getReader返回的迭代器迭代的一定是按key排好序的
              val iter = SparkEnv.get.shuffleManager
                .getReader(
                  shuffleDependency.shuffleHandle, partition.index, partition.index + 1, context)
                .read()
              iter.foreach(op)
          }
        }
    
        // depNum = 0,先跌打rdd3 shuffle之后的数据,按照key在map中拿到ArrayBuffer,再把value都放到ArrayBuffer中。
        integrate(0, t => getSeq(t._1) += t._2)
        // 即然是做subtract,在迭代rdd4中的数据,对于每一个key,从map中去掉就行了。
        integrate(1, t => map.remove(t._1))
        map.asScala.iterator.map(t => t._2.iterator.map((t._1, _))).flatten
      }
    

    其原理是迭代rdd1中的key,放到map中,然后迭代rdd2中的key,在将key从之前的map中删除,得到的就是求差的结果。

    1. zipWithIndex
      对于rdd中的每一个数据,返回的数据以及数据在rdd中的索引组成的tuple, 如下例:
     val r1 = sc.parallelize(List('a','b','c','d'),2)
     // 返回tuple包括a,b,c,d在rdd中索引,而且是全局索引。
     scala> r1.zipWithIndex.collect
         res70: Array[(Char, Long)] = Array((a,0), (b,1), (c,2), (d,3))
    

    方法实现如下:

     def zipWithIndex(): RDD[(T, Long)] = withScope {
       new ZippedWithIndexRDD(this)
     }
    

    zipWithIndex的基本原理:由于需要知道每一个partition里面的每一个元素的全局索引,首先需要计算出每一个partition的元素的个数,这样就能计算出第i个partition的第一个元素在所有全部数据里面的偏移值,接下来就简单了,由于任务是基于parition上的数据迭代的,那么parition里的数据的全局偏移就是该partition的第一个元素的偏移加上当前迭代到的元素在parition里的偏移值。

    下面是ZippedWithIndexRDD中定义的一些方法:

    class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {
    
      /** The start index of each partition. */
      @transient private val startIndices: Array[Long] = {
       //获得依赖的父rdd的partition的个数
        val n = prev.partitions.length
        if (n == 0) {
          Array.empty
        } else if (n == 1) {
          Array(0L)
        } else {
         /*这里提交了一个spark job运行来统计每一个partition的元素个数。
            1. 参数Utils.getIteratorSize是一个函数,task运行在分区上时调用,它返回分区大元素个数.
            2. 参数0 until n-1指定了运行task的分区是[0, n-1),不需要计算最后一个分区大小,
               因为最后一个分区的偏移是前面所有分区的元素个数之和。
            3. scanLeft(0L)(_ + _),runJob返回[0,n-1)的partition大小的列表,scanLeft计算出偏移。
         */
          prev.context.runJob(
            prev,
            Utils.getIteratorSize _,
            0 until n - 1 
          ).scanLeft(0L)(_ + _)
        }
      }
    
      override def getPartitions: Array[Partition] = {
        //根据上有partition包装新的分区ZippedWithIndexRDDPartition,新的分区携带了自己的偏移。这是一个窄依赖
        firstParent[T].partitions.map(x => new ZippedWithIndexRDDPartition(x, startIndices(x.index)))
      }
    
      override def getPreferredLocations(split: Partition): Seq[String] =
        firstParent[T].preferredLocations(split.asInstanceOf[ZippedWithIndexRDDPartition].prev)
    
      override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
        val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
        val parentIter = firstParent[T].iterator(split.prev, context)
        // 重要的是这个方法,迭代上游分区的数据,返回(data, data_index)
        Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
      }
    }
     ---------------------------
    贴一下Utils.getIteratorZipWithIndex的实现:
    1. 参数iterator是上游partition的迭代器
    2. startIndex是上游partition的第一个元素的全局偏移
     def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
        new Iterator[(T, Long)] {
          require(startIndex >= 0, "startIndex should be >= 0.")
     
          var index: Long = startIndex - 1L
          def hasNext: Boolean = iterator.hasNext
          // next返回数据和其index的元组。
          def next(): (T, Long) = {
            index += 1L
            (iterator.next(), index)
          }
        }
      }
    

    2. action

    这小节里列出action,和transform不同,action会触发job的提交运行。

    1. reduce
      原型如下:
    def reduce(f: (T, T) => T): T = withScope {
      val cleanF = sc.clean(f)
      /* 这个函数作为runJob的第二个参数,作用于一个job里的最后一个阶段(ResultStage)每一个分区。
          这个函数干了什么: 接受一个上游parition上的迭代器,然后调用迭代器的reduceLeft, reduceLeft使用函数f来对数据做reduce。
          所以这个函数完成了ResultStage的每一个分区的reduce,不是全局的reduce
    */
      val reducePartition: Iterator[T] => Option[T] = iter => {
        if (iter.hasNext) {
          Some(iter.reduceLeft(cleanF))
        } else {
          None
        }
      }
      var jobResult: Option[T] = None
       /* 这个函数作为sc.runJob的第三个参数,当reducePartition完成每一个分区的reduce之后,
          用来对每一个分区的reduce结果合并,index是分区索引,taskResult即分区计算结果。
          它干了什么:同样适用函数f来对结果做规约,完成全局的reduce。
    */
      val mergeResult = (index: Int, taskResult: Option[T]) => {
        if (taskResult.isDefined) {
          jobResult = jobResult match {
            case Some(value) => Some(f(value, taskResult.get))
            case None => taskResult
          }
        }
      }
      /* 提交job,runJob需要两个参数,reducePartition作用于每个分区之上,也就是在executor上运行;
         mergeResult运行于driver端,收集每一个分区的结果到driver端,然后对这些结果运行mergeResult,如果每一个分区产生的结果很大的话,显然reduce可能会在driver端出现OOM
      */
      sc.runJob(this, reducePartition, mergeResult)
     
      jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
    }
    

    这个和reduceByKey不一样,reduceByKey是一个transform操作,会产生一个新的RDD(与上游RDD形成ShuffleDependency),这里的reduce是一个action,会触发job的提交(上面代码中sc.runJob);此外reduceByKey要求输入数据必须是(key,value)的二元tuple,而此处的reduce则不需要。

    1. aggregate
      aggregate也是对值做聚合操作的,但是和reduce还是不同的,下面是其方法原型:
    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
        // Clone the zero value since we will also be serializing it as part of tasks
        var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
        val cleanSeqOp = sc.clean(seqOp)
        val cleanCombOp = sc.clean(combOp)
        //作用于每个分区中的数据,对每个分区中的数据聚合。运行于executor上
        val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
        //运行在driver端,对收集回来的每个分区的聚合结果再一次聚合。
        val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
        // 和reduce一样,先用aggregatePartition在每一个分区上运行聚合分区的数据,然后获取所有分区的数据,使用mergeResult在Driver端聚合,同样从在Driver端OOM的可能。
        sc.runJob(this, aggregatePartition, mergeResult)
        jobResult
      }
    

    reduce和aggregate聚合值的区别从方法签名就可以看出,reduce聚合前后的值的类型是一样的,比如说你不能用reduce把一个int值拼成string返回。aggregate则可以把一种类型(T)的值聚合成另一种类型(U)返回。

    上面aggregate方法,U是聚合后类型,T是聚合前类型; 参数zeroValue提供一个初始值,seqOp定义怎么把T聚合到U上,combOp定义怎么把多个分区聚合后的值拼起来。

    下面是一个例子,把Int拼接成字符串

    // RDD r1 包含1 to 10的整型
    scala> r1.collect
    res63: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    // seq, 拼接字符串和整型
    scala> def seq(s:String, i:Int):String = { s + i}
    // comb,拼接分区聚合后的字符串
    scala> def comb(s1:String, s2:String):String = { s1 + s2}
    // 初始值zeroValue为空字符串
    scala> r1.aggregate("")(seq,comb)
    res64: String = 12345678910
    
    1. treeAggregate
      treeAggregate和aggregate功能上是一样的,但是实现细节不一样,下面treeAggregate的实现:
      def treeAggregate[U: ClassTag](zeroValue: U)(
        seqOp: (U, T) => U,
        combOp: (U, U) => U,
        depth: Int = 2): U = withScope {
      require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
      if (partitions.length == 0) {
        Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
      } else {
        val cleanSeqOp = context.clean(seqOp)
        val cleanCombOp = context.clean(combOp)
        val aggregatePartition =
          (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
        var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
        var numPartitions = partiallyAggregated.partitions.length
        val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
        // If creating an extra level doesn't help reduce
        // the wall-clock time, we stop tree aggregation.
    
        // Don't trigger TreeAggregation when it doesn't save wall-clock time
        while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
          numPartitions /= scale
          val curNumPartitions = numPartitions
          partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
            (i, iter) => iter.map((i % curNumPartitions, _))
          }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
        }
        partiallyAggregated.reduce(cleanCombOp)
      }
    }
    

    前文说到aggregate的过程是:先使用seqOp在各个分区上聚合,然后将分区结果全部拿到Driver端,然后使用combOp聚合,过多的分区的数据被移到driver端可能会导致driver上OOM;treeAggregate不同之处在于,分区聚合之后不马上把结果传回driver端聚合,而是调用reduceByKey再在远端按key聚合到更小的分区,如有必要还会经过多轮的reduceByKey,不停的把值聚合到更小的分区上,最终传回driver端做最终聚合。下图可以反应出aggregate和treeAggregate的过程上的区别:

    aggregate和treeAggregate
    1. fold
      和scala集合上的fold功能一样,实现原理和reduce一样,现在每一个分区上fold,然后结果传回driver在merge.

    2. take
      方法原型如下:

       def take(num: Int): Array[T]
      

      take接收一个整型参数num,返回rdd中前num个数(从第1个partition的第1个数开始的num个数).
      take的思路大概是这样的:

      1. 使用一个ArrayBuffer buf保存返回结果,buf.size就表示已经取到的结果,一开始时显然为0.
      2. 开始时将运行task的分区设成一个(也就是第一个partition0),因为不知道前num个元素会横跨多少parition,先尝试1个
      3. 运行job在分区上取前num - buf.size(也就是还需要取的个数),放到buf中。
      4. 判断buf.size有没有达到num,没有进入4. 达到就可以返回了。
      5. 按照某种比例扩大下一轮运行任务的分区个数,下一次job运行的的分区的索引为成区间[上一次任务运行最大分区索引 +1 , 上一次任务运行最大分区索引 +下一轮分区个数], 回到2继续运行。 (比如在partition0上数据不够num个,只有num1个,那么假设下一次扩大到在两个分区上运行,那么下一轮就在[partition-1,partition-2] 上取num - num1个数据)。
    3. top, takeOrdered
      这是两个方法,放在一起是因为top是基于调用takeOrdered实现的,它们的方法原型如下:

      // top返回最大的前num个数,元素排序由ord定义,ord比较x,y, 返回负数表示x<y, 0表示x==y。
       def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
        //takeOrdered默认是从小到大返回的,所以此处使用ord.reverse颠倒排序
         takeOrdered(num)(ord.reverse)
       }
       //takeOrdered返回最小的的num个数,排序由ord定义
      def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
       if (num == 0) {
         Array.empty
       } else {
        /* mapPartitions表示在每一个分区上运行,queue相当于一个大小为num的大根堆,维持当前已经迭代(items迭代器)的最小的num个值.
           mapParititons生成的新rdd mapRDDs的每一个parititon拥有之前上游rdd
           每个parititon的最小的num个元素
       */
         val mapRDDs = mapPartitions { items =>
           // Priority keeps the largest elements, so let's reverse the ordering.
           val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
           queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
           Iterator.single(queue)
         }
         if (mapRDDs.partitions.length == 0) {
           Array.empty
         } else {
          // 可以回顾下reduce action,获取到每一个分区返回的最小的num个元素值。然后合并这些值就能得到rdd中最小的num个元素。
           mapRDDs.reduce { (queue1, queue2) =>
             queue1 ++= queue2
             queue1
           }.toArray.sorted(ord)
         }
       }
     }
    

    上面takeOrdered使用了RDD#reduce这个方法将每个分区的最小的num个数传会driver,在driver比较获得全局最小的num个数,如果num值很大的话会造成driver OOM

    1. max,min
      获取rdd中最大和最小值
    def max()(implicit ord: Ordering[T]): T = withScope {
      // 回顾reduce,接收(T,T) => T的函数,ord.max方法比较两个值,返回大的。
     // reduce现在每个partition上运行ord.max取得partition最大的值,然后将这些值返回给driver端,得到最大的值。
      this.reduce(ord.max)
    }
    
    def min()(implicit ord: Ordering[T]): T = withScope {
      this.reduce(ord.min)
    }
    

    3 附录

    3.1 cogroup

    3.1.1 cogroup的作用

    首先cogroup是PairRDDFunctions中定义的方法,它只能作用于元素类型是(key,value)二元组型这样的rdd, cogroup可以接收多个rdd作为参数进行操作,但是为了方便,这里只假设有两个rdd: r1, r2.
    r1,r2 cogroup产生新的rdd r3: r3的key包含了r1,r2的所有的key,对于key的value是一个数组,数组组的元素依次是key在r1和r2中所有的value的数组。
    下面是一个例子:

    //r1, r2是国家到城市的二元组
    val r1 = sc.parallelize(List(("china","hefei"),("USA","chicago"),("japan","tokyo")))
    val r2 = sc.parallelize(List(("china","beijing"),("USA","new york"),("china","shanghai")))
    r1.cogroup(r2).collect
    //输出,CompactBuffer可以理解成数组,可以看到key包含了r1,r2的所有的key,
    Array((japan,(CompactBuffer(tokyo),CompactBuffer())), (USA,(CompactBuffer(chicago),CompactBuffer(new york))), (china,(CompactBuffer(hefei),CompactBuffer(beijing, shanghai))))
    

    3.1.2 cogroup原理

    cogroup会产生CoGroupedRDD,直接看他的实现吧:

    //rdd即参与cogroup的所有rdd,是一个数组,所以可以有多个rdd。
    //类型化参数'_ <: Product2[K, _]'表明rdd的元素必须是二元组,而且所有的rdd的key类型得是一样的.
    //part默认是HashPartitioner
    class CoGroupedRDD[K: ClassTag](
        @transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
        part: Partitioner)
      extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
    
      /**
       1. 这里定义了新类型,CoGroup即来源于某一个上游rdd的key的value组成的数组.
       2. CoGroupValue,二元组,第一个元素Any类型是上游rdd中value,注意上游rdd的类型是(key,value),这里是提取value出来的,
          第二个元素Int是上游rdd在在dependencies列表中的index,也就是第一个元素来源于的那个rdd。
       3. CoGroupCombiner, 数组,每个元素是一个CoGroup,也就是说第i元素
          就是key在第I个rdd中所有value组成的数组。
       */
      private type CoGroup = CompactBuffer[Any]
      private type CoGroupValue = (Any, Int)  // Int is dependency number
      private type CoGroupCombiner = Array[CoGroup]
    
      private var serializer: Serializer = SparkEnv.get.serializer
    
      /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
      def setSerializer(serializer: Serializer): CoGroupedRDD[K] = {
        this.serializer = serializer
        this
      }
    
      override def getDependencies: Seq[Dependency[_]] = {
        //获取依赖时,遍历上游所有rdd
        rdds.map { rdd: RDD[_] =>
          //上游用的partitioner和当前CoGroupedRDD一样,默认的HashPartitioner相同的判断标准时产生一样的分区个数,RangeParitioner复杂一点。
          // 不管如果,相同就意味着上游rdd是通过shuffle产生的,所有的元素已经按照key聚合到对应的partiton了,
          // 当前RDD和上游rdd的分区直接可以一对一依赖,不同再shuffle一次聚合key了。
          if (rdd.partitioner == Some(part)) {
            logDebug("Adding one-to-one dependency with " + rdd)
            new OneToOneDependency(rdd)
          } else {
           // 否则的话只好shuffle一次,按key聚合好
            logDebug("Adding shuffle dependency with " + rdd)
            new ShuffleDependency[K, Any, CoGroupCombiner](
              rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
          }
        }
      }
    
      override def getPartitions: Array[Partition] = {
        val array = new Array[Partition](part.numPartitions)
        for (i <- 0 until array.length) {
          // Each CoGroupPartition will have a dependency per contributing RDD
          array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
            // Assume each RDD contributed a single dependency, and get it
            dependencies(j) match {
              case s: ShuffleDependency[_, _, _] =>
                None
              case _ =>
                Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
            }
          }.toArray)
        }
        array
      }
    
      override val partitioner: Some[Partitioner] = Some(part)
    
      override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
        val split = s.asInstanceOf[CoGroupPartition]
        val numRdds = dependencies.length
    
        /* 看看做了什么: 首先返回的是迭代器数组,包含对每一个上游rdd的迭代.
            其次迭代的元素类型是一个二元组,第一个元素类型‘Product2[K, Any]’表明它是上游rdd里的数据, 第二个元素Int则表明第一个元素所属的rdd
        */
        val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
        for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
         // 跟上有一对一依赖就简单很多了,直接取到依赖的上游parition,返回数据和上有rdd的索引就行了。
          case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
            val dependencyPartition = split.narrowDeps(depNum).get.split
            val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
            rddIterators += ((it, depNum))
    
          case shuffleDependency: ShuffleDependency[_, _, _] =>
           // 跟上游shuffle依赖,那么就需要有shuffle read的过程,不提细节,总之shuffle read完成之后,
           //会从上游所有rdd中收集了属于当前CoGroupedRDD的当前分区的所有元素, 
            val it = SparkEnv.get.shuffleManager
              .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
              .read()
            rddIterators += ((it, depNum))
        }
        
       /* 返回了一个类型是‘ ExternalAppendOnlyMap’的东西,它是干什么的呢,简单说, 这个map有按key聚合的作用,就像reduceBy一样。
          当你往里面插一个元素时,它会按照你定义的combine和merger函数,把相同的key的元素聚合起来
       */
        val map = createExternalMap(numRdds)
        //迭代上游数据,根据前面rddIterator的定义,此处it是上游rdd中的数据,类型应该是(key,value)的,depNum是rdd索引 
        for ((it, depNum) <- rddIterators) {
        // map要求插入的元素必须是(K,V)型的,这里的pair._1就是rdd中的key,value是CoGroupValue介绍过,所以map会按照key来聚合。
       //所以关键是map的combiner和merger的实现
          map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
        }
        context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
        context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
        context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
        //map聚合好之后,返回新的迭代器,返回InterruptibleIterator表示它可以被中途取消
        new InterruptibleIterator(context,
          map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
      }
    
      private def createExternalMap(numRdds: Int)
        : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
        // createCombiner用来在出现第一个元素时,将该元素转换成聚合后的元素,可能是列表之类的,什么都可以
        val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
          
          val newCombiner = Array.fill(numRdds)(new CoGroup)
         //value._2是rdd索引,value._1是rdd数据(key,value)中的value,这句表示value加到数组中。
          newCombiner(value._2) += value._1
          newCombiner
        }
        //将元素合并到聚合后的新类型元素上,还是往数组里加
        val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
          (combiner, value) => {
          combiner(value._2) += value._1
          combiner
        }
       // 将两个聚合后的新类型合并,合并两个数组
        val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
          (combiner1, combiner2) => {
            var depNum = 0
            while (depNum < numRdds) {
              combiner1(depNum) ++= combiner2(depNum)
              depNum += 1
            }
            combiner1
          }
        new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
          createCombiner, mergeValue, mergeCombiners)
      }
    
      override def clearDependencies() {
        super.clearDependencies()
        rdds = null
      }
    }
    

    1. 关于shuffle read可以参考shuffle read第三节
    2. 关于ExternalAppendOnlyMap可以参考ExternalAppendOnlyMap4.3节

    相关文章

      网友评论

          本文标题:spark之transform和action

          本文链接:https://www.haomeiwen.com/subject/zqoeqxtx.html