1. combineByKeyWithClassTag
The following operators (on `RDD[(K, V)]`) are all ultimately implemented with `combineByKeyWithClassTag`:
Operator | Notes | Map-side combine? | Intermediate result type | Output |
---|---|---|---|---|
`reduceByKey` | | yes | `V` | `(K, V)` |
`groupByKey` | | no | `CompactBuffer[V]` | `(K, Iterable[V])` |
`combineByKey` | | yes | `C` | `(K, C)` |
`foldByKey` | `reduceByKey` with an initial zero value | yes | `V` | `(K, V)` |
`aggregateByKey` | `foldByKey` whose zero value may have a different type | yes | `U` | `(K, U)` |
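To make the table more concrete, here is a minimal sketch in plain Scala (no Spark required) of how each operator boils down to a choice of the three functions `createCombiner`, `mergeValue` and `mergeCombiners`. The helper `combineByKeyLocal` and the nested `Seq` that stands in for partitions are assumptions made for illustration; they are not part of Spark.

```scala
object CombineByKeySketch {
  // Hypothetical local stand-in for combineByKey; `partitions` simulates an RDD's partitions.
  def combineByKeyLocal[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Map side: combine the values of each key within a single partition.
    val perPartition = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        val c = acc.get(k) match {
          case Some(existing) => mergeValue(existing, v)
          case None => createCombiner(v)
        }
        acc.updated(k, c)
      }
    }
    // Reduce side: merge the per-partition combiners of each key.
    perPartition.foldLeft(Map.empty[K, C]) { (merged, combiners) =>
      combiners.foldLeft(merged) { case (acc, (k, c)) =>
        acc.updated(k, acc.get(k).fold(c)(prev => mergeCombiners(prev, c)))
      }
    }
  }

  val data: Seq[Seq[(String, Int)]] =
    Seq(Seq("a" -> 1, "b" -> 2, "a" -> 3), Seq("a" -> 4, "b" -> 5))

  // reduceByKey(_ + _): the combiner type C is V itself.
  val reduced = combineByKeyLocal[String, Int, Int](data)(v => v, _ + _, _ + _)

  // groupByKey: C collects the values (a Vector here, CompactBuffer[V] in Spark,
  // which additionally turns map-side combine off for this operator).
  val grouped = combineByKeyLocal[String, Int, Vector[Int]](data)(v => Vector(v), _ :+ _, _ ++ _)

  // foldByKey(10)(_ + _): like reduceByKey, but the first value of a key
  // in each partition is merged into the zero value.
  val folded = combineByKeyLocal[String, Int, Int](data)(v => 10 + v, _ + _, _ + _)

  // aggregateByKey((0, 0))(seqOp, combOp): the zero value has its own type U = (sum, count).
  val seqOp: ((Int, Int), Int) => (Int, Int) = (u, v) => (u._1 + v, u._2 + 1)
  val combOp: ((Int, Int), (Int, Int)) => (Int, Int) = (a, b) => (a._1 + b._1, a._2 + b._2)
  val aggregated =
    combineByKeyLocal[String, Int, (Int, Int)](data)(v => seqOp((0, 0), v), seqOp, combOp)
}
```

Note that in `folded` the zero value 10 enters once per key and per partition, so key `a` ends up with 28 rather than 18.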
1.1 Implementation of combineByKeyWithClassTag
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
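As the implementation above shows, the three functions `createCombiner`, `mergeValue` and `mergeCombiners` are ultimately used only to initialize the `aggregator`:
- `createCombiner` (`V => C`): builds the initial combiner from the first value seen for a key
- `mergeValue` (`(C, V) => C`): merges one more value into the existing combiner of a key
- `mergeCombiners` (`(C, C) => C`): merges two combiners of the same key
Whether a shuffle takes place is decided by the check `self.partitioner == Some(partitioner)`, that is, by whether the partitioner requested for this operator equals the partitioner the RDD already has. If they are equal, the values are combined in place through `mapPartitions`; otherwise a `ShuffledRDD` is created.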
1.2 CompactBuffer
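`CompactBuffer` works much like `ArrayBuffer`, but uses memory more efficiently for small collections.
- `ArrayBuffer`: by default allocates a backing array of 16 elements up front, which wastes a lot of space when the actual number of elements is far below 16 (`array: Array[AnyRef] = new Array[AnyRef](math.max(initialSize, 1))`)
- `CompactBuffer`: keeps the first 2 elements in plain fields and allocates an array only once a third element is added, so memory is used better when a key has only a few values under `groupBy`
The `CompactBuffer` code is as follows: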
private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable {
  // First two elements
  private var element0: T = _
  private var element1: T = _

  // Number of elements, including our two in the main object
  private var curSize = 0

  // Array for extra elements
  private var otherElements: Array[T] = null

  def apply(position: Int): T = {
    if (position < 0 || position >= curSize) {
      throw new IndexOutOfBoundsException
    }
    if (position == 0) {
      element0
    } else if (position == 1) {
      element1
    } else {
      otherElements(position - 2)
    }
  }

  private def update(position: Int, value: T): Unit = {
    if (position < 0 || position >= curSize) {
      throw new IndexOutOfBoundsException
    }
    if (position == 0) {
      element0 = value
    } else if (position == 1) {
      element1 = value
    } else {
      otherElements(position - 2) = value
    }
  }

  def += (value: T): CompactBuffer[T] = {
    val newIndex = curSize
    if (newIndex == 0) {
      element0 = value
      curSize = 1
    } else if (newIndex == 1) {
      element1 = value
      curSize = 2
    } else {
      growToSize(curSize + 1)
      otherElements(newIndex - 2) = value
    }
    this
  }

  def ++= (values: TraversableOnce[T]): CompactBuffer[T] = {
    values match {
      // Optimize merging of CompactBuffers, used in cogroup and groupByKey
      case compactBuf: CompactBuffer[T] =>
        val oldSize = curSize
        // Copy the other buffer's size and elements to local variables in case it is equal to us
        val itsSize = compactBuf.curSize
        val itsElements = compactBuf.otherElements
        growToSize(curSize + itsSize)
        if (itsSize == 1) {
          this(oldSize) = compactBuf.element0
        } else if (itsSize == 2) {
          this(oldSize) = compactBuf.element0
          this(oldSize + 1) = compactBuf.element1
        } else if (itsSize > 2) {
          this(oldSize) = compactBuf.element0
          this(oldSize + 1) = compactBuf.element1
          // At this point our size is also above 2, so just copy its array directly into ours.
          // Note that since we added two elements above, the index in this.otherElements that we
          // should copy to is oldSize.
          System.arraycopy(itsElements, 0, otherElements, oldSize, itsSize - 2)
        }

      case _ =>
        values.foreach(e => this += e)
    }
    this
  }

  override def length: Int = curSize

  override def size: Int = curSize

  override def iterator: Iterator[T] = new Iterator[T] {
    private var pos = 0
    override def hasNext: Boolean = pos < curSize
    override def next(): T = {
      if (!hasNext) {
        throw new NoSuchElementException
      }
      pos += 1
      apply(pos - 1)
    }
  }

  /** Increase our size to newSize and grow the backing array if needed. */
  private def growToSize(newSize: Int): Unit = {
    // since two fields are hold in element0 and element1, an array holds newSize - 2 elements
    val newArraySize = newSize - 2
    val arrayMax = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
    if (newSize < 0 || newArraySize > arrayMax) {
      throw new UnsupportedOperationException(s"Can't grow buffer past $arrayMax elements")
    }
    val capacity = if (otherElements != null) otherElements.length else 0
    if (newArraySize > capacity) {
      var newArrayLen = 8L
      while (newArraySize > newArrayLen) {
        newArrayLen *= 2
      }
      if (newArrayLen > arrayMax) {
        newArrayLen = arrayMax
      }
      val newArray = new Array[T](newArrayLen.toInt)
      if (otherElements != null) {
        System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
      }
      otherElements = newArray
    }
    curSize = newSize
  }
}
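The growth policy in `growToSize` can be isolated in a few lines. The sketch below is a simplified model, not Spark code: the function name `overflowArrayLength` is made up for illustration, and the cap at `ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH` is left out.

```scala
object CompactBufferGrowth {
  // Length of the overflow array once the buffer holds `size` elements in total.
  // Mirrors growToSize: the first two elements live in fields, the array starts
  // with 8 slots and doubles until it fits. The maximum-length cap is omitted here.
  def overflowArrayLength(size: Int): Int = {
    val needed = size - 2
    if (needed <= 0) {
      0 // no array is allocated while the buffer holds at most two elements
    } else {
      var len = 8
      while (needed > len) len *= 2
      len
    }
  }
}
```

With this model a buffer of up to 2 elements needs no array at all, 3 to 10 elements share one array of 8 slots, and the 11th element triggers growth to 16 slots.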
1.3 Aggregator
`Aggregator` is implemented as follows. It can be viewed as a thin proxy for operations on `ExternalAppendOnlyMap`, which may spill data to disk while it is being filled.
case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  def combineValuesByKey(
      iter: Iterator[_ <: Product2[K, V]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

  def combineCombinersByKey(
      iter: Iterator[_ <: Product2[K, C]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

  /** Update task metrics after populating the external map. */
  private def updateMetrics(context: TaskContext, map: ExternalAppendOnlyMap[_, _, _]): Unit = {
    Option(context).foreach { c =>
      c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
      c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
      c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
    }
  }
}
2. treeAggregate
2.1 Reading the source
def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.

    // Don't trigger TreeAggregation when it doesn't save wall-clock time
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
    }
    val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
  }
}
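Based on the code above, the computation can be roughly divided into 3 parts:
- Aggregation of the data within each partition
- Merging across partitions (`foldByKey` repartitions the data)
- Final merge of the remaining data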
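How many intermediate merge stages are run depends only on the initial partition count and `depth`. The following sketch lifts the `scale` computation and the `while` condition out of the source above so that they can be tried without a cluster; the object and function names are hypothetical.

```scala
object TreeAggregateLevels {
  // Partition counts of the intermediate foldByKey stages, using the same
  // `scale` formula and loop condition as the treeAggregate source above.
  def intermediateLevels(initialPartitions: Int, depth: Int = 2): List[Int] = {
    require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
    var numPartitions = initialPartitions
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    val levels = List.newBuilder[Int]
    // An extra level is added only while it is expected to save wall-clock time.
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      levels += numPartitions
    }
    levels.result()
  }
}
```

For example, 5 partitions with the default depth produce no intermediate stage at all, 1000 partitions are first reduced to 31, and 500 partitions with `depth = 3` go through 62 and then 7 partitions before the final merge.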
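2.2 Scala's aggregate vs. Spark's aggregate/treeAggregate
Dataset: `Seq(1, 2, 3)`; in the Spark rows, `data` is presumably an RDD built from this sequence. The results differ because of the choice of zero value (1 here rather than 0).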
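Code | Result | Notes |
---|---|---|
`Seq(1, 2, 3).aggregate(1)((acc, e) => acc + e, (e1, e2) => e1 + e2)` | 7 | |
`data.repartition(1).aggregate(1)((acc, e) => acc + e, (e1, e2) => e1 + e2)` | 8 | the zero value is applied again when the driver merges the partition result |
`data.repartition(1).treeAggregate(1)((acc, e) => acc + e, (e1, e2) => e1 + e2)` | 9 | the final `fold` applies the zero value two more times, once on the partition and once on the driver |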
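The `repartition` call in the code above directly determines how far the Spark results deviate from the Scala result: the zero value is applied once in every partition, so more partitions mean a larger difference.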
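The three results can be reproduced without Spark by modelling where the zero value enters. This is a sketch under assumptions: the function names are invented, a nested `Seq` stands in for partitions, and `treeAggregateLike` only models the version of `treeAggregate` quoted in this article for the case where no intermediate stage is created.

```scala
object ZeroValueSketch {
  val seqOp: (Int, Int) => Int = (acc, e) => acc + e
  val combOp: (Int, Int) => Int = (u1, u2) => u1 + u2

  // Plain Scala on a sequential collection: the zero value is used exactly once.
  def scalaAggregate(data: Seq[Int], zero: Int): Int =
    data.foldLeft(zero)(seqOp)

  // RDD.aggregate-like: zero once per partition, then once more when the
  // driver merges the partition results.
  def aggregateLike(partitions: Seq[Seq[Int]], zero: Int): Int =
    partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

  // treeAggregate-like (no intermediate stage): partial aggregation per partition,
  // followed by a fold that again starts from zero in each partition and on the driver.
  def treeAggregateLike(partitions: Seq[Seq[Int]], zero: Int): Int = {
    val partials = partitions.map(_.foldLeft(zero)(seqOp))
    partials.map(p => combOp(zero, p)).foldLeft(zero)(combOp)
  }
}
```

With a single partition `Seq(Seq(1, 2, 3))` and zero value 1 the three functions return 7, 8 and 9. Splitting the same data into three partitions raises the `aggregateLike` result to 10, which is why a neutral zero value (0 for addition) is the safe choice.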
3. Code snippets
3.1 A lazily created serializer instance
def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  // When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}
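The pattern used here (serialize the zero value once, hand out a fresh copy per key, and create the deserializer lazily) can be tried in isolation. The sketch below is not Spark code: it swaps Spark's serializer for plain Java serialization, and the `Deserializer` class and the creation counter are hypothetical helpers added only to make the behavior observable.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.{ArrayList => JArrayList}

object LazyZeroSketch {
  // Counts how often the (hypothetical) deserializer is constructed.
  var deserializerCreations = 0

  // Stand-in for a serializer instance, based on Java serialization.
  final class Deserializer {
    deserializerCreations += 1
    def deserialize[T](bytes: Array[Byte]): T = {
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
      try in.readObject().asInstanceOf[T] finally in.close()
    }
  }

  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  // The (mutable) zero value is serialized exactly once...
  val zeroArray: Array[Byte] = serialize(new JArrayList[Integer]())

  // ...the deserializer is only built on first use...
  lazy val cachedDeserializer = new Deserializer

  // ...and every call returns an independent copy of the zero value.
  val createZero: () => JArrayList[Integer] =
    () => cachedDeserializer.deserialize[JArrayList[Integer]](zeroArray)
}
```

Calling `createZero()` twice yields two distinct lists, so mutating one does not leak into the zero value used for another key, while the deserializer itself is constructed only once.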