Spark key-value (k-v) RDD transformation operators
MapPartitionsRDD
- mapValues operator
Applies the supplied function to each value. Internally it creates a MapPartitionsRDD whose iterator function applies the function only to the value of each pair.
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
    preservesPartitioning = true)
}
As the source shows, the result is still a key-value RDD, but the function is applied only to the value: (k, cleanF(v)).
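For a quick illustration, here is a minimal usage sketch (it assumes a SparkSession named spark, as in the later examples in this article):
val pairs = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2)))
val scaled = pairs.mapValues(_ * 10)     // keys untouched, only values transformed
println(scaled.collect().mkString(","))  // (a,10),(b,20)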
- flatMapValues operator
Flattens each pair's value and maps every produced element back into a (k, v) pair. It is again implemented with MapPartitionsRDD.
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.flatMap { case (k, v) =>
      cleanF(v).map(x => (k, x))
    },
    preservesPartitioning = true)
}
Note that after the supplied function is applied, cleanF(v).map(x => (k, x)) re-attaches the key to every produced element.
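A minimal usage sketch (again assuming a SparkSession named spark):
val lines = spark.sparkContext.parallelize(Seq(("a", "1 2"), ("b", "3")))
val flattened = lines.flatMapValues(_.split(" "))  // each value expands into several (k, x) pairs
println(flattened.collect().mkString(","))         // (a,1),(a,2),(b,3)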
ShuffledRDD
- partitionBy operator
partitionBy is much like repartition: both return an RDD with the requested number of partitions. partitionBy, however, is defined only on key-value RDDs and lets you pass in the partitioner to use.
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}
Array keys cannot be used with a HashPartitioner. The method then checks whether the supplied partitioner equals the RDD's current one; if they are equal nothing is done and the RDD itself is returned. Note that for HashPartitioner, partitioner equality also requires the same number of partitions:
override def equals(other: Any): Boolean = other match {
  case h: HashPartitioner =>
    h.numPartitions == numPartitions
  case _ =>
    false
}
So if the partitioner (including its partition count) is unchanged, the RDD is returned directly; otherwise a ShuffledRDD is created. Only key-value RDDs can be shuffled this way, which is also what makes it convenient to change the partitioner of a k-v RDD at shuffle time.
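A small sketch of the behaviour described above (identifiers are illustrative; a SparkSession named spark is assumed):
import org.apache.spark.HashPartitioner
val kv = spark.sparkContext.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
val byHash = kv.partitionBy(new HashPartitioner(4))     // creates a ShuffledRDD with 4 partitions
val again = byHash.partitionBy(new HashPartitioner(4))  // equal partitioner (same numPartitions), so the RDD is returned as-is
println(again.partitioner == byHash.partitioner)        // true
println(byHash.partitionBy(new HashPartitioner(8)).getNumPartitions) // 8: a different numPartitions forces a new shuffle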
- combineByKey operator
combineByKey aggregates values by key. It is a simplified front end to combineByKeyWithClassTag; the overloads without an explicit partitioner default to a HashPartitioner.
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
    partitioner, mapSideCombine, serializer)(null)
}
Three functions must be supplied:
createCombiner: the combiner-creation function; defines how a V is turned into a C.
mergeValue: the value-merge function; defines how a new V is merged into an existing C to produce a new C.
mergeCombiners: the combiner-merge function; defines how two Cs for the same key are merged into one C.
For example:
val rdd1 = spark.sparkContext.parallelize(Seq(1, 5, 4, 6, 4, 6))
val rdd2 = rdd1.map(x => (x, x + x))
val createCombiner = (v: Int) => List(v)
val mergeValue: (List[Int], Int) => List[Int] = (c: List[Int], v: Int) => v :: c
val mergeCombiners = (c1: List[Int], c2: List[Int]) => c1 ::: c2
val rdd4 = rdd2.combineByKey(createCombiner, mergeValue, mergeCombiners, 3)
println(rdd4.collect().mkString(","))
(6,List(12, 12)),(4,List(8, 8)),(1,List(2)),(5,List(10))
The implementation builds an Aggregator and converts RDD[(K, V)] into RDD[(K, C)], aggregating the values V into combiners C. The mapSideCombine parameter lets the caller decide whether map-side aggregation is enabled.
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  // The combiner-merge function must be defined
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  // Array keys cannot use map-side combining
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    // A HashPartitioner cannot partition array keys
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  // Create an Aggregator
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    // Create a ShuffledRDD
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
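To make the mapSideCombine parameter concrete, here is a hedged sketch that reuses rdd2 and the three functions from the combineByKey example above and simply switches map-side combining off via the overload shown earlier:
import org.apache.spark.HashPartitioner
val noMapSideCombine = rdd2.combineByKey(
  createCombiner,          // v => List(v)
  mergeValue,              // (c, v) => v :: c
  mergeCombiners,          // (c1, c2) => c1 ::: c2
  new HashPartitioner(3),
  mapSideCombine = false)  // values are shipped unaggregated and only combined on the reduce side
println(noMapSideCombine.collect().mkString(","))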
As its name suggests, a ShuffledRDD uses a partitioner to scatter the parent's data and redistribute it into new partitions. The main pieces of its implementation are walked through below:
- Getting the partition array
override def getPartitions: Array[Partition] = {
  Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
}
This builds an array with as many entries as the partitioner has partitions, creating a ShuffledRDDPartition for each index. A ShuffledRDDPartition stores nothing but its partition id.
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  override val index: Int = idx
}
- Getting the RDD's dependencies and registering the shuffle with the ShuffleManager
override def getDependencies: Seq[Dependency[_]] = {
  val serializer = userSpecifiedSerializer.getOrElse {
    val serializerManager = SparkEnv.get.serializerManager
    if (mapSideCombine) {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
    } else {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
    }
  }
  List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
It obtains the serializer and creates a ShuffleDependency. The ShuffleDependency checks that an Aggregator is supplied whenever mapSideCombine is enabled, captures keyClass, valueClass and combinerClass, generates a shuffleId, and registers the shuffle with the shuffleManager:
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
  shuffleId, _rdd.partitions.length, this)
When registering the shuffle, the manager decides which kind of shuffle to use depending on the situation. We will not dwell on this here; the shuffle internals are covered in detail later.
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
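Not part of the Spark source above, just a configuration sketch: the bypass path in the first branch is controlled by spark.shuffle.sort.bypassMergeThreshold, which could be raised for jobs with many reduce partitions and no map-side aggregation, e.g.:
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setAppName("shuffle-handle-demo")                      // hypothetical app name
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")  // default is 200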
- Running the ShuffleMapTask of the previous stage
A ShuffledRDD introduces a wide dependency. Spark cuts tasks into stages at wide dependencies, and a stage can only start once its parent stage has finished, so getting the data across necessarily involves executing the parent stage's tasks.
Spark tasks come in two kinds: ResultTask and ShuffleMapTask. A ShuffleMapTask passes its output on to the next stage, whereas a ResultTask returns its result directly. ShuffledRDD relies on ShuffleMapTask.
A ShuffleMapTask splits the RDD's elements into buckets, one per partition of the partition array obtained above; the assignment is done by the RDD's partitioner.
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    ...
    throw e
  }
}
In ShuffleMapTask the RDD and its ShuffleDependency are first deserialized; the ShuffleManager is then obtained from SparkEnv, a writer is requested, and its write method is called to write the map-side output.
getWriter() returns the writer that matches the shuffle handle registered when the dependency was created.
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}
The previous stage's data is then written out on the map side, using the strategy determined by the chosen shuffle handle.
- Getting preferred locations
override protected def getPreferredLocations(partition: Partition): Seq[String] = {
  val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  tracker.getPreferredLocationsForShuffle(dep, partition.index)
}
The MapOutputTrackerMaster is obtained from SparkEnv and the ShuffleDependency is taken from the head of the dependencies; both the dependency and the partition index are passed to the tracker to obtain the preferred locations.
def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
    : Seq[String] = {
  if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
      dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
    val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
      dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
    if (blockManagerIds.nonEmpty) {
      blockManagerIds.get.map(_.host)
    } else {
      Nil
    }
  } else {
    Nil
  }
}
So preferred locations are only returned when shuffle locality is enabled and both the number of map-side partitions and the number of reduce partitions are below their thresholds (1000 by default).
- Executing compute
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}
The compute function casts the head of the dependencies to a ShuffleDependency, then asks the ShuffleManager (obtained via SparkEnv) for a reader to fetch the data written on the map side; getReader likewise returns the reader strategy matching the registered shuffle type. Finally read() is called to pull the records:
/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
  // Create the ShuffleBlockFetcherIterator, passing in the blockManager and the
  // map-output sizes for the requested reduce partitions
  val wrappedStreams = new ShuffleBlockFetcherIterator(
    context,
    blockManager.shuffleClient,
    blockManager,
    mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
    serializerManager.wrapStream,
    // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
    SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
    SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
    SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
    SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
  val serializerInstance = dep.serializer.newInstance()
  // Create a key/value iterator for each stream
  val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
    // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
    // NextIterator. The NextIterator makes sure that close() is called on the
    // underlying InputStream when all records have been read.
    serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
  }
  // Update the context task metrics for each record read.
  val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
  val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
    recordIter.map { record =>
      readMetrics.incRecordsRead(1)
      record
    },
    context.taskMetrics().mergeShuffleReadMetrics())
  // An interruptible iterator must be used here in order to support task cancellation
  val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
  // If an aggregator is defined and mapSideCombine is enabled, call combineCombinersByKey
  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    if (dep.mapSideCombine) {
      // We are reading values that are already combined
      val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
      dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
    } else {
      // We don't know the value type, but also don't care -- the dependency *should*
      // have made sure its compatible w/ this aggregator, which will convert the value
      // type to the combined type C
      val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
      dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
    }
  } else {
    interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
  }
  // Sort the output if there is a sort ordering defined, using an ExternalSorter
  // that can spill from memory to disk.
  val resultIter = dep.keyOrdering match {
    case Some(keyOrd: Ordering[K]) =>
      // Create an ExternalSorter to sort the data.
      val sorter =
        new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
      sorter.insertAll(aggregatedIter)
      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
      context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
      // Use completion callback to stop sorter if task was finished/cancelled.
      context.addTaskCompletionListener[Unit](_ => {
        sorter.stop()
      })
      CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
    case None =>
      aggregatedIter
  }
  resultIter match {
    case _: InterruptibleIterator[Product2[K, C]] => resultIter
    case _ =>
      // Use another interruptible iterator here to support task cancellation as aggregator
      // or(and) sorter may have consumed previous interruptible iterator.
      new InterruptibleIterator[Product2[K, C]](context, resultIter)
  }
}
- foldByKey operator
foldByKey merges the values of each key, seeding the merge with an initial value zeroValue.
val rdd1 = spark.sparkContext.parallelize(Seq(1, 5, 4, 6, 4, 6))
val rdd2 = rdd1.map(x => (x, x))
val rdd4 = rdd2.foldByKey(10)((a, b) => a + b)
println(rdd4.collect().mkString(","))
(1,11),(4,28),(5,15),(6,32)
As the output shows, the zeroValue is folded in first and the values are then merged with the supplied func; since the fold happens per key within each map-side partition, a key whose values land in more than one partition (such as 4 and 6 in this run) picks up the zeroValue more than once.
def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)
  // When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}
The zeroValue is first serialized into a byte array so that a fresh copy can be deserialized for every key. A function (createZero) is built that deserializes the zeroValue from this cached array, the supplied merge function is cleaned, and combineByKeyWithClassTag is then called with the constructed functions. From this point on everything matches combineByKey; foldByKey keeps map-side combining enabled.
- reduceByKey operator
reduceByKey merges values without an initial value; underneath it likewise calls combineByKeyWithClassTag.
// val rdd4 = rdd2.reduceByKey((a, b) => a + b)
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
The only difference between reduceByKey and foldByKey is whether an initial value is folded in.
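A minimal sketch for comparison, reusing rdd2 = rdd1.map(x => (x, x)) from the foldByKey example above (output order may differ between runs):
val rdd5 = rdd2.reduceByKey((a, b) => a + b)
println(rdd5.collect().mkString(","))
// e.g. (1,1),(4,8),(5,5),(6,12) -- no zeroValue is folded in, unlike foldByKey(10)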
- groupByKey operator
Exactly what the name says: it groups the key-value RDD by key and appends each value to the sequence maintained for that key. The ordering within a group is not guaranteed. The default partitioner is HashPartitioner.
val rdd1 = spark.sparkContext.parallelize(Seq(1, 5, 4, 6, 4, 6))
val rdd2 = rdd1.map(x => (x, x))
val rdd4 = rdd2.groupByKey()
println(rdd4.collect().mkString(","))
(1,CompactBuffer(1)),(4,CompactBuffer(4, 4)),(5,CompactBuffer(5)),(6,CompactBuffer(6, 6))
As you can see, the grouped values are wrapped in a CompactBuffer.
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
The source shows that groupByKey is yet another thin wrapper over combineByKeyWithClassTag, much like reduceByKey, except that the combining functions are supplied for the user and map-side combining is disabled.
Let's take a look at the CompactBuffer data type:
private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable {
  // First two elements
  private var element0: T = _
  private var element1: T = _
  // Number of elements, including our two in the main object
  private var curSize = 0
  // Array for extra elements
  private var otherElements: Array[T] = null
  ...
}
CompactBuffer is implemented much like ArrayBuffer, with one difference: CompactBuffer keeps element0 and element1 directly in the object and only puts further elements into the backing array, whereas ArrayBuffer stores every element in the backing array and by default allocates room for 16 elements on creation. In short, CompactBuffer is a leaner ArrayBuffer that saves memory in the groupBy case, where there are typically many keys whose value lists are small and do not need a large backing array.
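For intuition, here is a simplified, hypothetical sketch of that idea (not the Spark class itself): the first two elements live directly in the object, and only further elements go into a growable array.
import scala.reflect.ClassTag
class TinyCompactBuffer[T: ClassTag] {
  private var element0: T = _
  private var element1: T = _
  private var curSize = 0
  private var otherElements: Array[T] = null

  // Read: index 0 and 1 come from the inline fields, the rest from the array
  def apply(position: Int): T = {
    if (position < 0 || position >= curSize) throw new IndexOutOfBoundsException(position.toString)
    if (position == 0) element0
    else if (position == 1) element1
    else otherElements(position - 2)
  }

  // Append: only allocate/grow the backing array once more than two elements exist
  def +=(value: T): this.type = {
    if (curSize == 0) element0 = value
    else if (curSize == 1) element1 = value
    else {
      if (otherElements == null) otherElements = new Array[T](8)
      else if (curSize - 2 == otherElements.length) {
        val grown = new Array[T](otherElements.length * 2)
        Array.copy(otherElements, 0, grown, 0, otherElements.length)
        otherElements = grown
      }
      otherElements(curSize - 2) = value
    }
    curSize += 1
    this
  }
}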
- groupBy operator
The difference between groupBy and groupByKey: groupByKey groups by the existing key, whereas groupBy applies a user-supplied function to each element and groups by the function's result; the whole element (the (k, v) pair here) becomes the value in each group. groupBy is defined on the RDD class, so it can be used on plain RDDs as well as on pair RDDs.
val rdd1 = spark.sparkContext.parallelize(Seq(1, 5, 4, 6, 4, 6))
val rdd2 = rdd1.map(x => (x, x))
val rdd4 = rdd2.groupBy(c => c._1 % 5)
println(rdd4.collect().mkString(","))
(0,CompactBuffer((5,5))),(1,CompactBuffer((1,1), (6,6), (6,6))),(4,CompactBuffer((4,4), (4,4)))
In the example above, the key modulo 5 is used as the grouping key. The implementation looks like this:
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])] = withScope {
  val cleanF = sc.clean(f)
  this.map(t => (cleanF(t), t)).groupByKey(p)
}
The source maps each element t to (cleanF(t), t), so the function's result becomes the key and the whole element becomes the value; the resulting pair RDD is then passed to groupByKey(p).
To sum up: combineByKey, foldByKey, reduceByKey and groupByKey all operate on a single RDD, all of them delegate to combineByKeyWithClassTag, and each is just a progressively simplified front end to it.
The operators below work on more than one RDD:
CoGroupedRDD
- cogroup operator
cogroup groups the data of this RDD and the other RDD by key. Unlike groupByKey it does not merge the values into a single iterator; each key is paired with one iterator per input RDD.
var rdd1 = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 5), ("d", 4), ("c", 6), ("b", 6)))
var rdd2 = spark.sparkContext.parallelize(Seq(("a", 2), ("a", 5), ("a", 6),
("a", 8), ("a", 9)), 2)
val rdd3 = rdd1.cogroup(rdd2)
println(rdd3.collect().mkString(","))
(a,(CompactBuffer(1),CompactBuffer(2, 5, 6, 8, 9))),(b,(CompactBuffer(5, 6),CompactBuffer())),(c,(CompactBuffer(6),CompactBuffer())),(d,(CompactBuffer(4),CompactBuffer()))
As the source shows, the cogroup operator simply creates a CoGroupedRDD; when the partitioner is a HashPartitioner, the key must not be an array.
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}
- Getting the RDD partitions
override def getPartitions: Array[Partition] = {
  val array = new Array[Partition](part.numPartitions)
  for (i <- 0 until array.length) {
    // Each CoGroupPartition will have a dependency per contributing RDD
    array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
      // Assume each RDD contributed a single dependency, and get it
      dependencies(j) match {
        case s: ShuffleDependency[_, _, _] =>
          None
        case _ =>
          Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
      }
    }.toArray)
  }
  array
}
A CoGroupPartition is created for every partition index; for each parent RDD its dependency is inspected, and a ShuffleDependency contributes None while any other dependency contributes a NarrowCoGroupSplitDep.
- Getting the dependencies
override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_] =>
    if (rdd.partitioner == Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[K, Any, CoGroupCombiner](
        rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
    }
  }
}
For each parent RDD, if its partitioner equals the one passed in, a OneToOneDependency is used; otherwise a ShuffleDependency is created. CoGroupedRDD's own partitioner is exactly the partitioner passed in (override val partitioner: Some[Partitioner] = Some(part)), so pre-partitioning both parents with it avoids any extra shuffle, as the sketch below illustrates.
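A hedged sketch of that rule, reusing rdd1 and rdd2 from the cogroup example above:
import org.apache.spark.HashPartitioner
val p = new HashPartitioner(4)
val leftPart = rdd1.partitionBy(p)            // shuffles once
val rightPart = rdd2.partitionBy(p)           // shuffles once
val grouped = leftPart.cogroup(rightPart, p)  // both parents match p => narrow dependencies only
println(grouped.collect().mkString(","))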
- Executing compute
override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
  val split = s.asInstanceOf[CoGroupPartition]
  val numRdds = dependencies.length
  // A list of (rdd iterator, dependency number) pairs
  val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
  for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
    case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
      val dependencyPartition = split.narrowDeps(depNum).get.split
      // Read them from the parent
      val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
      rddIterators += ((it, depNum))
    case shuffleDependency: ShuffleDependency[_, _, _] =>
      // Read map outputs of shuffle
      val it = SparkEnv.get.shuffleManager
        .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
        .read()
      rddIterators += ((it, depNum))
  }
  // Create the external map
  val map = createExternalMap(numRdds)
  for ((it, depNum) <- rddIterators) {
    map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
  }
  context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
  context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
  new InterruptibleIterator(context,
    map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
As the source shows, compute handles the two kinds of dependencies separately, wrapping each parent's data in an iterator. It then creates the external map, an ExternalAppendOnlyMap: Spark's memory-optimised, append-only map that spills to disk when memory runs short and that carries the merge functions. Finally all iterators are inserted into the map and the merged result is returned.
Let's see how the external map is built:
private type CoGroup = CompactBuffer[Any]
private type CoGroupValue = (Any, Int) // Int is dependency number
private type CoGroupCombiner = Array[CoGroup]

private def createExternalMap(numRdds: Int)
    : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
  val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
    val newCombiner = Array.fill(numRdds)(new CoGroup)
    newCombiner(value._2) += value._1
    newCombiner
  }
  val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
    (combiner, value) => {
      combiner(value._2) += value._1
      combiner
    }
  val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
    (combiner1, combiner2) => {
      var depNum = 0
      while (depNum < numRdds) {
        combiner1(depNum) ++= combiner2(depNum)
        depNum += 1
      }
      combiner1
    }
  new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
    createCombiner, mergeValue, mergeCombiners)
}
This looks very much like the Aggregator machinery, except that every parent RDD gets its own CompactBuffer (the optimised append-only buffer described earlier) inside the combiner array.
ExternalAppendOnlyMap extends Spillable, so while values are merged into the combiners it spills to disk as needed; the default is 32k.
- join operator (inner join)
join groups two RDDs by key (via cogroup) and then flattens the result with flatMapValues, emitting a (k, (v, w)) pair for every combination of matching values.
var rdd1 = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 5), ("d", 4), ("c", 6), ("b", 6)))
var rdd2 = spark.sparkContext.parallelize(Seq(("a", 2), ("a", 5), ("a", 6),
("a", 8), ("a", 9)), 2)
val rdd3 = rdd1.join(rdd2)
println(rdd3.collect().mkString(","))
(a,(1,2)),(a,(1,5)),(a,(1,6)),(a,(1,8)),(a,(1,9))
The implementation simply calls cogroup and then flatMapValues on the result; as the signature shows, join takes exactly two RDDs.
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}
The for/yield builds the result collection: a key produces output only when it is present in both RDDs.
- leftOuterJoin operator (left outer join)
Similar to join, it calls cogroup but keeps every record of the left RDD; when the right side has no match, None is returned (see the example after the source below).
def leftOuterJoin[W](
    other: RDD[(K, W)],
    partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues { pair =>
    if (pair._2.isEmpty) {
      pair._1.iterator.map(v => (v, None))
    } else {
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
    }
  }
}
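A small sketch of the output, reusing rdd1 and rdd2 from the cogroup example above (result order may differ):
val rddLeft = rdd1.leftOuterJoin(rdd2)
println(rddLeft.collect().mkString(","))
// e.g. (a,(1,Some(2))),(a,(1,Some(5))),(a,(1,Some(6))),(a,(1,Some(8))),(a,(1,Some(9))),
//      (b,(5,None)),(b,(6,None)),(c,(6,None)),(d,(4,None))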
- rightOuterJoin operator (right outer join)
The right outer join mirrors the left outer join, returning None when the left side is empty (an example follows the source below).
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Option[V], W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues { pair =>
    if (pair._1.isEmpty) {
      pair._2.iterator.map(w => (None, w))
    } else {
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
    }
  }
}
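And the mirror-image sketch for rightOuterJoin, again with rdd1 and rdd2 from the cogroup example (result order may differ):
val rddRight = rdd1.rightOuterJoin(rdd2)
println(rddRight.collect().mkString(","))
// e.g. (a,(Some(1),2)),(a,(Some(1),5)),(a,(Some(1),6)),(a,(Some(1),8)),(a,(Some(1),9))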
SubtractedRDD
- subtractByKey operator
Returns the entries whose keys appear in this RDD but not in other. SubtractedRDD was already mentioned among the plain RDD transformation operators; it is essentially an optimised version of CoGroupedRDD.
var rdd1 = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 5), ("d", 4), ("c", 6), ("b", 6)))
var rdd2 = spark.sparkContext.parallelize(Seq(("a", 2), ("a", 5), ("a", 6),
("a", 8), ("a", 9)), 2)
val rdd3 = rdd1.subtractByKey(rdd2)
println(rdd3.collect().mkString(","))
(b,5),(b,6),(c,6),(d,4)
As you can see, the result only ever contains data from rdd1, so rdd1 can be loaded into memory while rdd2 is streamed through to remove matching keys.
- Getting the partition array
override def getPartitions: Array[Partition] = {
  val array = new Array[Partition](part.numPartitions)
  for (i <- 0 until array.length) {
    // Each CoGroupPartition will depend on rdd1 and rdd2
    array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
      dependencies(j) match {
        case s: ShuffleDependency[_, _, _] =>
          None
        case _ =>
          Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
      }
    }.toArray)
  }
  array
}
- Getting the dependencies
override def getDependencies: Seq[Dependency[_]] = {
  def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
      : Dependency[_] = {
    if (rdd.partitioner == Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[T1, T2, Any](rdd, part)
    }
  }
  Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
}
As the code shows, building the partition array and the dependencies is exactly the same as in CoGroupedRDD; even the partitions created are CoGroupPartitions.
- Executing compute
override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
  val partition = p.asInstanceOf[CoGroupPartition]
  val map = new JHashMap[K, ArrayBuffer[V]]
  def getSeq(k: K): ArrayBuffer[V] = {
    val seq = map.get(k)
    if (seq != null) {
      seq
    } else {
      val seq = new ArrayBuffer[V]()
      map.put(k, seq)
      seq
    }
  }
  def integrate(depNum: Int, op: Product2[K, V] => Unit): Unit = {
    dependencies(depNum) match {
      case oneToOneDependency: OneToOneDependency[_] =>
        val dependencyPartition = partition.narrowDeps(depNum).get.split
        oneToOneDependency.rdd.iterator(dependencyPartition, context)
          .asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
      case shuffleDependency: ShuffleDependency[_, _, _] =>
        val iter = SparkEnv.get.shuffleManager
          .getReader(
            shuffleDependency.shuffleHandle, partition.index, partition.index + 1, context)
          .read()
        iter.foreach(op)
    }
  }
  // the first dep is rdd1; add all of its values to the in-memory map
  integrate(0, t => getSeq(t._1) += t._2)
  // the second dep is rdd2; remove all of its keys from the map
  integrate(1, t => map.remove(t._1))
  map.asScala.iterator.map(t => t._2.iterator.map((t._1, _))).flatten
}
In summary, SubtractedRDD is an optimised variant of CoGroupedRDD: instead of building combiners for both sides, it keeps rdd1's values in an in-memory hash map and simply removes every key that also appears in rdd2.