Spark groupByKey and reduceByKey


Author: 喵星人ZC | Published 2019-05-13 20:13

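    1. Comparing the two in terms of shuffle
    groupByKey and reduceByKey both belong to the ByKey family of operators, and both trigger a shuffle. A simple word count (WC) shows the difference between them.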

    • Word count with groupByKey
    scala> val rdd = sc.parallelize(List(1,1,2,2,3,3)).map((_,1))
    rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:24
    
    scala> rdd.groupByKey().map(x => (x._1,x._2.sum)).collect.foreach(println)
    (2,2)                                                                           
    (1,2)
    (3,2)
    

    Check the Web UI:


    groupByKey.png

    Shuffle Read and Shuffle Write are both 192 B.

    • Word count with reduceByKey
    scala> val rdd = sc.parallelize(List(1,1,2,2,3,3)).map((_,1))
    rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:24
    
    scala> rdd.reduceByKey(_+_).collect.foreach(println)
    (2,2)
    (1,2)
    (3,2)
    

    Check the Web UI:


    reduceByKey.png

    Shuffle Read and Shuffle Write are both 184 B.

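    Judging by this, reduceByKey performs better than groupByKey: less data goes through the shuffle, which cuts down the number of fetches as well as network and disk I/O. The gap is only 8 B on this six-element dataset; it widens as more values share a key within a partition.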

    2. Tracing the source code to see why reduceByKey is the better fit for production

    groupByKey source

      def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
        groupByKey(defaultPartitioner(self))
      }
    

    The no-argument groupByKey delegates to the overload groupByKey(partitioner). Let's step into it:

      def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
        // groupByKey shouldn't use map side combine because map side combine does not
        // reduce the amount of data shuffled and requires all map side data be inserted
        // into a hash table, leading to more objects in the old gen.
        val createCombiner = (v: V) => CompactBuffer(v)
        val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
        val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
        val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
          createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
        bufs.asInstanceOf[RDD[(K, Iterable[V])]]
      }
    

    reduceByKey source

      def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
        reduceByKey(defaultPartitioner(self), func)
      }
    

    reduceByKey(func) delegates to the overload reduceByKey(partitioner, func). Stepping into that, we see it calls combineByKeyWithClassTag:

      def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
        combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
      }
    

    Stepping in once more:

      @Experimental
      def combineByKeyWithClassTag[C](
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          partitioner: Partitioner,
          mapSideCombine: Boolean = true,
          serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
        require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
        if (keyClass.isArray) {
          if (mapSideCombine) {
            throw new SparkException("Cannot use map-side combining with array keys.")
          }
          if (partitioner.isInstanceOf[HashPartitioner]) {
            throw new SparkException("HashPartitioner cannot partition array keys.")
          }
        }
        val aggregator = new Aggregator[K, V, C](
          self.context.clean(createCombiner),
          self.context.clean(mergeValue),
          self.context.clean(mergeCombiners))
        if (self.partitioner == Some(partitioner)) {
          self.mapPartitions(iter => {
            val context = TaskContext.get()
            new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
          }, preservesPartitioning = true)
        } else {
          new ShuffledRDD[K, V, C](self, partitioner)
            .setSerializer(serializer)
            .setAggregator(aggregator)
            .setMapSideCombine(mapSideCombine)
        }
      }
    

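    Comparing the arguments passed in, the key difference between the two is mapSideCombine, which controls whether values are pre-aggregated locally on the map side. groupByKey explicitly passes mapSideCombine = false, so no map-side aggregation takes place. reduceByKey does not pass the argument at all, so the default of true applies and values are combined on the map side before the shuffle.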
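    The effect of the flag can be tried out directly, because the public combineByKey overload on pair RDDs accepts mapSideCombine as well. The following is a minimal sketch meant to be pasted into spark-shell, where sc is already defined. The names part, noCombine and withCombine are illustrative and do not come from the Spark source; the three functions are the same ones reduceByKey(_ + _) passes down.

```scala
import org.apache.spark.HashPartitioner

val rdd = sc.parallelize(List(1, 1, 2, 2, 3, 3)).map((_, 1))
val part = new HashPartitioner(rdd.getNumPartitions)

// Sum per key, but with map-side combine switched off (as groupByKey does).
val noCombine = rdd.combineByKey[Int](
  (v: Int) => v,                  // createCombiner
  (c: Int, v: Int) => c + v,      // mergeValue
  (c1: Int, c2: Int) => c1 + c2,  // mergeCombiners
  part,
  mapSideCombine = false)

// Same aggregation with map-side combine switched on (as reduceByKey does).
val withCombine = rdd.combineByKey[Int](
  (v: Int) => v,
  (c: Int, v: Int) => c + v,
  (c1: Int, c2: Int) => c1 + c2,
  part,
  mapSideCombine = true)

noCombine.collect.foreach(println)
withCombine.collect.foreach(println)
```

    Both jobs should produce the same per-key sums; what differs is the Shuffle Write figure shown for each job in the Web UI.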

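    From the MapReduce shuffle process we know that the shuffle happens while reduce tasks fetch the output of map tasks, so aggregating locally on the map side first reduces the amount of data that has to be shuffled. The following diagrams illustrate the process: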

    groupByKey.png reduceByKey.png
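    The same idea can be modelled in plain Scala without Spark. This is only a sketch under stated assumptions: partitions are represented as nested Seqs, the helper sumByKey is hypothetical, and the two-partition layout of the six word-count pairs is made up for illustration (Spark's real layout depends on the configured parallelism). It counts how many records would have to cross the shuffle with and without map-side combine.

```scala
// A plain-Scala model of a keyed sum; no Spark required.
object MapSideCombineSketch {
  // Hypothetical helper: sums values per key across "partitions".
  // Returns (sum per key, number of records that cross the shuffle).
  def sumByKey(
      partitions: Seq[Seq[(Int, Int)]],
      mapSideCombine: Boolean): (Map[Int, Int], Int) = {
    // Map side: optionally pre-aggregate inside each partition.
    val mapOutput: Seq[Seq[(Int, Int)]] =
      if (mapSideCombine)
        partitions.map { p =>
          p.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).sum) }.toSeq
        }
      else partitions

    // Every record left in the map output must be written and then fetched.
    val shuffledRecords = mapOutput.map(_.size).sum

    // Reduce side: merge all records that share a key.
    val result = mapOutput.flatten
      .groupBy(_._1)
      .map { case (k, kvs) => (k, kvs.map(_._2).sum) }
    (result, shuffledRecords)
  }

  def main(args: Array[String]): Unit = {
    // Assumed layout: the six (key, 1) pairs split over two partitions.
    val partitions = Seq(
      Seq((1, 1), (1, 1), (2, 1)),
      Seq((2, 1), (3, 1), (3, 1)))
    val (grouped, groupedRecords) = sumByKey(partitions, mapSideCombine = false)
    val (reduced, reducedRecords) = sumByKey(partitions, mapSideCombine = true)
    println(s"without map-side combine: $groupedRecords records, result $grouped")
    println(s"with map-side combine: $reducedRecords records, result $reduced")
  }
}
```

    With this assumed layout, 6 records cross the shuffle without map-side combine and 4 with it, while the per-key sums are identical. The saving grows with the number of values that share a key inside one partition.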
