美文网首页
hadoop(10)Shuffle之Combiners

hadoop(10)Shuffle之Combiners

作者: Tomy_Jx_Li | 来源:发表于2018-10-20 18:22 被阅读1次

    1 什么是Combiners

    每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次处理,以减少传输到reducer的数据量。
    Combiner比较常用的实现就是本地key的归并(这种Combiner一般和Reducer的功能是一模一样的,也就是不用在新编写Combiner的实现类了,直接使用Reducer的实现即可。也就是通常说的可插拔Combiner,因为这样的Combiner有没有最终结果都是不变的),Combiner具有类似本地的reduce功能。combiner就是一个map端的reducer。
    如果不用Combiner,那么,所有的结果都是reduce完成,效率会相对低下(在数据量比较大的时候才能体系出来,否则反而会起到反作用)。使用Combiner,先完成的map会在本地聚合,提升速度。
    也可以使用Combiner进行数据的过滤或者是其他的操作。这种的一般Combiner中的实现逻辑和Reducer不一样。即非插拔Combiner。
    注意:Combiner的输出是Reducer的输入,如果Combiner是可插拔的,添加Combiner绝不能改变最终的计算结果。所以Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。

    2 流程图

    combiner和无combiner的流程比较

    3 代码实现

    package com.jiyx.test.mapred.flowStatistics;
    
    import com.jiyx.test.mapred.flowStatistics.bo.DataBean;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * Job
     * @author jiyx
     * @create 2018-10-15-19:21
     */
    public class FlowStatistics {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance();
    
            job.setJarByClass(FlowStatistics.class);
    
            job.setMapperClass(FlowStatisticsMapper.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
    
            job.setReducerClass(FlowStatisticsReducer.class);
            // 这块需要注意的是自己踩了一个坑,就是将key和value整反了
            // 然后就会出现异常java.io.IOException: Initialization of all the collectors failed. Error in last collector was:java.lang.ClassCastException: class com.jiyx.test.mapred.flowStatistics.bo.DataBean
            // 所以这里最好注意下
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(DataBean.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // partitioner
            job.setPartitionerClass(FlowStatisticsPartitioner.class);
            // 这个指定的数值必须大于等于分区数,1除外。指定1的话就是默认的伪集群分区。
            // 除了1以外,不能比FlowStatisticsPartitioner中划分的分区数量少
            // 如本例中分了4个分区(从0到3共4个),那么这里指定为2的话,只有前两个分区的数据知道去哪个reduce处理数据
            // 后两个不知道去哪个reduce处理,会报错,但是这里测试了下,有中情况是不会报错的
            // 就是当我的数据只有前两个的分区数据时,也就是虽然我分了4个分区,
            // 但是实际只会产生两个分区的数据,就不会报错,不知道算不算bug
            job.setNumReduceTasks(Integer.parseInt(args[2]));
    
            // combiner
            job.setCombinerClass(FlowStatisticsReducer.class);
    
            job.waitForCompletion(true);
        }
    }
    
    

    相关文章

      网友评论

          本文标题:hadoop(10)Shuffle之Combiners

          本文链接:https://www.haomeiwen.com/subject/ibmmzftx.html