map
First, let's look at the source code of map:
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 * (ClassTag only carries the class information available at runtime; map is a transformation.)
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
1. What withScope does:
It first uses the stack trace (Thread.currentThread.getStackTrace) to find the name of the calling operation, such as map, textFile, or reduceByKey, and then records a new RDDOperationScope(name: String, parent: RDDOperationScope) under the SparkContext property "spark.rdd.scope" to describe the operation that is creating the current RDD. The parent field can be followed to trace back through all enclosing RDD operations, i.e. the chain of RDDOperationScopes.
2. What the clean function does:
It checks that the closure is serializable, removes unused variables from it, and ships the serialized function to the tasks (a brief sketch of why this matters follows).
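To see why closure cleaning matters, here is a small illustrative sketch. The class Holder and method scale are made up for this example; the point is that accessing a field inside the lambda captures the whole enclosing object, which is exactly the kind of serialization problem sc.clean / ClosureCleaner is there to catch early.

import org.apache.spark.rdd.RDD

// Hypothetical example: Holder is not Serializable, so a closure that captures
// `this` would fail at runtime with "Task not serializable". Copying the needed
// field into a local val keeps the closure small and serializable.
class Holder(val factor: Int) {
  def scale(rdd: RDD[Int]): RDD[Int] = {
    val f = factor        // capture only the Int, not the whole Holder instance
    rdd.map(_ * f)
  }
}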
The iter.map(cleanF) call in the map implementation above ultimately goes through Scala's Iterator.map:
/** Creates a new iterator that maps all produced values of this iterator
 * to new values using a transformation function.
 * (Each call to next() applies the function to one element; at the RDD level the
 * resulting new RDD keeps the same number of partitions as its parent.)
 * @param f the transformation function
 * @return a new iterator which transforms every value produced by this
 * iterator by applying the function `f` to it.
 * @note Reuse: $consumesAndProducesIterator
 */
def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
  def hasNext = self.hasNext
  def next() = f(self.next())
}
Depending on the concrete element and collection type, different AbstractIterator subclasses are used.
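Putting it together, here is a minimal usage sketch (the SparkContext setup and variable names are illustrative, written for a standalone script rather than spark-shell). It shows that map is lazy and keeps the partition count unchanged:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("map-demo").setMaster("local[*]")
val sc   = new SparkContext(conf)

val nums    = sc.parallelize(1 to 5, 2)     // RDD[Int] with 2 partitions
val squares = nums.map(x => x * x)          // lazy: only builds a MapPartitionsRDD, nothing runs yet
println(squares.partitions.length)          // 2 -- partition count is unchanged
println(squares.collect().mkString(", "))   // 1, 4, 9, 16, 25 -- collect() triggers the job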
join
join matches the keys shared by two RDDs and returns tuples of the form (k, (v1, v2)).
/**
 * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
 * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
 * (k, v2) is in `other`. Performs a hash join across the cluster.
 */
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
  join(other, defaultPartitioner(self, other))
}
By default this resolves to a HashPartitioner, whose doc says:
An object that defines how the elements in a key-value pair RDD are partitioned by key. Maps each key to a partition ID, from 0 to numPartitions - 1.
In other words, it decides which numbered partition each key is assigned to.
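The core of HashPartitioner is just a non-negative modulo of the key's hashCode. The following is a simplified sketch of that logic, not a verbatim copy of the Spark source:

// Simplified sketch of HashPartitioner's key-to-partition mapping
def getPartition(key: Any, numPartitions: Int): Int = key match {
  case null => 0                                     // null keys go to partition 0
  case _ =>
    val rawMod = key.hashCode % numPartitions
    rawMod + (if (rawMod < 0) numPartitions else 0)  // keep the result in [0, numPartitions)
}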
/**
 * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
 * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
 * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
 */
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}
The key code is:
this.cogroup(other, partitioner).flatMapValues( pair =>
  for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
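For each key, pair._1 holds the values from this and pair._2 the values from other; the for-comprehension yields their Cartesian product, so a key missing from either side (an empty iterator) produces nothing. A small standalone sketch with made-up values:

// Hypothetical values for one key, showing what the for-comprehension emits
val vs = Iterable(1, 2)          // values from `this` for this key
val ws = Iterable("x")           // values from `other` for this key
val pairs = (for (v <- vs.iterator; w <- ws.iterator) yield (v, w)).toList
println(pairs)                   // List((1,x), (2,x))
// If either side were empty the result would be empty -- which is why join is an inner join.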
Let's first look at cogroup:
/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 * (That is, for every key appearing in either RDD, the result holds a tuple of the
 * value collections from both sides.)
 */
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("Default partitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}
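A minimal example of what cogroup and join produce, reusing the sc from the earlier sketch (the data is made up, the output order may vary, and the exact buffer type printed can differ by Spark version):

val a = sc.parallelize(Seq(("k1", 1), ("k1", 2), ("k2", 3)))
val b = sc.parallelize(Seq(("k1", "x"), ("k3", "y")))

a.cogroup(b).collect().foreach(println)
// (k1,(CompactBuffer(1, 2),CompactBuffer(x)))
// (k2,(CompactBuffer(3),CompactBuffer()))
// (k3,(CompactBuffer(),CompactBuffer(y)))

a.join(b).collect().foreach(println)
// (k1,(1,x))
// (k1,(2,x))   -- only keys present in both RDDs survive the join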
Now let's look at CoGroupedRDD.
The official doc comment reads:
/**
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
* tuple with the list of values for that key.
*/
Since a new RDD is created here and a mapValues operation is applied to it, we need to look at CoGroupedRDD's implementation, in particular its compute method.
override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
  val split = s.asInstanceOf[CoGroupPartition]
  val numRdds = dependencies.length
  // A list of (rdd iterator, dependency number) pairs
  val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
  for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
    case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
      val dependencyPartition = split.narrowDeps(depNum).get.split
      // Read them from the parent
      val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
      rddIterators += ((it, depNum))
    case shuffleDependency: ShuffleDependency[_, _, _] =>
      // Read map outputs of shuffle
      val it = SparkEnv.get.shuffleManager
        .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
        .read()
      rddIterators += ((it, depNum))
  }
  val map = createExternalMap(numRdds)
  for ((it, depNum) <- rddIterators) {
    map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
  }
  context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
  context.internalMetricsToAccumulators(
    InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
  new InterruptibleIterator(context,
    map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
join involves a shuffle, so the shuffleManager is used to read the map output of the upstream stage.
private def createExternalMap(numRdds: Int)
    : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
  val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
    val newCombiner = Array.fill(numRdds)(new CoGroup)
    newCombiner(value._2) += value._1
    newCombiner
  }
  val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
    (combiner, value) => {
      combiner(value._2) += value._1
      combiner
    }
  val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
    (combiner1, combiner2) => {
      var depNum = 0
      while (depNum < numRdds) {
        combiner1(depNum) ++= combiner2(depNum)
        depNum += 1
      }
      combiner1
    }
  new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
    createCombiner, mergeValue, mergeCombiners)
}
An ExternalAppendOnlyMap is created with the corresponding createCombiner, mergeValue, and mergeCombiners functions, which implement the combine and merge steps (the spill metrics in compute above show that this map can spill to disk).
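To make the three functions concrete, here is a minimal, self-contained model of the combiner logic. It uses plain ArrayBuffers in place of Spark's CoGroup/CoGroupCombiner types and is only an illustration, not Spark code:

import scala.collection.mutable.ArrayBuffer

object CoGroupCombinerSketch {
  // A value tagged with the index of the parent RDD it came from, mirroring CoGroupValue.
  type Value = (Any, Int)
  // One buffer of values per parent RDD, mirroring CoGroupCombiner.
  type Combiner = Array[ArrayBuffer[Any]]

  def createCombiner(numRdds: Int)(v: Value): Combiner = {
    val c = Array.fill(numRdds)(new ArrayBuffer[Any])
    c(v._2) += v._1
    c
  }

  def mergeValue(c: Combiner, v: Value): Combiner = { c(v._2) += v._1; c }

  def mergeCombiners(c1: Combiner, c2: Combiner): Combiner = {
    for (i <- c1.indices) c1(i) ++= c2(i)   // concatenate the per-parent buffers
    c1
  }

  def main(args: Array[String]): Unit = {
    // Hand-run for one key whose values come from two parent RDDs:
    val c = createCombiner(2)((1, 0))       // first value, from parent 0
    mergeValue(c, (2, 0))                   // another value from parent 0
    mergeValue(c, ("x", 1))                 // a value from parent 1
    println(c.map(_.mkString("[", ",", "]")).mkString(" "))  // prints: [1,2] [x]
  }
}

This per-key Array of value buffers is exactly the shape that cg.mapValues { case Array(vs, w1s) => ... } unpacks back in cogroup.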