《5分钟掌握大数据:MapReduce》中介绍过,Combine的作用是对单个Map的输出进行本地的部分聚合之后再将结果传递给Reduce,以减少网络中的IO开销和Reduce的压力,所以其实际上就是一个局部的Reduce。
一般来说,使用Combine进行部分聚合也会有一定量的额外开销,具体情况可能要具体分析;不过大多数情况下,都会提升任务的性能。
Combine的作用
上文也说过,Hadoop中的Combine函数,本质上就是一个本地的Reducer,其将Reduce需要的数据进行本地处理,来提高Hadoop的运行性能。
Combine的使用限制
在使用Combine时,要注意以下两个限制:
1 输入和输出格式一致。因为Combine的输入是Map的输出,Combine的输出是Reduce的输入, 而Map的输出和Reduce的输入是一致的;所以,我们需要确保Combine的输入和输出一致。
2 本地的Reduce不能改变最终的结果。比如WordCount,在本地做累加对最终的结果是没有影响,可以使用Combine;但是计算平均数就不可以使用,因为会丢失数量信息。
Combine的简单使用场景
考虑到以上两个限制:输入与输出格式一致、具体在使用的时候,有以下几种使用场景:
1 Combine中进行数据去重。如果我们使用的场景中,需要用到distinct这种唯一性操作,那么我们可以在同一个Map里进行部分去重(因为无法跨Map进行Combine去重),来减少传输量和Reduce的工作量。
2 Combine中进行局部计算。如果需要求和、求最大值、最小值这种局部操作不会改变最后结果的计算,也可以使用Combine来进行优化。
举例来说:
第一种情况:找出一个字符串中出现的字母
原文:aabbbc
map:["a", "a", "b"] ["b", "b", "c"]
Combine:["a", "b"] ["b", "c"] -- 对每个map的结果进行本地去重
Reduce:["a"] ["b"] ["c"]
最后结果:"a", "b", "c"
第二种情况:统计一个字符串中字母出现的次数
原文:aabbbc
map:["a:1", "a:1", "b:1"] ["b:1", "b:1", "c:1"]
Combine:["a:2", "b:1"] ["b:2", "c:1"] -- 对每个map的结果进行本地求和
Reduce:["a:2"] ["b:3"] ["c:1"]
最后结果:"a:2", "b:3", "c:1"
java中Combine的实现
上面已经说过了,Combine是一个局部的Reduce,实现了最终的Reduce的部分功能,比如本地去重、本地计数等;所以Combine实质上就是一个Reduce。
在Java中也是这样:Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,它们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。
首先,我们来看一个Combine的最小实现:其功能十分简单,输入即输出。
Java中Combine的最小实现接下来我们还是来看WordCount中的Combine实现:
Java中WordCount的Combine实现可以看出,这里的Combine的写法和Reduce如出一辙,除了类名不同之外,其他全都一样。
说明:
1 即使功能一样,两者发生作用的阶段也不一样。Combine作用在Mapper的本地节点,Reducer作用在集群上。
2 对于小文件,这样使用Combine可能会引入额外的开销,但是对于大文件来说,还是会有很大的性能提升的。
文集链接
文章链接
网友评论