美文网首页
MapReducer之Combiner(归约处理)

MapReducer之Combiner(归约处理)

作者: 末央酒 | 来源:发表于2018-04-05 22:32 被阅读0次

    Commbiner相当于本地的Reducer计算模式,但是并不是所有场合都适合,总结一下都是什么场合适合用。


    作用

    因为Map产生了太多的输出,为了减少RPC传输,在本地进行一次类似于Reduce操作,进行累加,再将累加的值传给Reduce。

    注意:因为Combiner是可插拔的,所以添加Combiner不能影响最终的计算机过,Combiner应该适用于那些,Reduce输入和输出key/value类型完全一致的,且不影响最终结果的。

    WordCount实例

    public class TestCombinerForAvgMR {

    //Map对不同文件不同月份进行统计

        public static class ForMapextends Mapper {

    Textokey =new Text();

    AvgEntityavgEntity =new AvgEntity();

    @Override

            protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {

    String []line = value.toString().split(" ");

    okey.set(line[0]);

    avgEntity.setCount(1);

    avgEntity.setSum(Integer.parseInt(line[1]));

    context.write(okey,avgEntity);

    }

    }

    //Combiner对每个月份的进行累加

        public static class ForCombinextends Reducer {

    @Override

            protected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException {

    int sum =0;

    int count =0;

    for (AvgEntity avgEntity:values){

    sum += avgEntity.getSum();

    count += avgEntity.getCount();

    }

    AvgEntity avgEntity =new AvgEntity();

    avgEntity.setSum(sum);

    avgEntity.setCount(count);

    context.write(key,avgEntity);

    }

    }

    //将月份合并进行累加,做除法

        public static class ForReduceextends Reducer{

    @Override

            protected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException {

    int sum =0;

    int count =0;

    for(AvgEntity avgEntity : values){

    sum += avgEntity.getCount();

    count += avgEntity.getSum();

    }

    context.write(key,new IntWritable(sum/count));

    }

    }

    public static void main(String[] args)throws IOException, ClassNotFoundException, InterruptedException {

    Job job = Job.getInstance(new Configuration());

    job.setMapperClass(ForMap.class);

    job.setReducerClass(ForReduce.class);

    job.setCombinerClass(ForCombin.class);

    job.setMapOutputKeyClass(Text.class);

    job.setMapOutputValueClass(AvgEntity.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.setInputPaths(job,new Path("目录"));

    FileOutputFormat.setOutputPath(job,new Path("目录"));

    job.waitForCompletion(true);

        }

    问题总结

    1.为什么需要在Mapper端进行归约处理

        因为在Mapper进行归约后,数据量变小了,这样再通过网络传输时,传输时间就变短了,减少了整个作业的运行时间。

    2.为什么可以在Mapper端进行归约处理

        因为Reducer端接收的数据就是来自于Mapper端。我们在Mapper进行归约处理,无非就是把归约操作提前到Mapper端做而已。

    3.既然在Mapper端进行了归约处理,为什么还要在Reducer端进行处理。

        因为Mapper端只处理了本节点的数据,而Reduce端处理的是来自多个Mapper端的数据,因此有些在Mapper端不能归约的数据,在Reducer端可以进行归约。

    4.求平均数(SVG)的非关联操作场景如何减少I/O传输量

        更改Mapper端使其输出两列数据分别是数值个数count和平均数avg,这样在Reducer端累加count作为总的数值个数,输出计数和平均值。

    相关文章

      网友评论

          本文标题:MapReducer之Combiner(归约处理)

          本文链接:https://www.haomeiwen.com/subject/qpvdhftx.html