Spark combineBykey 生动解释

作者: Codlife | 来源:发表于2016-08-24 10:45 被阅读0次

    combineByKey

    def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
    def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
    def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
    其中的参数:
    createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C
    mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C
    mergeCombiners:合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C
    numPartitions:结果RDD分区数,默认保持原有的分区数
    partitioner:分区函数,默认为HashPartitioner
    mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true

    举例理解:
    假设我们要将一堆的各类水果给榨果汁,并且要求果汁只能是纯的,不能有其他品种的水果。那么我们需要一下几步:
    1 定义我们需要什么样的果汁。
    --相当于Hadoop中的map
    2 定义一个榨果汁机,即给定水果,就能给出我们定义的果汁。
    ** --相当于hadoop中的local combiner**
    3 定义一个果汁混合器,即能将相同类型的水果果汁给混合起来。
    --相当于全局进行combiner,对每个partition 进行操作

    那么对比上述三步,combineByKey的三个函数也就是这三个功能
    1 createCombiner就是定义了v如何转换为c
    2 mergeValue 就是定义了如何给定一个V将其与原来的C合并成新的C
    3 就是定义了如何将相同key下的C给合并成一个C

    Paste_Image.png

    补充

    注意体会 与 aggregate 的区别 foldByKey (f:(v,v) => v)
    (zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)]

     def testFoldBYKey(): Unit ={
        val rdd2 =sc.makeRDD(Array(("A",1),("A",2),("c",1),("b",1),("b",2)))
        rdd2.foldByKey(2)(_+_).collect().foreach(println)
      
      }
    

    http://www.cnblogs.com/rigid/p/5563205.html

    相关文章

      网友评论

        本文标题:Spark combineBykey 生动解释

        本文链接:https://www.haomeiwen.com/subject/hhansttx.html