
Aggregation Operators in Spark

Author: 天之見證 | Published 2019-10-02 22:12

    1. combineByKeyWithClassTag

    The following operators on RDD[(K, V)] are ultimately implemented on top of combineByKeyWithClassTag:

    | Operator       | Notes                                                | Map-side combine        | Intermediate result type | Output           |
    |----------------|------------------------------------------------------|-------------------------|--------------------------|------------------|
    | reduceByKey    |                                                      | yes                     | V                        | (K, V)           |
    | groupByKey     |                                                      | no                      | CompactBuffer[V]         | (K, Iterable[V]) |
    | combineByKey   |                                                      | yes (default, configurable) | C                    | (K, C)           |
    | foldByKey      | reduceByKey with an initial zero value               | yes                     | V                        | (K, V)           |
    | aggregateByKey | foldByKey whose zero value can have a different type | yes                     | U                        | (K, U)           |
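
    For reference, reduceByKey is the simplest of these to express in terms of combineByKeyWithClassTag: the value type V doubles as the combiner type, the first value of a key becomes the initial combiner, and the same function merges values and combiners. A sketch, simplified from PairRDDFunctions in the Spark source:

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
      // V is both the value type and the combiner type, so createCombiner is just the identity
      combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
    }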

    1.1 The implementation of combineByKeyWithClassTag

    def combineByKeyWithClassTag[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
      require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
      if (keyClass.isArray) {
        if (mapSideCombine) {
          throw new SparkException("Cannot use map-side combining with array keys.")
        }
        if (partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("HashPartitioner cannot partition array keys.")
        }
      }
      val aggregator = new Aggregator[K, V, C](
        self.context.clean(createCombiner),
        self.context.clean(mergeValue),
        self.context.clean(mergeCombiners))
      if (self.partitioner == Some(partitioner)) {
        self.mapPartitions(iter => {
          val context = TaskContext.get()
          new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
        }, preservesPartitioning = true)
      } else {
        new ShuffledRDD[K, V, C](self, partitioner)
          .setSerializer(serializer)
          .setAggregator(aggregator)
          .setMapSideCombine(mapSideCombine)
      }
    }
    

    From the implementation above, the three functions createCombiner, mergeValue and mergeCombiners are ultimately used to construct the aggregator:

    1. createCombiner: V => C, builds the initial combiner from the first value seen for a key
    2. mergeValue: (C, V) => C, folds another value of the same key into an existing combiner (within a partition)
    3. mergeCombiners: (C, C) => C, merges combiners produced for the same key in different partitions

    It is also clear from the code that whether a shuffle happens is decided by self.partitioner == Some(partitioner), i.e. whether the partitioner requested by this operator is the same as the partitioner the RDD already has; if they match, the aggregation runs inside mapPartitions and no ShuffledRDD is created.
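
    To make the roles of the three functions concrete, here is a minimal combineByKey sketch that computes a per-key average; the SparkContext sc and the sample data are illustrative assumptions:

    import org.apache.spark.rdd.RDD

    val scores: RDD[(String, Double)] = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

    val avgByKey = scores.combineByKey(
      (v: Double) => (v, 1),                                             // createCombiner: first value seen for a key
      (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),       // mergeValue: fold in another value (map side)
      (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners: merge partial results across partitions
    ).mapValues { case (sum, count) => sum / count }

    // avgByKey.collect()  ->  Array((a, 2.0), (b, 2.0))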

    1.2 CompactBuffer

    CompactBuffer is functionally similar to ArrayBuffer but has better memory utilization for small collections.

    ArrayBuffer: always starts by allocating an array of 16 elements (array: Array[AnyRef] = new Array[AnyRef](math.max(initialSize, 1))), so when the actual data is much smaller than 16 elements a lot of space is wasted.

    CompactBuffer: stores the first 2 elements in fields of the object itself, so if a key has only a few values after a groupBy, space utilization is much better.

    The code of CompactBuffer is as follows:

    private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable {
      // First two elements
      private var element0: T = _
      private var element1: T = _
    
      // Number of elements, including our two in the main object
      private var curSize = 0
    
      // Array for extra elements
      private var otherElements: Array[T] = null
    
      def apply(position: Int): T = {
        if (position < 0 || position >= curSize) {
          throw new IndexOutOfBoundsException
        }
        if (position == 0) {
          element0
        } else if (position == 1) {
          element1
        } else {
          otherElements(position - 2)
        }
      }
    
      private def update(position: Int, value: T): Unit = {
        if (position < 0 || position >= curSize) {
          throw new IndexOutOfBoundsException
        }
        if (position == 0) {
          element0 = value
        } else if (position == 1) {
          element1 = value
        } else {
          otherElements(position - 2) = value
        }
      }
    
      def += (value: T): CompactBuffer[T] = {
        val newIndex = curSize
        if (newIndex == 0) {
          element0 = value
          curSize = 1
        } else if (newIndex == 1) {
          element1 = value
          curSize = 2
        } else {
          growToSize(curSize + 1)
          otherElements(newIndex - 2) = value
        }
        this
      }
    
      def ++= (values: TraversableOnce[T]): CompactBuffer[T] = {
        values match {
          // Optimize merging of CompactBuffers, used in cogroup and groupByKey
          case compactBuf: CompactBuffer[T] =>
            val oldSize = curSize
            // Copy the other buffer's size and elements to local variables in case it is equal to us
            val itsSize = compactBuf.curSize
            val itsElements = compactBuf.otherElements
            growToSize(curSize + itsSize)
            if (itsSize == 1) {
              this(oldSize) = compactBuf.element0
            } else if (itsSize == 2) {
              this(oldSize) = compactBuf.element0
              this(oldSize + 1) = compactBuf.element1
            } else if (itsSize > 2) {
              this(oldSize) = compactBuf.element0
              this(oldSize + 1) = compactBuf.element1
              // At this point our size is also above 2, so just copy its array directly into ours.
              // Note that since we added two elements above, the index in this.otherElements that we
              // should copy to is oldSize.
              System.arraycopy(itsElements, 0, otherElements, oldSize, itsSize - 2)
            }
    
          case _ =>
            values.foreach(e => this += e)
        }
        this
      }
    
      override def length: Int = curSize
    
      override def size: Int = curSize
    
      override def iterator: Iterator[T] = new Iterator[T] {
        private var pos = 0
        override def hasNext: Boolean = pos < curSize
        override def next(): T = {
          if (!hasNext) {
            throw new NoSuchElementException
          }
          pos += 1
          apply(pos - 1)
        }
      }
    
      /** Increase our size to newSize and grow the backing array if needed. */
      private def growToSize(newSize: Int): Unit = {
        // since two fields are hold in element0 and element1, an array holds newSize - 2 elements
        val newArraySize = newSize - 2
        val arrayMax = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
        if (newSize < 0 || newArraySize > arrayMax) {
          throw new UnsupportedOperationException(s"Can't grow buffer past $arrayMax elements")
        }
        val capacity = if (otherElements != null) otherElements.length else 0
        if (newArraySize > capacity) {
          var newArrayLen = 8L
          while (newArraySize > newArrayLen) {
            newArrayLen *= 2
          }
          if (newArrayLen > arrayMax) {
            newArrayLen = arrayMax
          }
          val newArray = new Array[T](newArrayLen.toInt)
          if (otherElements != null) {
            System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
          }
          otherElements = newArray
        }
        curSize = newSize
      }
    }
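
    For completeness, groupByKey is where CompactBuffer enters the picture: the values of each key are collected into a CompactBuffer, and map-side combining is disabled because it would not reduce the amount of shuffled data. A sketch, simplified from the Spark source:

    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
      val createCombiner = (v: V) => CompactBuffer(v)
      val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
      val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
      val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
        createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
      bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }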
    

    1.3 Aggregator

    The implementation of Aggregator is shown below. It can be viewed as a thin proxy over ExternalAppendOnlyMap, and ExternalAppendOnlyMap can spill data to disk when memory runs low.

    case class Aggregator[K, V, C] (
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C) {
    
      def combineValuesByKey(
          iter: Iterator[_ <: Product2[K, V]],
          context: TaskContext): Iterator[(K, C)] = {
        val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
        combiners.insertAll(iter)
        updateMetrics(context, combiners)
        combiners.iterator
      }
    
      def combineCombinersByKey(
          iter: Iterator[_ <: Product2[K, C]],
          context: TaskContext): Iterator[(K, C)] = {
        val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
        combiners.insertAll(iter)
        updateMetrics(context, combiners)
        combiners.iterator
      }
    
      /** Update task metrics after populating the external map. */
      private def updateMetrics(context: TaskContext, map: ExternalAppendOnlyMap[_, _, _]): Unit = {
        Option(context).foreach { c =>
          c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
          c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
          c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
        }
      }
    }
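
    To see what combineValuesByKey does semantically, here is a non-spilling, in-memory equivalent; this is a sketch for intuition only, since the real implementation goes through ExternalAppendOnlyMap so that it can spill to disk and report metrics:

    // Conceptual (non-spilling) version of Aggregator.combineValuesByKey
    def combineValuesByKeyInMemory[K, V, C](
        iter: Iterator[(K, V)],
        createCombiner: V => C,
        mergeValue: (C, V) => C): Iterator[(K, C)] = {
      val combiners = scala.collection.mutable.Map.empty[K, C]
      iter.foreach { case (k, v) =>
        combiners(k) = combiners.get(k) match {
          case Some(c) => mergeValue(c, v)   // key already seen: merge the new value in
          case None    => createCombiner(v)  // first value of this key: create the combiner
        }
      }
      combiners.iterator
    }

    // e.g. word count:
    // combineValuesByKeyInMemory(Iterator(("a", 1), ("b", 1), ("a", 1)), (v: Int) => v, (c: Int, v: Int) => c + v)
    //   -> ("a", 2), ("b", 1)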
    

    2. treeAggregate

    2.1 Reading the source

    def treeAggregate[U: ClassTag](zeroValue: U)(
        seqOp: (U, T) => U,
        combOp: (U, U) => U,
        depth: Int = 2): U = withScope {
      require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
      if (partitions.length == 0) {
        Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
      } else {
        val cleanSeqOp = context.clean(seqOp)
        val cleanCombOp = context.clean(combOp)
        val aggregatePartition =
          (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
        var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
        var numPartitions = partiallyAggregated.partitions.length
        val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
        // If creating an extra level doesn't help reduce
        // the wall-clock time, we stop tree aggregation.
    
        // Don't trigger TreeAggregation when it doesn't save wall-clock time
        while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
          numPartitions /= scale
          val curNumPartitions = numPartitions
          partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
            (i, iter) => iter.map((i % curNumPartitions, _))
          }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
        }
        val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
        partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
      }
    }
    

    From the code above, the computation can be roughly divided into three phases (a usage sketch follows this list):

    1. Merging the data within each partition (mapPartitions, with it.aggregate(zeroValue)(seqOp, combOp) run per partition)
    2. Merging across partitions (foldByKey repartitions the partial results, one tree level at a time)
    3. The final merge of the remaining partial results (fold on the driver)
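
    As a usage sketch that goes through exactly these phases (the SparkContext sc, the data size and the depth are illustrative assumptions):

    // Sum of squares over 64 partitions, merged through an aggregation tree
    val rdd = sc.parallelize(1 to 1000000, numSlices = 64)
    val sumOfSquares = rdd.treeAggregate(0L)(
      (acc: Long, x: Int) => acc + x.toLong * x,  // phase 1: seqOp folds each partition
      (a: Long, b: Long) => a + b,                // phase 2: combOp merges partial results in foldByKey rounds
      depth = 3)                                  // allows intermediate merge rounds before the final driver-side fold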

    2.2 Scala's aggregate vs. Spark's aggregate/treeAggregate

    Dataset: Seq(1, 2, 3). Because of how the zero value is applied, the results differ:

    | Code | Result | Notes |
    |------|--------|-------|
    | Seq(1, 2, 3).aggregate(1)((acc, e) => acc + e, (e1, e2) => e1 + e2) | 7 | |
    | data.repartition(1).aggregate(1)((e1, e2) => e1 + e2, (acc, e) => acc + e) | 8 | aggregated once more on the driver |
    | data.repartition(1).treeAggregate(1)((e1, e2) => e1 + e2, (acc, e) => acc + e) | 9 | the driver side aggregates twice |

    The repartition call above (i.e. the number of partitions) directly determines how far the Spark results diverge from the Scala result, because Spark applies the zero value once per partition (and again during the final merge).
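
    The accounting can be reproduced as a runnable sketch (sc is assumed to exist; one partition as in the table, so no extra tree levels are created):

    val data = sc.parallelize(Seq(1, 2, 3)).repartition(1)

    Seq(1, 2, 3).aggregate(1)(_ + _, _ + _)   // 7 = 6 + 1:         zero value applied once
    data.aggregate(1)(_ + _, _ + _)           // 8 = 6 + 1 + 1:     once per partition, plus once on the driver
    data.treeAggregate(1)(_ + _, _ + _)       // 9 = 6 + 1 + 1 + 1: once per partition in seqOp, then the final fold
                                              //                    applies it again per partition and once on the driver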

    3. Code snippets

    3.1 A lazy serializer instance

    foldByKey serializes the zero value to a byte array on the driver; inside each task a lazy val ensures that only one SerializerInstance is created per task, and it deserializes a fresh copy of the zero value for every key (which matters when V is mutable):

    def foldByKey(
        zeroValue: V,
        partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
      // Serialize the zero value to a byte array so that we can get a new clone of it on each key
      val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
      val zeroArray = new Array[Byte](zeroBuffer.limit)
      zeroBuffer.get(zeroArray)
    
      // When deserializing, use a lazy val to create just one instance of the serializer per task
      lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
      val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
    
      val cleanedFunc = self.context.clean(func)
      combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
        cleanedFunc, cleanedFunc, partitioner)
    }
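
    A simple usage for reference (sc is assumed to exist); the zero value is deserialized lazily on the executors, so even a mutable zero value starts from a fresh copy for every key:

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    pairs.foldByKey(0)(_ + _).collect()   // Array((a,4), (b,2))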
    
