Spark Partitioner Source Code Analysis

Author: wangdy12 | Published 2018-03-28 15:48

    Partitioner

    Only RDDs of key/value pairs have a partitioner; it determines which partition a record is assigned to based on its key. The contract is defined as follows:

    abstract class Partitioner extends Serializable {
      def numPartitions: Int // total number of partitions
      def getPartition(key: Any): Int // partition index for the given key
    }
    
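    To make the contract concrete, here is a minimal custom partitioner sketch (hypothetical, not from the Spark source): FirstCharPartitioner routes string keys by their first character, so keys sharing an initial letter land in the same partition.

    import org.apache.spark.Partitioner

    // Hypothetical example: co-locate string keys by their first character
    class FirstCharPartitioner(override val numPartitions: Int) extends Partitioner {
      require(numPartitions > 0)

      // same trick as Spark's Utils.nonNegativeMod: a modulo that never goes negative
      private def nonNegativeMod(x: Int, mod: Int): Int = {
        val r = x % mod
        if (r < 0) r + mod else r
      }

      def getPartition(key: Any): Int = key match {
        case null => 0
        case s: String if s.nonEmpty => nonNegativeMod(s.head.toInt, numPartitions)
        case other => nonNegativeMod(other.hashCode, numPartitions) // fall back to hashing
      }
    }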

    Spark ships with two built-in partitioning strategies: HashPartitioner and RangePartitioner.

    HashPartitioner

    HashPartitioner takes the key's hashCode modulo numPartitions. If the keys hash reasonably uniformly, this roughly ensures an even amount of data in each partition.

    class HashPartitioner(partitions: Int) extends Partitioner {
      require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
    
      def numPartitions: Int = partitions
    
      def getPartition(key: Any): Int = key match {
        case null => 0 // all null keys go to partition 0
        case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) // non-negative modulo of the hash
      }
    
      // two HashPartitioners are interchangeable iff they produce the same number of partitions
      override def equals(other: Any): Boolean = other match {
        case h: HashPartitioner =>
          h.numPartitions == numPartitions
        case _ =>
          false
      }
    
      override def hashCode: Int = numPartitions
    }
    
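    A minimal usage sketch (assuming a live SparkContext named sc): partitionBy redistributes a pair RDD with the chosen partitioner, so all records sharing a key land in the same partition.

    import org.apache.spark.HashPartitioner

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val partitioned = pairs.partitionBy(new HashPartitioner(4))
    // ("a", 1) and ("a", 3) both land in partition nonNegativeMod("a".hashCode, 4)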

    RangePartitioner

    RangePartitioner samples the input first and, if the sample turns out uneven, samples again. Every sampling pass ends in a collect(), so in the worst case running up to sortByKey launches 2 extra jobs, and the corresponding stage runs three times before completing.

    Rough steps:

    • Compute the number of samples to draw from each partition
    • Reservoir-sample each partition, yielding the total number of elements in the RDD plus, per partition, its element count and its sample (the collect() triggers a job)
    • Compute the overall sampling fraction
    • Mark any partition whose effective sampling fraction is too low; it needs resampling
    • For partitions whose fraction is adequate, give every sampled key a weight equal to the inverse of that partition's sampling fraction, i.e. partition element count / sample count
    • Resample the marked partitions (the collect() triggers a second job), this time directly at the overall sampling fraction; again every sampled key gets a weight, the inverse of the fraction
    • With sampling complete, take the total weight and compute the weight each output partition should receive
    • Sort the (key, weight) pairs by key and cut the range according to the weights, as the code below shows
    class RangePartitioner[K : Ordering : ClassTag, V](
        partitions: Int,
        rdd: RDD[_ <: Product2[K, V]],
        private var ascending: Boolean = true,
        val samplePointsPerPartitionHint: Int = 20)
      extends Partitioner {
      def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
        this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
      }
    
      private var ordering = implicitly[Ordering[K]]
    
      // An array of upper bounds for the first (partitions - 1) partitions
      private var rangeBounds: Array[K] = {
        if (partitions <= 1) {
          Array.empty
        } else {
          // Total sample size: samplePointsPerPartitionHint (20) keys per output partition, capped at 1e6
          val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
          // Oversample by a factor of 3 per input partition, assuming input partitions are roughly balanced
          val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
          // Reservoir-sample each partition: returns the total element count of the RDD and an Array[(Int, Long, Array[K])] of (partition index, element count in that partition, sampled keys)
          val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
          if (numItems == 0L) {
            Array.empty
          } else {
            // Partitions holding far more elements than average are resampled to gather enough data
            // Target overall sampling fraction; the per-partition sample size aimed 3x higher
            val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
            val candidates = ArrayBuffer.empty[(K, Float)]
            val imbalancedPartitions = mutable.Set.empty[Int]
            sketched.foreach { case (idx, n, sample) =>
              // This partition has too many elements: its effective sampling fraction falls below fraction, so record it
              if (fraction * n > sampleSizePerPartition) {
                imbalancedPartitions += idx
              } else {
                // The sampling fraction is adequate: weight each sampled key by the inverse of the partition's sampling fraction, i.e. partition element count / sample count
                val weight = (n.toDouble / sample.length).toFloat
                for (key <- sample) {
                  candidates += ((key, weight))
                }
              }
            }
            if (imbalancedPartitions.nonEmpty) {
              // Re-sample the imbalanced partitions at the desired overall fraction
              // PartitionPruningRDD restricts the new sampling job to just those partitions
              val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
              val seed = byteswap32(-rdd.id - 1)
              val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
              val weight = (1.0 / fraction).toFloat
              // Each re-sampled key gets the inverse of the sampling fraction as its weight
              candidates ++= reSampled.map(x => (x, weight))
            }
            // Determine the partition boundaries
            RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
          }
        }
      }
    }
    
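    To put numbers on the sampling arithmetic (hypothetical figures): suppose sortByKey targets 10 output partitions over an input RDD with 100 partitions and 1,000,000 records.

    val samplePointsPerPartitionHint = 20
    val partitions = 10
    val inputPartitions = 100
    val numItems = 1000000L
    val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6) // 200.0
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / inputPartitions).toInt   // ceil(6.0) = 6
    val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)                  // 2e-4
    // A partition is marked imbalanced when fraction * n > sampleSizePerPartition,
    // i.e. when it holds more than 6 / 2e-4 = 30,000 records.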

    Determining the bounds

    The bounds are derived from the candidates' weights; a key's weight can be read as the number of input elements that key represents.
    The method returns an array of length partitions - 1 whose i-th element is the upper bound on the keys of partition i.

      def determineBounds[K : Ordering : ClassTag](
          candidates: ArrayBuffer[(K, Float)],
          partitions: Int): Array[K] = {
        val ordering = implicitly[Ordering[K]]
        // Sort the candidates by key, ascending
        val ordered = candidates.sortBy(_._1)
        val numCandidates = ordered.size
        // Total weight, and the average weight per output partition
        val sumWeights = ordered.map(_._2.toDouble).sum
        val step = sumWeights / partitions
        var cumWeight = 0.0
        var target = step
        val bounds = ArrayBuffer.empty[K]
        var i = 0
        var j = 0
        var previousBound = Option.empty[K]
        while ((i < numCandidates) && (j < partitions - 1)) {
          val (key, weight) = ordered(i)
          // Accumulate the weight
          cumWeight += weight
          // Reached the next split target
          if (cumWeight >= target) {
            // Equal keys must stay in the same partition, so only split between distinct keys
            if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
              bounds += key // record the boundary
              target += step
              j += 1
              previousBound = Some(key)
            }
          }
          i += 1
        }
        bounds.toArray
      }
    
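    A self-contained walkthrough of the same cut logic on toy data (hypothetical values; the candidates are already sorted by key): with total weight 6.0 and partitions = 3, step = 2.0, and a bound is emitted each time the cumulative weight crosses the next multiple of step.

    val candidates = Seq(("a", 1.0f), ("b", 1.0f), ("c", 2.0f), ("d", 1.0f), ("e", 1.0f))
    val partitions = 3
    val step = candidates.map(_._2.toDouble).sum / partitions // 6.0 / 3 = 2.0
    var cumWeight = 0.0
    var target = step
    val bounds = scala.collection.mutable.ArrayBuffer.empty[String]
    for ((key, weight) <- candidates if bounds.size < partitions - 1) {
      cumWeight += weight
      if (cumWeight >= target && (bounds.isEmpty || key > bounds.last)) {
        bounds += key // "b" at cumWeight 2.0, then "c" at cumWeight 4.0
        target += step
      }
    }
    // bounds == ArrayBuffer("b", "c"): partition 0 takes keys <= "b",
    // partition 1 takes keys in ("b", "c"], partition 2 takes keys > "c"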

    Getting the partition

    getPartition: when there are at most 128 boundaries, the key is compared against the boundary array in a linear scan to find the partition index; otherwise a binary search locates it. Finally, the PartitionId is returned as-is for ascending order or mirrored for descending.

      private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
    
      def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[K]
        var partition = 0
        if (rangeBounds.length <= 128) {
          // With at most 128 boundaries, a naive linear scan is enough
          while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
            partition += 1
          }
        } else {
          // Determine which binary search method to use only once.
          partition = binarySearch(rangeBounds, k)
          // binarySearch either returns the match location or -[insertion point]-1
          if (partition < 0) {
            partition = -partition-1
          }
          if (partition > rangeBounds.length) {
            partition = rangeBounds.length
          }
        }
        if (ascending) {
          partition
        } else {
          rangeBounds.length - partition
        }
      }
    
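    Continuing the toy example from the bounds walkthrough (bounds "b" and "c"), the linear-scan branch maps keys to partitions like this:

    val rangeBounds = Array("b", "c")
    def part(k: String): Int = {
      var partition = 0
      while (partition < rangeBounds.length && k > rangeBounds(partition)) {
        partition += 1
      }
      partition
    }
    Seq("a", "b", "bb", "z").map(part) // List(0, 0, 1, 2)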

    Reservoir Sampling

    Reservoir sampling selects k samples from a collection of n items, where n is very large or not known in advance.

    Mathematical idea: given n objects in total, put the first k objects into the "reservoir". Starting with the (k+1)-th object, keep it with probability k/(k+1); keep the (k+2)-th with probability k/(k+2), and in general keep the m-th object (m > k) with probability k/m. Whenever an object is kept, it replaces a uniformly random element of the reservoir. Every object therefore ends up in the final sample with probability k/n: an element already in the reservoir survives step m with probability 1 - (k/m)(1/k) = (m-1)/m, these factors telescope, and an object m kept with probability k/m then survives to the end with probability m/n, giving exactly k/n in both cases.
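
    A quick Monte Carlo check of that claim (a standalone demo, not Spark code; it mirrors the replacement logic of reservoirSampleAndCount shown later): sample k = 3 out of n = 10 repeatedly and confirm every element is kept in roughly k/n = 30% of the runs.

    import scala.util.Random

    val n = 10; val k = 3; val trials = 100000
    val hits = new Array[Int](n)
    val rand = new Random(42)
    for (_ <- 1 to trials) {
      val reservoir = Array.tabulate(k)(identity) // the first k items: 0 .. k-1
      for (m <- k until n) {                      // item m is the (m+1)-th element seen
        val j = rand.nextInt(m + 1)               // uniform index in [0, m]
        if (j < k) reservoir(j) = m               // keep item m with probability k/(m+1)
      }
      reservoir.foreach(hits(_) += 1)
    }
    hits.zipWithIndex.foreach { case (h, i) =>
      println(f"item $i kept in ${100.0 * h / trials}%.1f%% of trials") // all approximately 30%
    }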

      /**
       * Reservoir-samples each partition of the RDD; the collect() at the end triggers a job
       *
       * @param rdd the RDD to sketch, containing only the keys
       * @param sampleSizePerPartition maximum sample size per partition
       * @return (total number of items, an array of (partitionId, number of items, sample))
       */
      def sketch[K : ClassTag](
          rdd: RDD[K],
          sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
        val shift = rdd.id // the RDD id seeds the per-partition random generators
        val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
          val seed = byteswap32(idx ^ (shift << 16)) // deterministic per-partition seed
          val (sample, n) = SamplingUtils.reservoirSampleAndCount(
            iter, sampleSizePerPartition, seed)
          Iterator((idx, n, sample))
        }.collect() // triggers a job
        val numItems = sketched.map(_._2).sum // total element count across all partitions
        (numItems, sketched)
      }
    

    The core sampling method returns the sample together with the total number of input elements:

      def reservoirSampleAndCount[T: ClassTag](
          input: Iterator[T],
          k: Int,
          seed: Long = Random.nextLong())
        : (Array[T], Long) = {
        val reservoir = new Array[T](k) // the reservoir holds k elements
        // Fill the reservoir with the first k elements of the input
        var i = 0
        while (i < k && input.hasNext) {
          val item = input.next()
          reservoir(i) = item
          i += 1
        }
    
        if (i < k) {
          // The input has fewer than k elements: trim the array and return it directly
          val trimReservoir = new Array[T](i)
          System.arraycopy(reservoir, 0, trimReservoir, 0, i)
          (trimReservoir, i)
        } else {
          // The reservoir is full; keep consuming input, probabilistically replacing existing samples
          var l = i.toLong
          val rand = new XORShiftRandom(seed)
          while (input.hasNext) {
            val item = input.next()
            l += 1
            // Random index in [0, l), drawn as a double and truncated to a long
            val replacementIndex = (rand.nextDouble() * l).toLong
            // The new item is kept with probability k/l, replacing the element at that index
            if (replacementIndex < k) {
              reservoir(replacementIndex.toInt) = item
            }
          }
          (reservoir, l)
        }
      }
    
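    Assuming reservoirSampleAndCount above has been pasted into scope (in Spark it lives in the private[spark] object SamplingUtils and depends on the internal XORShiftRandom, so user code cannot call it directly), a quick check looks like this:

    val (sample, total) = reservoirSampleAndCount((1 to 1000).iterator, k = 5, seed = 42L)
    // total == 1000L: the entire input was counted while sampling
    // sample.length == 5: each element was kept with probability 5/1000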
