美文网首页
spark如何只在map端做完成reduce的工作

spark如何只在map端做完成reduce的工作

作者: aaron1993 | 来源:发表于2017-11-19 16:30 被阅读0次

    1. 前言

    有时候需要按照key去做reduce操作时,一般情况下调用reduceByKey就可以完成按照key reduce的任务,reduceByKey的调用就必然意味着shuffle操作。但是有的时候如果我们已经知道相同的key都在同一个partition里面了,这个时候其实没有必要去使用reduceByKey通过一次shuffle将相同的key收集到同一个reducer分区里面,而是可以直接在map端就去完成reduce操作。

    比如下面是一个word count在2个分区里面的分布:

    ------partition 1----------
    (failure,1)
    (count,1)
    (thief,1)
    (failure,1)
    
    ------partition 2--------
    (fortification,1)
    (peek,1) 
    (lepta,1)
    (peek,1)
    

    由于相同的word都在同一个分区里面了,没必要去通过reduceByKey去完成word count操作。

    2. 解决方法

    实现一个RDD,在其compute方法里完成按key聚合,实现如下:

    /**
     K: key type
     V: 上游rdd中value的type
     C: V 经过reduce之后的type
    参考ShufferedRDD
    */
    class MapsideReduceRDD[K:ClassTag, V:ClassTag, C:ClassTag](
       // 上游rdd,要求上游的rdd中数据已经转换成(key,value)的形式
      @transient var prev : RDD[_ <: Product2[K,V]]
    
    ) extends RDD[(K,C)](prev){
      // 需要一个aggregator去完成value的聚合, reduceByKey也会创建这个
      private var aggregator : Option[Aggregator[K,V,C]] = None
    
      def setAggregator(aggregator: Aggregator[K,V,C]):this.type ={
        this.aggregator = Option(aggregator)
        this
      }
    
      override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
        /* 创建一个ExternalAppendOnlyMap,这个数据结构是spark中提供的,插入
     .  (K,V)的数据,然后按照传递给他聚合方法完成(K,V)的聚合,返回(K,C)的数据
       */
        val externalMap = createExternalMap
      // 这里迭代上游rdd中(K,V)类型的记录
        val rddIter = dependencies(0).rdd.asInstanceOf[RDD[Product2[K,V]]].iterator(split, context)
        // 插入到externalMap中
        externalMap.insertAll(rddIter)
       // 返回
        new InterruptibleIterator(context,
          externalMap.iterator
        )
      }
    
      override protected def getPartitions: Array[Partition] = firstParent[Product2[K,V]].partitions
    
      private def createExternalMap: ExternalAppendOnlyMap[K,V,C] = {
        require(aggregator.nonEmpty, "aggregator should not be empty")
        /**
          创建ExternalAppendOnlyMap, 它需要一下参数:
          - 一个V => C 类型的函数,用于迭代时发现某个key的第一个value,将它转换成C
         - 一个(C,V) => C类型的函数,用于将value合并到C上
         - 一个(C,C) => C类型的函数,将两个部分聚合的结果合并到一起
         */
        new ExternalAppendOnlyMap[K,V,C](aggregator.get.createCombiner, aggregator.get.mergeValue, aggregator.get.mergeCombiners)
      }
    }
    

    ExternalAppendOnlyMap会在必要时spill到磁盘

    2.1 测试

    测试类如下:

    
    object MapsideReduceTest {
      def main(args: Array[String]): Unit ={
        val sc = new SparkContext()
        val words = Seq(Seq("failure","count","thief","failure","count"),Seq("fortification","peek","lepta","peek"));
        // 分两个分区,第一个分区包含Seq("failure","count","thief","failure","count")
        // 这样相同的word只在一个分区里面,然后统计word count
        val wordsRDD = sc.parallelize(words, 2)
        // flatMap将Seq()展开,然后调用map转换成(failure,1)这种数据
        val wordsCount = wordsRDD.flatMap(seq => seq).map(word => (word,1))
    
        val aggregator = createAggregator
        val mapsideReduceRDD = new MapsideReduceRDD[String, Int, Int](wordsCount).setAggregator(aggregator)
        mapsideReduceRDD.saveAsTextFile("/Users/eric/mapsideReduce")
      }
    
      def createAggregator: Aggregator[String,Int,Int] ={
        val createCombiner: Int => Int = value => value
        val mergeValue : (Int, Int) => Int = (mergedValue, newValue) => {
          mergedValue + newValue
        }
        val mergeCombiner = mergeValue
    
        new Aggregator[String, Int,Int](createCombiner, mergeValue, mergeCombiner)
      }
    }
    

    提交后测试结果如下:

    产生2个输出文件以及内容:
    ------ part-00000 -------
    (failure,2)
    (count,2)
    (thief,1)
    
    ------- part-00001 ------
    (lepta,1)
    (peek,2)
    (fortification,1)
    

    相关文章

      网友评论

          本文标题:spark如何只在map端做完成reduce的工作

          本文链接:https://www.haomeiwen.com/subject/jxyomxtx.html