combineByKey
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
其中的参数:
createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C
mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C
mergeCombiners:合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C
numPartitions:结果RDD分区数,默认保持原有的分区数
partitioner:分区函数,默认为HashPartitioner
mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true
举例理解:
假设我们要将一堆的各类水果给榨果汁,并且要求果汁只能是纯的,不能有其他品种的水果。那么我们需要一下几步:
1 定义我们需要什么样的果汁。
--相当于Hadoop中的map
2 定义一个榨果汁机,即给定水果,就能给出我们定义的果汁。
** --相当于hadoop中的local combiner**
3 定义一个果汁混合器,即能将相同类型的水果果汁给混合起来。
--相当于全局进行combiner,对每个partition 进行操作
那么对比上述三步,combineByKey的三个函数也就是这三个功能
1 createCombiner就是定义了v如何转换为c
2 mergeValue 就是定义了如何给定一个V将其与原来的C合并成新的C
3 就是定义了如何将相同key下的C给合并成一个C
补充
注意体会 与 aggregate 的区别 foldByKey (f:(v,v) => v)
(zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
def testFoldBYKey(): Unit ={
val rdd2 =sc.makeRDD(Array(("A",1),("A",2),("c",1),("b",1),("b",2)))
rdd2.foldByKey(2)(_+_).collect().foreach(println)
}
网友评论