Spark 转换算子源码

作者: Tim在路上 | 来源:发表于2022-02-08 14:03 被阅读0次

    Spark 转换算子源码

    MapPartitionsRDD

    • map 算子

    map算子是对RDD中的每一个函数应用传入的函数。

    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    }
    

    可以从源码看出其入参是f: T ⇒ U 是一个函数,首先经过sc.clean(f) 进行闭包检测,然后创建一个MapPartitionsRDD。sc.clean()函数的作用检测用户构建的函数是否可以序列化,这是因为Spark中的map是一个分布式的函数,最终的执行是在Executor上开线程执行,而我们的函数构建都是在Driver端进行。Spark实际上进行的是计算的转移,将函数传递到数据所在的Worker节点。

    eg:
    val rdd = spark.sparkContext.parallelize(1 to 10)
    // 在driver端定义执行的函数
    val pow = (i: Int) => i * i
    val rdd1 = rdd.map(pow)
    
    private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
        var prev: RDD[T],
        f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
        preservesPartitioning: Boolean = false,
        isFromBarrier: Boolean = false,
        isOrderSensitive: Boolean = false)
      extends RDD[U](prev) { ...}
    

    从MapPartitionsRDD中的参数可以看出,其有一个函数的接受参数 f: (TaskContext, Int, Iterator[T]) => Iterator[U], 我们看map传入参数为,new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF)) 可以看出spark的map出入的迭代器为scala的map, 也就分布式的map,实际上分发到Worker节点后,执行的任然是scala的map函数。其次,TaskContext,和分区id都未传入使用。

    map算子属于转换算子,是懒执行的,所以说不会立即执行,那么它会在什么地方调用呢。

    override def compute(split: Partition, context: TaskContext): Iterator[U] =
      f(context, split.index, firstParent[T].iterator(split, context))
    

    RDD都有一个compute函数会被调用执行,可以看到传入的函数在compute函数中调用,从上可以看出,TaskContext传入的是节点的上下文,Int是分区id, Iterator[T]传入的是父节点调用迭代器。

    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
      // 有设置存储等级
      if (storageLevel!= StorageLevel.NONE) {
        getOrCompute(split, context)
      } else {
        computeOrReadCheckpoint(split, context)
      }
    }
    

    可以看出如果设置存储等级(内存,磁盘对RDD的缓存),则从缓存Block中读取。否则有checkpoint,则执行父RDD的迭代器,否则执行compute函数。

    其次,MapPartitionsRDD的分区数和分区器是怎样的?

    override valpartitioner= if (preservesPartitioning) firstParent[T].partitionerelse None
    
    override def getPartitions: Array[Partition] = firstParent[T].partitions
    

    从上可以看起MapPartitionsRDD是窄依赖,其分区为父RDD依赖的分区数。其分区器在默认preservesPartitioning = false 是不保留父RDD的分区器,设置为None。

    • mapPartitions算子
    def mapPartitions[U: ClassTag](
        f: Iterator[T] => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U] = withScope {
      val cleanedF = sc.clean(f)
      new MapPartitionsRDD(
        this,
        (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
        preservesPartitioning)
    }
    

    mapPartitions的实现和map实质是一样的,只是在调用函数时,直接调用函数,返回一个迭代器。所以说mapPartitions其实并没有定义到分区执行的函数,需要用户自己定义,这里可以定义map,filter,flatmap等。

    那么mapPartitions的作用是什么呢?由于没有定义在分区上的执行函数,也就是只会在分区上执行一次,所以对于资源连接等问题,可以定义在这里,使得一个分区仅仅连接一次,而非每一个元素连接一次。

    其次这里也可以利用之前提到的context: TaskContext, index: Int,拿到分区的更多信息。

    val rdd = spark.sparkContext.parallelize(1 to 10)
    val rdd2 = rdd.mapPartitions(it => {
      val specID = TaskContext.getPartitionId() // 这个语句在每一个分区只执行一次
      it.filter(x => x % 2 == 0).map(x => (specID, x))
    })
    

    mapPartitionsWithIndex 函数实际上是上述例子的使用,传入分区id, 可以在数据中直接使用。

    val rdd2 = rdd.mapPartitionsWithIndex((i, it) => {
       it.filter(x => x % 2 == 0).map(x => i + "" + x)
    })
    
    • filter算子
    • 将函数应用于RDD的所有元素,返回的是满足条件的结果。
    val rdd = spark.sparkContext.parallelize(1 to 10)
    val f = (i: Int) => i * i > i + i
    val rdd2 = rdd.filter(f)
    

    filter函数传入的也是函数。

    def filter(f: T => Boolean): RDD[T] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[T, T](
        this,
        (context, pid, iter) => iter.filter(cleanF),
        preservesPartitioning = true)
    }
    

    首先经过闭包函数检测,然后创建MapPartitionsRDD, 在传入函数时调用scala的filter函数。

    • flatMap算子

    将函数应用于RDD的所有元素,返回的是扁平化的结果。

    val rdd = spark.sparkContext.parallelize(1 to 10)
    val rdd2 = rdd.map(i => "" + i).flatMap(i => i)
    

    flatMap要求返回的值必须为可遍历的,具有TraversableOnce特性,string,list,array,buffer,set等。

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
    }
    
    • randomSplit

    将RDD根据权重切分为多个RDD。

    val rdd = spark.sparkContext.parallelize(Seq(2, 4, 5, 2, 4), 2)
    val rdd2 = rdd.randomSplit(Array(0.3, 0.2))
    println(rdd2(0).collect().mkString(","))
    println(rdd2(1).collect().mkString(","))
    2,4,5
    2,4
    

    randomSplit任然属于窄依赖,其实现原理依赖于MapPartitionsRDD, 具体来看如何实现?

    def randomSplit(
        weights: Array[Double],
        seed: Long = Utils.random.nextLong): Array[RDD[T]] = {
      withScope {
        // 统计权重的和
        val sum = weights.sum
        // scanLeft 从左向右进行累加,并返回数组,返回百分比的累加数组
        val normalizedCumWeights: Array[Double] = weights.map(_ / sum).scanLeft(0.0d)(_ + _) 
        // sliding 每2个元素作为list,滑动的步长
        normalizedCumWeights.sliding(2).map { x =>
          randomSampleWithRange(x(0), x(1), seed)
        }.toArray
      }
    }
    

    从源码分析可知,randomSplit是将求出权重占比的累加和,并两两切分为一组,调用randomSampleWithRange函数。

    private[spark] def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = {
      this.mapPartitionsWithIndex( { (index, partition) =>
        val sampler = new BernoulliCellSampler[T](lb, ub)
        sampler.setSeed(seed + index)
        sampler.sample(partition)
      }, isOrderSensitive = true, preservesPartitioning = true)
    }
    

    从上面可以看出该函数最终调用的是mapPartitionsWithIndex算子。 每一个分区的数量是以传入的分区权重创建的伯努利分布的采样器从分区中获得,并分配到新的RDD中。属于1:n的窄依赖。这里它的分区器是父RDD的分区器preservesPartitioning = true。

    • glom算子

    glom算子是将分区的所有算子装入Array数组中。

    def glom(): RDD[Array[T]] = withScope {
      new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) =>Iterator(iter.toArray))
    }
    

    可以看出其就是在调用了toArray方法。

    • zipWithUniqueId 算子

    给RDD的元素添加id, 第k分区的元素id 分别为 k, n + k, 2n + k ... ; n为分区数。所以通过这种方式产生的id是唯一的但不一定是连续的。例如第一个分区仅仅2个元素,其他分区为10个元素。

    zipWithUniqueId 的实现并不是依赖于ZippedWithIndexRDD,而是通过编写在分区去重,并增加id的函数,通过mapPartitionsWithIndex算子实现的。所以它不会触发Spark执行job。

    def zipWithUniqueId(): RDD[(T, Long)] = withScope {
      val n = this.partitions.length.toLong
      this.mapPartitionsWithIndex { case (k, iter) =>
        Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
          (item, i * n + k)
        }
      }
    }
    

    举个实际的例子:

    val rdd1 = spark.sparkContext.parallelize(Seq(2, 4, 2, 5, 8, 9), 4)
    val rdd3 = rdd1.zipWithUniqueId()
    println(rdd3.collect().mkString(","))
    (2,0),(4,1),(2,5),(5,2),(8,3),(9,7)
    

    四个分区的数据为: part0: 2, part1: 4, 2, part2: 5, part3: 8,9

    按照上面的规则 分区id + n * 元素id , 2 的id 0, 4 的id为1, 2的id为5, 5的id为2 ....。

    这个算子给只要求id,但并不要求连续。

    ShuffledRDD

    • distinct 算子 - (实质reduceByKey)

    distinct算子是对RDD中的元素进行去重操作。distinct是由其他基础的算子组合实现的,其原理是使用map将其转换为(key,null),调用reduceBykey进行聚合去重,最后再使用map转换为key。由于reduceByKey是转换算子,所以distinct也属于转换算子。

    val rdd = spark.sparkContext.parallelize(Seq(2, 4, 5, 2, 4), 2)
    val rdd2 = rdd.distinct()
    

    我们看下其源码的实现:

    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
    }
    

    distinct使用的distinct(partitions.length),传入当前RDD的分区数,然后使用reduceByKey进行去重。

    distinct算子的RDD依赖关系为:

    MapPartitionsRDD → ShuffledRDD → MapPartitionsRDD


    以下为多个RDD的操作算子。

    UnionRDD

    • union算子

    union算子可以合并多个RDD,但合并后的结果中会出现重复元素。

    val rdd = spark.sparkContext.parallelize(Seq(2, 4, 5, 2, 4), 2)
    val rdd1 = spark.sparkContext.parallelize(Seq(2, 4, 5, 8, 9))
    val rdd2 = rdd.union(rdd1)
    2,4,5,2,4,2,4,5,8,9
    

    从源码来看union算子。

    def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
      // 非空RDDs
      val nonEmptyRdds = rdds.filter(!_.partitions.isEmpty)
      // 收集RDD的分区器
      val partitioners = nonEmptyRdds.flatMap(_.partitioner).toSet
      // 所有的RDD都定义了相同的一个分区器
      if (nonEmptyRdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
        new PartitionerAwareUnionRDD(this, nonEmptyRdds)
      } else {
        new UnionRDD(this, nonEmptyRdds)
      }
    }
    

    从上面代码可以看出,先会过滤出非空的RDD, 然后判断所有的RDD都定义了相同的一个分区器,则创建一个PartitionerAwareUnionRDD。PartitionerAwareUnionRDD 的思路为将所有的RDD看做为一个RDD。例如,现在有m个RDD, 每个RDDp个分区,且采用一样的分区器,则将其看为一个具有p个分区的一个RDD。否则,创建UnionRDD。

    现在我们看UnionRDD是如何实现的:

    override def compute(s: Partition, context: TaskContext): Iterator[T] = {
      val part: UnionPartition[T] = s.asInstanceOf[UnionPartition[T]]
      parent[T](part.parentRddIndex).iterator(part.parentPartition, context)
    }
    

    compute中实现非常简单,就是将分区转换为UnionPartition子类型,然后调用part.parentPartition传给父分区的迭代器。那么UnionPartition如何实现呢?

    private[spark] class UnionPartition[T: ClassTag](
        idx: Int,
        @transient private val rdd: RDD[T],
        val parentRddIndex: Int,
        @transient private val parentRddPartitionIndex: Int)
      extends Partition {
    
      var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)
    
      def preferredLocations(): Seq[String] = rdd.preferredLocations(parentPartition)
    
      override valindex: Int = idx
    
      @throws(classOf[IOException])
      private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException{
        // Update the reference to parent split at the time of task serialization
    parentPartition= rdd.partitions(parentRddPartitionIndex)
        oos.defaultWriteObject()
      }
    }
    

    可见parentPartition就是通过parentRddPartitionIndex获得其分区的数据,然后获取分区的位置。可见是直接从父分区转换为当前RDD的分区,同时UnionPartition还实现了writeObject,用于序列化,可以进行网络传输。

    那么是如何创建UnionPartition?

    override def getPartitions: Array[Partition] = {
     // rdd数是否大于rdd的并发门限 
      val parRDDs = if (isPartitionListingParallel) {
        val parArray = rdds.par
        // ForkJoinPool采取工作窃取算法,以避免工作线程由于拆分了任务之后的join等待过程。这样处于空闲的工作线程将从其他工作线程的队列中主动去窃取任务来执行。
        parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
    parArray
      } else {
        rdds
      }
      // unionRDD的分区数为所有RDD的分区数总和
      val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum)
      var pos = 0
      for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
        // 每一个分区创建一个UnionPartition
        array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
        pos += 1
      }
      array
    }
    

    可见是通过遍历所有的RDD分区,依次封装为一个UnionPartition, 计数值pos为其新分区编号,并传入其原分区编号split.index。

    其依赖关系是什么?

    override def getDependencies: Seq[Dependency[_]] = {
      val deps = new ArrayBuffer[Dependency[_]]
      var pos = 0
      for (rdd <- rdds) {
        deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
        pos += rdd.partitions.length
      }
      deps
    }
    

    从上面源码可以看出,其依赖关系是RangeDependency。虽然其为n:1的关系,但是由于其分区和父RDD的分区是一致的,所以其任然属于窄依赖。RangeDependency会传入范围开始pos, 长度rdd.partitions.length。

    CoGroupedRDD

    • intersection 算子

    两个RDD的交集,不会包含重复元素。intersection 算子 一般会有shuffle过程。它会是ShuffleDependency依赖。

    val rdd = spark.sparkContext.parallelize(Seq(2, 4, 5, 2, 4), 2)
    val rdd1 = spark.sparkContext.parallelize(Seq(2, 4, 5, 8, 9))
    val rdd2 = rdd.intersection(rdd1)
    2,4,5
    

    intersection实际上也是一个复合算子,先将两个RDD转换为(key,null)键值对数据,然后调用cogroup将其按照key进行聚合,生成的value中如果两个迭代器都存在,说明当前key在两个RDD中都存在。最后返回keys。这里不详细介绍CoGroupedRDD,之后会详细进行阐述

    def intersection(other: RDD[T]): RDD[T] = withScope {
      this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
          .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
          .keys
    }
    

    SubtractedRDD

    • subtract算子

    返回在this RDD中,但不在other RDD中的元素。subtract一般会有shuffle过程。它可能会是ShuffleDependency依赖。

    val rdd1 = spark.sparkContext.parallelize(Seq(2, 4, 5, 8, 9))
    val rdd = spark.sparkContext.parallelize(Seq(2, 4, 5, 2, 4), 2)
    val rdd2 = rdd1.subtract(rdd)
    8,9
    

    源码如下:

    实质是调用subtractByKey,subtractByKey的实现是SubtractedRDD, SubtractedRDD的原理和CoGroupedRDD是一样的,(CoGroupedRDD的原理是JHashMap), 只是SubtractedRDD,SubtractedRDD返回的元素一定是RDD1中的,所以可以将RDD1保存在内存中,RDD2以流读,从RDD1中删除,可以直接使用rdd1's partitioner/partition size,不用担心内存溢出。

    def subtract(
        other: RDD[T],
        p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      if (partitioner==Some(p)) {
        // Our partitioner knows how to handle T (which, since we have a partitioner, is
        // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
        val p2 = new Partitioner() {
          override def numPartitions: Int = p.numPartitions
          override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
        }
        // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
        // anyway, and when calling .keys, will not have a partitioner set, even though
        // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
        // partitioned by the right/real keys (e.g. p).
        this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
      } else {
        this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
      }
    }
    

    ZippedPartitionsBaseRDD

    • zip 算子

    将两个RDD组合为key-val对,key来自于第一个RDD,val来自于第二个RDD。但是使用zip算子有个前提是,两个RDD必须有相同的分区数,每一个分区中也必须有相同的元素数,否则会在运行时进行抛错。

    def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
      zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
        new Iterator[(T, U)] {
          def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
            case (true, true) => true
            case (false, false) => false
            case _ => throw new SparkException("Can only zip RDDs with " +
              "same number of elements in each partition")
          }
          def next(): (T, U) = (thisIter.next(), otherIter.next())
        }
      }
    }
    

    从zip源码可以看出,当两个RDD的分区迭代器执行hasNext返回的结果不相同,会返回SparkException。zip中直接调用的是zipPartitions算子。同时保留父分区器为false,preservesPartitioning = false。

    def zipPartitions[B: ClassTag, V: ClassTag]
        (rdd2: RDD[B], preservesPartitioning: Boolean)
        (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
      new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
    }
    

    zipPartitions算子,是直接创建了ZippedPartitionsRDD2,并将刚构建的函数传入。

    override def compute(s: Partition, context: TaskContext): Iterator[V] = {
      val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
      f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
    }
    

    在调用compute时,先将其转换为ZippedPartitionsPartition。

    override def getPartitions: Array[Partition] = {
      val numParts = rdds.head.partitions.length
      if (!rdds.forall(rdd => rdd.partitions.length == numParts)) {
        throw new IllegalArgumentException(
          s"Can't zip RDDs with unequal numbers of partitions:${rdds.map(_.partitions.length)}")
      }
      // 表示形成分区行数的列表,i 代表分区id
      Array.tabulate[Partition](numParts) { i =>
        val prefs = rdds.map(rdd => rdd.preferredLocations(rdd.partitions(i)))
        // Check whether there are any hosts that match all RDDs; otherwise return the union
        val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
        val locs = if (!exactMatchLocations.isEmpty) exactMatchLocations else prefs.flatten.distinct
        new ZippedPartitionsPartition(i, rdds, locs)
      }
    }
    

    从中可以看出rdd.partitions.length必须等于第一个RDD的分区数,所以其分区数必须相等。intersect为交集。求所有分区是否在同一台机器上,如果是则返回该机器,否则返回所有机器。

    private[spark] class ZippedPartitionsPartition(
        idx: Int,
        @transient private val rdds: Seq[RDD[_]],
        @transient val preferredLocations: Seq[String])
      extends Partition {
    
      override valindex: Int = idx
     // partitionValues 将两个RDD的的分区数组装入Seq集合,partitionValues(0)为RDD1的所有分区
     // partitionValues(1)为RDD2的所有分区
      var partitionValues: Seq[Partition] = rdds.map(rdd => rdd.partitions(idx))
      def partitions: Seq[Partition] =partitionValues
    
    // 实现writeObject可以序列化
    @throws(classOf[IOException])
      private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException{
        // Update the reference to parent split at the time of task serialization
    partitionValues= rdds.map(rdd => rdd.partitions(idx))
        oos.defaultWriteObject()
      }
    }
    

    所以compute的f函数的输入参数分别为RDD1的分区数组,RDD2的分区数组,返回的值为next方法决定,def next(): (T, U) = (thisIter.next(), otherIter.next()) 为遍历形成的键值对,所以元素和分区数必须相等。

    Zip不会发生Shuffle操作,其依赖关系为简单的OneToOne的依赖关系。

    • zipPartitions 算子

    zipPartitions 可以对两个~四个RDD进行zip操作,和mapPartitions类似,其是执行在对应分区的,并没有提供在分区内具体的执行函数,只对返回值类型进行了定义Iterator[T]。zipPartitions 要求分区数必须相同,但是其对分区内元素的个数没有限制,可以进行自己实现函数定义,对元素不一致的情况进行忽略处理。

    val rdd1 = spark.sparkContext.parallelize(Seq(2, 4, 5, 8, 9), 2)
    val rdd = spark.sparkContext.parallelize(Seq(2, 4, 0), 2)
    val f = (r1: Iterator[Int], r2: Iterator[Int]) => new Iterator[(Integer, Integer)] {
      def hasNext: Boolean = (r1.hasNext, r2.hasNext) match {
        case (true, true) => true
        case (false, false) => false
        case (false, true) => false
        case (true, false) => false
      }
      def next(): (Integer, Integer) = {
        var left: Integer = null
        var right: Integer = null
        if (r1.hasNext) {
          left = r1.next()
        }
        if (r2.hasNext) {
          right = r2.next()
        }
        (left, right)
      }
    }
    val rdd3 = rdd.zipPartitions(rdd1)(f)
    println(rdd3.collect().mkString(","))
    

    ZippedWithIndexRDD

    • zipWithIndex 算子

    给RDD中的元素加入index, 其添加id的顺序是先按照分区进行排序,然后在按照分区内的元素进行排序。

    /** The start index of each partition. */
    @transient private valstartIndices: Array[Long] = {
      val n = prev.partitions.length
      if (n == 0) {
        Array.empty
    } else if (n == 1) {
    Array(0L)
      } else {
        prev.context.runJob(
          prev,
          Utils.getIteratorSize_,
          0 until n - 1 // do not need to count the last partition
        ).scanLeft(0L)(_ + _)
      }
    }
    

    那么每一个分区的startId为多少是如何计算的?如果没有分区,则为空数组,如果只有一个分区,则返回Array(0L)。否则会提交一个Job计算所有分区的元素size,然后使用scanLeft(0L)生成累加和数组。

    override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
      val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
      val parentIter = firstParent[T].iterator(split.prev, context)
      Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
    }
    

    调用Compute首先将其转换为ZippedWithIndexRDDPartition分区,然后获取父RDD的迭代器,传入getIteratorZipWithIndex方法:

    def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
      new Iterator[(T, Long)] {
    require(startIndex >= 0, "startIndex should be >= 0.")
        varindex: Long = startIndex - 1L
        def hasNext: Boolean = iterator.hasNext
        def next(): (T, Long) = {
    index+= 1L
          (iterator.next(),index)
        }
      }
    }
    

    getIteratorZipWithIndex 方法的实现和zip算子传入的函数是类似的,最后通过迭代器返回一个元组。这里不使用zipPartitions算子进行实现的原因,为了提高效率,给原子增加id, 不用多个RDD整合进行数据传输,也不要求分区数必须相同。

    CoalescedRDD

    • coalesce 算子

    缩减分区数,实际上它可以实现扩展分区数,但其只是在缩减分区前进行shuffle进行扩增,然后再封装为CoalescedRDD。所以其实际实现只有缩减分区。

    coalesce设置shuffle参数可以分为三种情况(N代表原有的分区数,M要划分的分区数):

    • N < M , 这种情况下一般数原分区的数据分布不均,需要进行划分。可以采用默认的分区器HashPartitioner, shuffle参数需要设置为true。
    • N > M, N和M差不多的情况下,可以将shuffle设置为false,这种情况就是将多个分区合并为一个新的分区。父RDD和子RDD是窄依赖的关系。
    • N > M, N 和 M 差距比较大的情况下,这时如果将shuffle设置为false, 则造成Spark程序的并行度不够,为提高并行度可以将shuffle设置为true。

    总结:扩大分区数需要进行shuffle, 减少分区数可以采用窄依赖,如果减少的特别多,可以通过打开shuffle增加并发。

    def coalesce(numPartitions: Int, shuffle: Boolean = false,
                 partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
                (implicit ord: Ordering[T] = null)
        : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
      if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
          var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
          items.map { t =>
            // Note that the hash code of the key will just be the key itself. The HashPartitioner
            // will mod it with the number of total partitions.
            position = position + 1
            (position, t)
          }
        } : Iterator[(Int, T)]
    
        // include a shuffle step so that our upstream tasks are still distributed
        new CoalescedRDD(
          new ShuffledRDD[Int, T, T](
            mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
            new HashPartitioner(numPartitions)),
          numPartitions,
          partitionCoalescer).values
      } else {
        new CoalescedRDD(this, numPartitions, partitionCoalescer)
      }
    }
    

    如果shuffle=false,则会创建CoalescedRDD;如果shuffle=true, 则会先将当前RDD封装为,ShuffledRDD, 同时调用mapPartitionsWithIndex算子,new Random(hashing.byteswap32(index)).nextInt(numPartitions) 通过随机函数,将元素随机分配到新的分区,生成(position, t)。其中使用的分区器为HashPartitioner。最后再封装为CoalescedRDD。

    override def getPartitions: Array[Partition] = {
      val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())
    
      pc.coalesce(maxPartitions, prev).zipWithIndex.map {
        case (pg, i) =>
          val ids = pg.partitions.map(_.index).toArray
          new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
      }
    }
    

    在生成分区数组时,首先创建一个new DefaultPartitionCoalescer(),它的作用是将父RDD分区合并为更少的分区。

    如果父节点中没有位置信息(没有首选位置),则以块的形式在 Array 中靠近的块父项。

    如果有位置信息,它会继续用以下四个方式处理:平衡groups使它们大致具有和父分区相同数量。尝试为每个分区分配唯一的首选机器。如果所需的分区大于首选机器的数量(可能发生),它需要开始挑选重复的首选机器。

    最后实现标记机器和分区以及其依赖的父分区。

    private[spark] case class CoalescedRDDPartition(
        index: Int,
        @transient rdd: RDD[_],
        parentsIndices: Array[Int],
        @transient preferredLocation: Option[String] = None) extends Partition {
      varparents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
    
      @throws(classOf[IOException])
      private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException{
        // Update the reference to parent partition at the time of task serialization
    parents= parentsIndices.map(rdd.partitions(_))
        oos.defaultWriteObject()
      }
    

    CoalescedRDDPartition 只是记录当前分区id下的父分区。

    override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
      partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
        firstParent[T].iterator(parentPartition, context)
      }
    }
    

    调用当前缩减后的分区,并将其铺平(即父分区的多个分区,可能在子分区的一个中)。调用其迭代器,总的来说实现还是很简单的,只是进行了调用。

    CoalescedRDD 是窄依赖,但是如果开启shuffle, 会在CoalescedRDD前在创建一个shuffleRDD。

    • repartition 算子

    返回一个指定分区的新RDD。repartition内部是调用了coalesce算子,其中shuffle=true, 是强制执行shuffle过程的。

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      coalesce(numPartitions, shuffle = true)
    }
    

    如果是进行减少分区数,可以使用coalesce算子,避免执行shuffle过程。

    相关文章

      网友评论

        本文标题:Spark 转换算子源码

        本文链接:https://www.haomeiwen.com/subject/vbubkrtx.html