1 什么是Combiners
每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次处理,以减少传输到reducer的数据量。
Combiner比较常用的实现就是本地key的归并(这种Combiner一般和Reducer的功能是一模一样的,也就是不用在新编写Combiner的实现类了,直接使用Reducer的实现即可。也就是通常说的可插拔Combiner,因为这样的Combiner有没有最终结果都是不变的),Combiner具有类似本地的reduce功能。combiner就是一个map端的reducer。
如果不用Combiner,那么,所有的结果都是reduce完成,效率会相对低下(在数据量比较大的时候才能体系出来,否则反而会起到反作用)。使用Combiner,先完成的map会在本地聚合,提升速度。
也可以使用Combiner进行数据的过滤或者是其他的操作。这种的一般Combiner中的实现逻辑和Reducer不一样。即非插拔Combiner。
注意:Combiner的输出是Reducer的输入,如果Combiner是可插拔的,添加Combiner绝不能改变最终的计算结果。所以Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。
2 流程图
combiner和无combiner的流程比较3 代码实现
package com.jiyx.test.mapred.flowStatistics;
import com.jiyx.test.mapred.flowStatistics.bo.DataBean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* Job
* @author jiyx
* @create 2018-10-15-19:21
*/
public class FlowStatistics {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance();
job.setJarByClass(FlowStatistics.class);
job.setMapperClass(FlowStatisticsMapper.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(FlowStatisticsReducer.class);
// 这块需要注意的是自己踩了一个坑,就是将key和value整反了
// 然后就会出现异常java.io.IOException: Initialization of all the collectors failed. Error in last collector was:java.lang.ClassCastException: class com.jiyx.test.mapred.flowStatistics.bo.DataBean
// 所以这里最好注意下
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(DataBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// partitioner
job.setPartitionerClass(FlowStatisticsPartitioner.class);
// 这个指定的数值必须大于等于分区数,1除外。指定1的话就是默认的伪集群分区。
// 除了1以外,不能比FlowStatisticsPartitioner中划分的分区数量少
// 如本例中分了4个分区(从0到3共4个),那么这里指定为2的话,只有前两个分区的数据知道去哪个reduce处理数据
// 后两个不知道去哪个reduce处理,会报错,但是这里测试了下,有中情况是不会报错的
// 就是当我的数据只有前两个的分区数据时,也就是虽然我分了4个分区,
// 但是实际只会产生两个分区的数据,就不会报错,不知道算不算bug
job.setNumReduceTasks(Integer.parseInt(args[2]));
// combiner
job.setCombinerClass(FlowStatisticsReducer.class);
job.waitForCompletion(true);
}
}
网友评论