* Return a new RDD by applying a function to all elements of this RDD.
* 返回一个新的RDD,将参数中的function应用到这个RDD中每一个元素上。
* ClassTag只包含实际运行时的类的类型 map属于transfomation
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) =>
首先是根据堆栈信息(Thread.currentThread.getStackTrace)找出调用者的名字,比如 map, textFile, reduceByKey 等等,然后在 SparkContext 的属性 "spark.rdd.scope" 中新建了一个属性 RDDOperationScope(name: String, parent: RDDOperationScope),用来记录当前的运行 RDD 信息。其中 parent 可以用来追溯到所有 RDD 操作信息,即 RDDOperationScope。
/** Creates a new iterator that maps all produced values of this iterator
* to new values using a transformation function.
* 回调函数map的next,将每一个元素进行计算处理,最后返回一个新的
* RDD,新的RDD的分区数 保持不变。
* @param f the transformation function
* @return a new iterator which transforms every value produced by this
* iterator by applying the function `f` to it.
* @note Reuse: $consumesAndProducesIterator
def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
def hasNext = self.hasNext
def next() = f(
将两个RDD中共同的key matching,返回一个turple(k,(v1,v2))。
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
* 跨集群执行散列连接。
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
join(other, defaultPartitioner(self, other))
An object that defines how the elements in a key-value pair RDD are partitioned by key.Maps each key to a partition ID, from 0 to numPartitions - 1
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
* 返回一个RDD包含所有同时在两个RDD中的key并且以tuple的形式。
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Array(vs, w1s) =>
(vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
* tuple with the list of values for that key.
override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = dependencies.length
// A list of (rdd iterator, dependency number) pairs
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
val dependencyPartition = split.narrowDeps(depNum).get.split
// Read them from the parent
val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
rddIterators += ((it, depNum))
case shuffleDependency: ShuffleDependency[_, _, _] =>
// Read map outputs of shuffle
val it = SparkEnv.get.shuffleManager
.getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
rddIterators += ((it, depNum))
val map = createExternalMap(numRdds)
for ((it, depNum) <- rddIterators) {
map.insertAll( => (pair._1, new CoGroupValue(pair._2, depNum))))
new InterruptibleIterator(context,
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
private def createExternalMap(numRdds: Int)
: ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
val newCombiner = Array.fill(numRdds)(new CoGroup)
newCombiner(value._2) += value._1
val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
(combiner, value) => {
combiner(value._2) += value._1
val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
(combiner1, combiner2) => {
var depNum = 0
while (depNum < numRdds) {
combiner1(depNum) ++= combiner2(depNum)
depNum += 1
new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
createCombiner, mergeValue, mergeCombiners)