Spark Key-Value Transformation Operators

Author: Tim在路上 | Published 2022-02-08 14:05


    MapPartitionsRDD

    • mapValues operator

    mapValues applies the supplied function to the value side only. Under the hood it creates a MapPartitionsRDD whose iterator function applies the function solely to the values.

    def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
      val cleanF = self.context.clean(f)
      new MapPartitionsRDD[(K, U), (K, V)](self,
        (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
        preservesPartitioning = true)
    }
    

    As the source above shows, the result is still a key-value RDD, but the function is applied only to the value: (k, cleanF(v)).
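
    A quick usage sketch (the SparkSession value spark and the sample data are illustrative assumptions, in the style of the examples later in this article):

    val rdd1 = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    // mapValues touches only the value side; keys and partitioning are preserved
    val rdd2 = rdd1.mapValues(v => v * 10)
    println(rdd2.collect().mkString(","))
    // expected: (a,10),(b,20),(a,30)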

    • flatMapValues operator

    flatMapValues flattens each value and then maps every produced element back into a (k, v) pair. It too is implemented with a MapPartitionsRDD.

    def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope {
      val cleanF = self.context.clean(f)
      new MapPartitionsRDD[(K, U), (K, V)](self,
        (context, pid, iter) => iter.flatMap { case (k, v) =>
          cleanF(v).map(x => (k, x))
        },
        preservesPartitioning = true)
    }
    

    Note that after the user function is applied, cleanF(v).map(x => (k, x)) re-pairs every flattened element with its original key.
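
    A quick usage sketch (again assuming an illustrative spark SparkSession):

    val rdd1 = spark.sparkContext.parallelize(Seq(("a", "1 2"), ("b", "3")))
    // each value is flattened and every produced element is re-paired with its original key
    val rdd2 = rdd1.flatMapValues(v => v.split(" "))
    println(rdd2.collect().mkString(","))
    // expected: (a,1),(a,2),(b,3)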

    ShuffledRDD

    • partitionBy operator

    partitionBy is similar to repartition in that both return an RDD with the specified number of partitions. partitionBy, however, works on key-value RDDs and lets you pass in the partitioner to use.

    def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
      if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
      if (self.partitioner == Some(partitioner)) {
        self
      } else {
        new ShuffledRDD[K, V, V](self, partitioner)
      }
    }
    

    An array cannot be used as the key of a HashPartitioner. The method then checks whether the supplied partitioner equals the RDD's current partitioner; if they are equal, nothing happens and the RDD itself is returned. Note that for a HashPartitioner, equality also requires the same number of partitions.

    override def equals(other: Any): Boolean = other match {
      case h: HashPartitioner =>
        h.numPartitions == numPartitions
      case _ =>
        false
    }
    

    So if the partitioner (and hence the partition count) is unchanged, the RDD is returned as-is; otherwise a ShuffledRDD is always created. Only key-value RDDs can be shuffled, which is also why a key-value RDD lets you change the partitioner used for the shuffle.
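
    A small sketch of both behaviors, repartitioning first and then passing an equal partitioner (illustrative code, not from the source above):

    import org.apache.spark.HashPartitioner

    val rdd1 = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
    val rdd2 = rdd1.partitionBy(new HashPartitioner(4))  // shuffles into 4 partitions
    val rdd3 = rdd2.partitionBy(new HashPartitioner(4))  // equal partitioner: returns rdd2 itself, no shuffle
    println(rdd2.getNumPartitions)  // 4
    println(rdd3 eq rdd2)           // true, because equal HashPartitioners short-circuit the shuffle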

    • combineByKey operator

    combineByKey aggregates values by key. It is a simplified wrapper around combineByKeyWithClassTag; the overloads that take a partition count instead of a partitioner default to a HashPartitioner.

    def combineByKey[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null): RDD[(K, C)] = self.withScope {
      combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
        partitioner, mapSideCombine, serializer)(null)
    }
    

    Three functions must be supplied:

    createCombiner: the combiner-creation function; defines how a V is turned into a C.

    mergeValue: the value-merge function; defines how a given V is merged into an existing C to produce a new C.

    mergeCombiners: the combiner-merge function; defines how the Cs of the same key are merged into a single C.

    For example:

    val rdd1 = spark.sparkContext.parallelize(Seq(1, 5, 4, 6, 4, 6))
    val rdd2 = rdd1.map(x => (x, x + x))
    val createCombiner = (v: Int) => List(v)
    val mergeValue: (List[Int], Int) => List[Int] = (c: List[Int], v: Int) => v :: c
    val mergeCombiners = (c1: List[Int], c2: List[Int]) => c1 ::: c2
    val rdd4 = rdd2.combineByKey(createCombiner, mergeValue, mergeCombiners, 3)
    println(rdd4.collect().mkString(","))
    (6,List(12, 12)),(4,List(8, 8)),(1,List(2)),(5,List(10))
    

    Internally an Aggregator is used to turn an RDD[(K, V)] into an RDD[(K, C)], aggregating the Vs into Cs. The mapSideCombine parameter also lets the caller decide whether map-side aggregation is enabled.
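
    As a hedged usage sketch, reusing createCombiner, mergeValue and mergeCombiners from the example above, the partitioner overload lets you turn map-side combining off explicitly:

    import org.apache.spark.HashPartitioner

    // with mapSideCombine = false, all combining happens after the shuffle on the reduce side
    val rdd5 = rdd2.combineByKey(
      createCombiner, mergeValue, mergeCombiners,
      new HashPartitioner(3), mapSideCombine = false)
    println(rdd5.collect().mkString(","))

    The combineByKeyWithClassTag source it delegates to is shown below.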

    def combineByKeyWithClassTag[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
      // mergeCombiners must be defined
      require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
      // array keys cannot use map-side combining
      if (keyClass.isArray) {
        if (mapSideCombine) {
          throw new SparkException("Cannot use map-side combining with array keys.")
        }
        // a HashPartitioner cannot partition array keys
        if (partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("HashPartitioner cannot partition array keys.")
        }
      }
      // create an Aggregator
      val aggregator = new Aggregator[K, V, C](
        self.context.clean(createCombiner),
        self.context.clean(mergeValue),
        self.context.clean(mergeCombiners))
      if (self.partitioner == Some(partitioner)) {
        self.mapPartitions(iter => {
          val context = TaskContext.get()
          new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
        }, preservesPartitioning = true)
      } else {
        // otherwise create a ShuffledRDD
        new ShuffledRDD[K, V, C](self, partitioner)
          .setSerializer(serializer)
          .setAggregator(aggregator)
          .setMapSideCombine(mapSideCombine)
      }
    }
    

    As its name implies, a ShuffledRDD uses the partitioner to reshuffle the original data into a new set of partitions. Let's walk through how ShuffledRDD is implemented:

    1. Get the partition array
    override def getPartitions: Array[Partition] = {
      Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
    }
    

    getPartitions builds an array whose size equals the number of partitions, creating a ShuffledRDDPartition for each one. A ShuffledRDDPartition stores nothing but its partition index.

    private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
      override val index: Int = idx
    }
    
    2. Get the RDD's dependencies and register the shuffle with the ShuffleManager
    override def getDependencies: Seq[Dependency[_]] = {
      val serializer = userSpecifiedSerializer.getOrElse {
        val serializerManager = SparkEnv.get.serializerManager
        if (mapSideCombine) {
          serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
        } else {
          serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
        }
      }
      List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
    }
    

    It obtains a serializer and creates a ShuffleDependency. The ShuffleDependency checks that an Aggregator is specified whenever mapSideCombine is enabled, resolves keyClass, valueClass and combinerClass, generates a shuffleId, and registers the shuffle with the shuffleManager.

    val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
      shuffleId, _rdd.partitions.length, this)
    

    When a shuffle is registered, the manager decides which shuffle implementation to use depending on the situation. We won't dwell on this here; the details of shuffle will be covered in a later article.

    override def registerShuffle[K, V, C](
        shuffleId: Int,
        numMaps: Int,
        dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
      if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
        // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
        // need map-side aggregation, then write numPartitions files directly and just concatenate
        // them at the end. This avoids doing serialization and deserialization twice to merge
        // together the spilled files, which would happen with the normal code path. The downside is
        // having multiple files open at a time and thus more memory allocated to buffers.
        new BypassMergeSortShuffleHandle[K, V](
          shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
        // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
        new SerializedShuffleHandle[K, V](
          shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      } else {
        // Otherwise, buffer map outputs in a deserialized form:
        new BaseShuffleHandle(shuffleId, numMaps, dependency)
      }
    }
    
    3. Run the ShuffleMapTask of the previous stage

    A ShuffledRDD introduces a wide dependency. Spark uses wide dependencies to split tasks into stages, and a stage can only run after its parent stage has finished, so to understand how the data is handed over we have to look at the execution path.

    Spark tasks come in two kinds, ResultTask and ShuffleMapTask. A ShuffleMapTask passes data on to the next stage, whereas a ResultTask returns the result directly. ShuffledRDD relies on ShuffleMapTask.

    The job of a ShuffleMapTask is to split the RDD's elements and assign them to buckets, where a bucket corresponds to one of the partitions obtained above; the assignment is performed by the RDD's partitioner.
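
    As a rough, simplified sketch of how a hash-based bucket assignment works (an illustration of the idea only, not the Spark source):

    // non-negative modulo of the key's hashCode over the number of partitions;
    // the same key always lands in the same bucket
    def bucketFor(key: Any, numPartitions: Int): Int = {
      val rawMod = key.hashCode % numPartitions
      rawMod + (if (rawMod < 0) numPartitions else 0)
    }

    ShuffleMapTask.runTask itself looks like this: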

    override def runTask(context: TaskContext): MapStatus = {
      // Deserialize the RDD using the broadcast variable.
      val threadMXBean = ManagementFactory.getThreadMXBean
      val deserializeStartTime = System.currentTimeMillis()
      val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L
      val ser = SparkEnv.get.closureSerializer.newInstance()
      val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
        ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
      _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
      _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
      } else 0L
    
      var writer: ShuffleWriter[Any, Any] = null
      try {
        val manager = SparkEnv.get.shuffleManager
        writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
        writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
        writer.stop(success = true).get
      } catch {
          ...
          throw e
      }
    }
    

    In ShuffleMapTask, the RDD and its ShuffleDependency are first deserialized from the task binary; then the ShuffleManager is obtained from SparkEnv, a writer is fetched from it, and the writer's write method performs the map-side write.

    getWriter() returns the writer that matches the shuffle handle registered earlier when the dependency was created.

    override def getWriter[K, V](
        handle: ShuffleHandle,
        mapId: Int,
        context: TaskContext): ShuffleWriter[K, V] = {
      numMapsForShuffle.putIfAbsent(
        handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
      val env = SparkEnv.get
      handle match {
        case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
          new UnsafeShuffleWriter(
            env.blockManager,
            shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
            context.taskMemoryManager(),
            unsafeShuffleHandle,
            mapId,
            context,
            env.conf)
        case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
          new BypassMergeSortShuffleWriter(
            env.blockManager,
            shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
            bypassMergeSortHandle,
            mapId,
            context,
            env.conf)
        case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
          new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
      }
    }
    

    The data of the previous stage is then written out on the map side, and the write strategy is determined by the chosen shuffle implementation.

    4. Get the preferred locations
    override protected def getPreferredLocations(partition: Partition): Seq[String] = {
      val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
      val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
      tracker.getPreferredLocationsForShuffle(dep, partition.index)
    }
    

    The MapOutputTrackerMaster is obtained from SparkEnv and the ShuffleDependency is taken from the RDD's dependencies; both the dependency and the partition index are passed to the tracker to look up the preferred locations.

    def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
        : Seq[String] = {
      if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
          dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
        val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
          dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
        if (blockManagerIds.nonEmpty) {
          blockManagerIds.get.map(_.host)
        } else {
          Nil
        }
      } else {
        Nil
      }
    }
    

    As the code shows, preferred locations are only returned when reduce-side locality is enabled and both the number of map-side partitions and the number of reduce-side partitions are below the thresholds (1000 by default).
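
    As an aside, this locality preference is driven by configuration; a minimal sketch, assuming spark.shuffle.reduceLocality.enabled is the flag behind shuffleLocalityEnabled above:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("shuffle-locality-demo")  // illustrative application name
      .master("local[*]")
      // assumption: disabling this skips the preferred-location lookup shown above
      .config("spark.shuffle.reduceLocality.enabled", "false")
      .getOrCreate()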

    5. Execute compute
    override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
      val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
      SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
        .read()
        .asInstanceOf[Iterator[(K, C)]]
    }
    

    The compute function shows that ShuffledRDD first casts the head of its dependencies to a ShuffleDependency, then uses the ShuffleManager obtained via SparkEnv to call getReader and read the data written out on the map side. Like getWriter, getReader returns a reader that matches the registered shuffle handle.

    Finally, the read() method performs the actual read.

    /** Read the combined key-values for this reduce task */
    override def read(): Iterator[Product2[K, C]] = {
      // create the ShuffleBlockFetcherIterator, passing in the blockManager and the map/reduce fetch limits
      val wrappedStreams = new ShuffleBlockFetcherIterator(
        context,
        blockManager.shuffleClient,
        blockManager,
        mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
        serializerManager.wrapStream,
        // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
        SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
        SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
        SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
        SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
        SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
    
      val serializerInstance = dep.serializer.newInstance()
      // Create a key/value iterator for each stream
      val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
        // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
        // NextIterator. The NextIterator makes sure that close() is called on the
        // underlying InputStream when all records have been read.
        serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
      }
      // Update the context task metrics for each record read.
      val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
      val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
        recordIter.map { record =>
          readMetrics.incRecordsRead(1)
          record
        },
        context.taskMetrics().mergeShuffleReadMetrics())
    
      // An interruptible iterator must be used here in order to support task cancellation
      val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
    
      // if an aggregator is defined and mapSideCombine is enabled, call combineCombinersByKey
      val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
        if (dep.mapSideCombine) {
          // We are reading values that are already combined
          val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
          dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
        } else {
          // We don't know the value type, but also don't care -- the dependency *should*
          // have made sure its compatible w/ this aggregator, which will convert the value
          // type to the combined type C
          val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
          dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
        }
        }
      } else {
        interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
      }
    
      // sort by key within the partition if an ordering is defined, spilling to memory/disk as needed
      // Sort the output if there is a sort ordering defined.
      val resultIter = dep.keyOrdering match {
        case Some(keyOrd: Ordering[K]) =>
          // Create an ExternalSorter to sort the data.
          val sorter =
            new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
          sorter.insertAll(aggregatedIter)
          context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
          context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
          context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
          // Use completion callback to stop sorter if task was finished/cancelled.
          context.addTaskCompletionListener[Unit](_ => {
            sorter.stop()
          })
          CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
        case None =>
          aggregatedIter
      }
    
      resultIter match {
        case _: InterruptibleIterator[Product2[K, C]] => resultIter
        case _ =>
          // Use another interruptible iterator here to support task cancellation as aggregator
          // or(and) sorter may have consumed previous interruptible iterator.
          new InterruptibleIterator[Product2[K, C]](context, resultIter)
      }
    }
    
    • foldByKey operator

    foldByKey merges the values of each key, folding an initial zeroValue into the first value of a key (per partition) before the merge.

    val rdd1 = spark.sparkContext.parallelize(Seq(1, 5, 4, 6, 4, 6))
    val rdd2 = rdd1.map(x => (x, x))
    val rdd4 = rdd2.foldByKey(10)((a, b) => a + b)
    println(rdd4.collect().mkString(","))
    (1,11),(4,28),(5,15),(6,32)
    

    As the output shows, a value is first combined with the zeroValue and the supplied func is then used to merge it with the other values.

    def foldByKey(
        zeroValue: V,
        partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
      // Serialize the zero value to a byte array so that we can get a new clone of it on each key
      val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
      val zeroArray = new Array[Byte](zeroBuffer.limit)
      zeroBuffer.get(zeroArray)
    
      // When deserializing, use a lazy val to create just one instance of the serializer per task
      lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
      val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
    
      val cleanedFunc = self.context.clean(func)
      combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
        cleanedFunc, cleanedFunc, partitioner)
    }
    
    

    The zeroValue is first serialized into a byte array so that a fresh copy can be deserialized for each key. A function that lazily deserializes this cached zeroValue is created, the supplied merge function is cleaned, and combineByKeyWithClassTag is finally called with the constructed functions. From there everything works exactly as in combineByKey. Note that foldByKey enables map-side combining.

    • reduceByKey operator

    reduceByKey merges the values of each key without an initial value; under the hood it also calls combineByKeyWithClassTag.

    //val rdd4 = rdd2.reduceByKey((a, b) => a + b)
    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
      combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
    }
    

    The only difference between reduceByKey and foldByKey is whether an initial value is used.
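
    A quick usage sketch, completing the commented-out call above with the same data as the foldByKey example (output order may vary):

    val rdd1 = spark.sparkContext.parallelize(Seq(1, 5, 4, 6, 4, 6))
    val rdd2 = rdd1.map(x => (x, x))
    // same data as the foldByKey example, but without the extra zeroValue of 10
    val rdd4 = rdd2.reduceByKey((a, b) => a + b)
    println(rdd4.collect().mkString(","))
    // expected: (1,1),(4,8),(5,5),(6,12)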

    • groupByKey operator

    Exactly what the name says: it groups a key-value RDD by key, appending each value to a Seq maintained per key. The order of values within a group is not guaranteed. The no-argument variant uses the default HashPartitioner.

    val rdd1 = spark.sparkContext.parallelize(Seq(1, 5, 4, 6, 4, 6))
    val rdd2 = rdd1.map(x => (x, x))
    val rdd4 = rdd2.groupByKey()
    println(rdd4.collect().mkString(","))
    (1,CompactBuffer(1)),(4,CompactBuffer(4, 4)),(5,CompactBuffer(5)),(6,CompactBuffer(6, 6))
    

    Notice that the grouped values are wrapped in a CompactBuffer.

    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
      // groupByKey shouldn't use map side combine because map side combine does not
      // reduce the amount of data shuffled and requires all map side data be inserted
      // into a hash table, leading to more objects in the old gen.
      val createCombiner = (v: V) => CompactBuffer(v)
      val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
      val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
      val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
        createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
      bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }
    

    The source shows that groupByKey is essentially a pared-down variant of the pattern used by reduceByKey above: the user does not have to supply the aggregation functions.

    Let's take a look at the CompactBuffer data type:

    private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable {
      // First two elements
      private var element0: T = _
      private var element1: T = _
    
      // Number of elements, including our two in the main object
      private var curSize = 0
    
      // Array for extra elements
      private var otherElements: Array[T] = null
    
      ...
    }
    

    CompactBuffer's implementation is essentially the same as ArrayBuffer's, except that CompactBuffer keeps element0 and element1 as plain fields and puts only the additional elements into an object array, whereas ArrayBuffer stores every element in the array and allocates space for 16 elements up front. In short, CompactBuffer is a slimmed-down ArrayBuffer that saves memory; it targets the groupBy case where there are typically many keys whose value lists are tiny and do not need a large allocation.
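
    Since CompactBuffer is private[spark], here is a minimal, simplified sketch of the same idea (not the Spark class itself): keep the first two elements in plain fields and only allocate extra storage once a third element arrives.

    import scala.collection.mutable.ArrayBuffer

    // simplified illustration; the real CompactBuffer manages its own growable raw array
    class TinyCompactBuffer[T] {
      private var element0: T = _
      private var element1: T = _
      private var curSize = 0
      private var otherElements: ArrayBuffer[T] = null  // allocated lazily

      def +=(value: T): this.type = {
        if (curSize == 0) element0 = value
        else if (curSize == 1) element1 = value
        else {
          if (otherElements == null) otherElements = new ArrayBuffer[T]
          otherElements += value
        }
        curSize += 1
        this
      }

      def apply(i: Int): T =
        if (i == 0) element0
        else if (i == 1) element1
        else otherElements(i - 2)

      def size: Int = curSize
    }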

    • groupBy operator

    The difference between groupBy and groupByKey is that groupByKey groups by the existing key, whereas groupBy applies a user-supplied function to each element and groups by the function's result; the whole (k, v) element then becomes the value of the group. groupBy is defined on the RDD class, so it works on a plain RDD as well as on a pair RDD.

    val rdd1 = spark.sparkContext.parallelize(Seq(1, 5, 4, 6, 4, 6))
    val rdd2 = rdd1.map(x => (x, x))
    val rdd4 = rdd2.groupBy(c => c._1 % 5)
    println(rdd4.collect().mkString(","))
    (0,CompactBuffer((5,5))),(1,CompactBuffer((1,1), (6,6), (6,6))),(4,CompactBuffer((4,4), (4,4)))
    

    In the example above, the elements are grouped by key modulo 5. Let's see how the source implements this.

    def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
        : RDD[(K, Iterable[T])] = withScope {
      val cleanF = sc.clean(f)
      this.map(t => (cleanF(t), t)).groupByKey(p)
    }
    

    As the source shows, (cleanF(t), t) uses the function result as the key and the whole element as the value; the map turns the RDD into key-value pairs, and groupByKey(p) is then called.

    To sum up, combineByKey, foldByKey, reduceByKey and groupByKey all operate on a single RDD and all delegate to combineByKeyWithClassTag; each is just a progressively simplified variant of the same mechanism.
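
    As a rough illustration of how they line up (an informal sketch, not the exact internal call chain), the three value-combining variants can produce the same result:

    val rdd = spark.sparkContext.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

    // all three sum the values per key; they differ only in what the caller must supply
    // before the work is handed to combineByKeyWithClassTag
    val viaReduce  = rdd.reduceByKey(_ + _)
    val viaFold    = rdd.foldByKey(0)(_ + _)
    val viaCombine = rdd.combineByKey((v: Int) => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2)

    println(viaReduce.collect().mkString(","))  // (a,3),(b,3) -- same for all three (order may vary)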


    The operators below work on multiple RDDs:

    CoGroupedRDD

    • cogroup operator

    cogroup groups the data of this RDD and the other RDD by key. Unlike groupByKey, it does not merge all values into a single iterable; each RDD keeps its own iterable and the result simply pairs them up.

    var rdd1 = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 5), ("d", 4), ("c", 6), ("b", 6)))
    var rdd2 = spark.sparkContext.parallelize(Seq(("a", 2), ("a", 5), ("a", 6),
      ("a", 8), ("a", 9)), 2)
    val rdd3 = rdd1.cogroup(rdd2)
    println(rdd3.collect().mkString(","))
    
    (a,(CompactBuffer(1),CompactBuffer(2, 5, 6, 8, 9))),(b,(CompactBuffer(5, 6),CompactBuffer())),(c,(CompactBuffer(6),CompactBuffer())),(d,(CompactBuffer(4),CompactBuffer()))
    

    As the source below shows, cogroup simply creates a CoGroupedRDD; when the partitioner is a HashPartitioner, the key must not be an array.

    def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
        : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
      if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
      val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
      cg.mapValues { case Array(vs, w1s) =>
        (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
      }
    }
    
    1. Get the RDD partitions
    
    override def getPartitions: Array[Partition] = {
      val array = new Array[Partition](part.numPartitions)
      for (i <- 0 until array.length) {
        // Each CoGroupPartition will have a dependency per contributing RDD
        array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
          // Assume each RDD contributed a single dependency, and get it
          dependencies(j) match {
            case s: ShuffleDependency[_, _, _] =>
              None
            case _ =>
          Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
          }
        }.toArray)
      }
      array
    }
    

    A CoGroupPartition is created for each partition; for every RDD the corresponding dependency is inspected, yielding None if it is a ShuffleDependency and a NarrowCoGroupSplitDep otherwise.

    2. Get the dependencies
    override def getDependencies: Seq[Dependency[_]] = {
      rdds.map { rdd: RDD[_] =>
        if (rdd.partitioner == Some(part)) {
          logDebug("Adding one-to-one dependency with " + rdd)
          new OneToOneDependency(rdd)
        } else {
          logDebug("Adding shuffle dependency with " + rdd)
          new ShuffleDependency[K, Any, CoGroupCombiner](
            rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
        }
      }
    }
    
    

    If an input RDD's partitioner matches, a OneToOneDependency is used; otherwise a ShuffleDependency is created. Since CoGroupedRDD declares override val partitioner: Some[Partitioner] = Some(part), its own partitioner is exactly the one passed in: every input RDD whose partitioner equals it gets a one-to-one dependency, and the rest get shuffle dependencies.

    3. Execute compute
    override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
      val split = s.asInstanceOf[CoGroupPartition]
      val numRdds = dependencies.length
    
      // A list of (rdd iterator, dependency number) pairs
      val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
      for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
        case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
          val dependencyPartition = split.narrowDeps(depNum).get.split
          // Read them from the parent
          val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
          rddIterators += ((it, depNum))
    
        case shuffleDependency: ShuffleDependency[_, _, _] =>
          // Read map outputs of shuffle
          val it = SparkEnv.get.shuffleManager
            .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
            .read()
          rddIterators += ((it, depNum))
      }
      // create the external map
      val map = createExternalMap(numRdds)
      for ((it, depNum) <- rddIterators) {
        map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
      }
      context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
      context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
      context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
      new InterruptibleIterator(context,
        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
    }
    

    The compute method handles the two kinds of dependencies separately, wrapping each parent's data in an iterator. It then creates the external map, an ExternalAppendOnlyMap: an append-only map defined by Spark that is optimized for memory usage and spills data to disk when memory runs short, with merge functions supplied for combining. Finally all the iterators are inserted into the map and the merged result is returned.

    Let's see how the external map is created:

    private type CoGroup = CompactBuffer[Any]
    private type CoGroupValue = (Any, Int)  // Int is dependency number
    private type CoGroupCombiner = Array[CoGroup]
    
    private def createExternalMap(numRdds: Int)
      : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
    
      val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
        val newCombiner = Array.fill(numRdds)(new CoGroup)
        newCombiner(value._2) += value._1
        newCombiner
      }
      val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
        (combiner, value) => {
        combiner(value._2) += value._1
        combiner
      }
      val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
        (combiner1, combiner2) => {
          var depNum = 0
          while (depNum < numRdds) {
            combiner1(depNum) ++= combiner2(depNum)
            depNum += 1
          }
          combiner1
        }
      new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
        createCombiner, mergeValue, mergeCombiners)
    }
    

    This looks much like the Aggregator implementation, except that every input RDD gets its own CompactBuffer (the append-only, memory-optimized ArrayBuffer variant described above).

    ExternalAppendOnlyMap extends the Spillable class, so values being merged into the combiners are spilled to disk as needed (the default write buffer for spill files is 32k).

    • join operator (inner join)

    join first cogroups the RDDs by key and then flattens the result with flatMapValues, returning a (v, w) pair for every combination of values whose keys match.

    var rdd1 = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 5), ("d", 4), ("c", 6), ("b", 6)))
    var rdd2 = spark.sparkContext.parallelize(Seq(("a", 2), ("a", 5), ("a", 6),
      ("a", 8), ("a", 9)), 2)
    val rdd3 = rdd1.join(rdd2)
    println(rdd3.collect().mkString(","))
    (a,(1,2)),(a,(1,5)),(a,(1,6)),(a,(1,8)),(a,(1,9))
    

    The implementation simply calls cogroup and then flatMapValues on the result; as the source shows, join works on exactly two RDDs.

    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
      this.cogroup(other, partitioner).flatMapValues( pair =>
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
      )
    }
    

    The for/yield wraps the results into a collection, and a key is emitted only if it exists in both RDDs.

    • leftOuterJoin operator (left outer join)

    Similar to join: it calls cogroup and returns every key of the left RDD, pairing the value with None when the right side is empty.

    def leftOuterJoin[W](
        other: RDD[(K, W)],
        partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
      this.cogroup(other, partitioner).flatMapValues { pair =>
        if (pair._2.isEmpty) {
          pair._1.iterator.map(v => (v, None))
        } else {
          for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
        }
      }
    }
    
    • rightOuterJoin operator (right outer join)

    The right outer join mirrors the left outer join, returning None when the left side is empty. A usage sketch for both outer joins follows the source below.

    def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
        : RDD[(K, (Option[V], W))] = self.withScope {
      this.cogroup(other, partitioner).flatMapValues { pair =>
        if (pair._1.isEmpty) {
          pair._2.iterator.map(w => (None, w))
        } else {
          for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
        }
      }
    }
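
    A quick usage sketch for both outer joins, reusing rdd1 and rdd2 from the cogroup example above (output order may vary):

    val left = rdd1.leftOuterJoin(rdd2)
    println(left.collect().mkString(","))
    // e.g. (a,(1,Some(2))),(a,(1,Some(5))),...,(b,(5,None)),(b,(6,None)),(c,(6,None)),(d,(4,None))

    val right = rdd1.rightOuterJoin(rdd2)
    println(right.collect().mkString(","))
    // e.g. (a,(Some(1),2)),(a,(Some(1),5)),(a,(Some(1),6)),(a,(Some(1),8)),(a,(Some(1),9))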
    

    SubtractedRDD

    • subtractByKey operator

    Returns the entries whose keys are in this RDD but not in the other RDD. SubtractedRDD was mentioned in the article on RDD transformation operators; it is essentially an optimized variant of CoGroupedRDD.

    var rdd1 = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 5), ("d", 4), ("c", 6), ("b", 6)))
    var rdd2 = spark.sparkContext.parallelize(Seq(("a", 2), ("a", 5), ("a", 6),
      ("a", 8), ("a", 9)), 2)
    val rdd3 = rdd1.subtractByKey(rdd2)
    println(rdd3.collect().mkString(","))
    (b,5),(b,6),(c,6),(d,4)
    

    The result contains only keys that appear exclusively in rdd1, so rdd1 can be held in memory while rdd2 is streamed through for matching.

    1. Get the partition array
    override def getPartitions: Array[Partition] = {
      val array = new Array[Partition](part.numPartitions)
      for (i <- 0 until array.length) {
        // Each CoGroupPartition will depend on rdd1 and rdd2
        array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
          dependencies(j) match {
            case s: ShuffleDependency[_, _, _] =>
              None
            case _ =>
              Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
          }
        }.toArray)
      }
      array
    }
    
    2. Get the dependencies
    override def getDependencies: Seq[Dependency[_]] = {
      def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
        : Dependency[_] = {
        if (rdd.partitioner == Some(part)) {
          logDebug("Adding one-to-one dependency with " + rdd)
          new OneToOneDependency(rdd)
        } else {
          logDebug("Adding shuffle dependency with " + rdd)
          new ShuffleDependency[T1, T2, Any](rdd, part)
        }
      }
      Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
    }
    

    As the code shows, building the partition array and resolving dependencies work exactly as in CoGroupedRDD; even the partition type created is the same CoGroupPartition.

    3. Execute compute
    override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
        val partition = p.asInstanceOf[CoGroupPartition]
        val map = new JHashMap[K, ArrayBuffer[V]]
        def getSeq(k: K): ArrayBuffer[V] = {
          val seq = map.get(k)
          if (seq != null) {
            seq
          } else {
            val seq = new ArrayBuffer[V]()
            map.put(k, seq)
            seq
          }
        }
        def integrate(depNum: Int, op: Product2[K, V] => Unit): Unit = {
          dependencies(depNum) match {
            case oneToOneDependency: OneToOneDependency[_] =>
              val dependencyPartition = partition.narrowDeps(depNum).get.split
              oneToOneDependency.rdd.iterator(dependencyPartition, context)
                .asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
    
            case shuffleDependency: ShuffleDependency[_, _, _] =>
              val iter = SparkEnv.get.shuffleManager
                .getReader(
                  shuffleDependency.shuffleHandle, partition.index, partition.index + 1, context)
                .read()
              iter.foreach(op)
          }
        }
        // the first dep is rdd1; add all values to the map
        integrate(0, t => getSeq(t._1) += t._2)
        // the second dep is rdd2; remove all of its keys
        integrate(1, t => map.remove(t._1))
        map.asScala.iterator.map(t => t._2.iterator.map((t._1, _))).flatten
      }
    

    In summary, SubtractedRDD is an optimized version of CoGroupedRDD.
