美文网首页我爱编程
MapReduce 分组 TopN(二次排序)

MapReduce 分组 TopN(二次排序)

作者: 博弈史密斯 | 来源:发表于2018-06-21 22:15 被阅读0次

    在Hadoop中,排序是MapReduce的灵魂,MapTask和ReduceTask均会对数据按Key排序,这个操作是MR框架的默认行为,不管你的业务逻辑上是否需要这一操作。

    这篇主要讲 如何实现:当需要根据某一字段进行分组,并对每个分组求 排在前 N 个的值。而上篇没有涉及到分组,这是两篇最大的不同。

    需求

    如下面的数据:

    grade   score  
    A       10  
    A       40  
    B       30  
    C       20  
    B       10  
    D       40  
    A       30  
    C       20  
    B       10  
    D       40  
    C       30  
    D       20  
    ...
    

    grade 是年级,主要包括 A、B、C、D 四个年级;
    score 是对应年级的分数。

    我们需要根据年级进行分组,并统计每个年级下排在前 N 个的分数。

    分析:

    1. 我们这篇是要利用 MapReduce 自带的排序功能,即根据 key 进行排序,那么我们把 grade、score 封装到一个对象中, compareTo 中根据 score 指定排序规则;
    2. 然后要自定义 partitioner,根据 grade 进行分区。这样每个相同 grade 的对象 就会分到 同一 reducer 中。
    3. 要自定义实现 groupingcomparator

    下面对 GroupingComparator 做下介绍

    GroupingComparator

    在hadoop的mapreduce编程模型中,当在 map 端处理完成输出 key-value对时,reduce端只会将key相同的到同一个reduce函数中去执行,如果现在map端输出的key是一个对象 TextPair,那这样每个 map 端到 reduce 都会变成如下形式 (因为每个对象都不一样,所以不能聚合到一起):

    <textPair01,1>
    <textPair02,1>
    <textPair03,1>
    <textPair04,1>
    ...
    

    但是我们又有这样的需求:根据 TextPair 的某一个成员A,所有具有相同 A 的TextPair 都放到一个 reducer 函数中处理,这个A 就相当于 “相同的Key”。我们可以通过 GroupingComparator 实现此功能。

    套用我们的例子,即所有相同的 grade 的对象,都放到同一个 reducer 中处理,并取前 N 个值。

    这里和 partitioner 做下区分,有些人可能混淆。
    partitioner 是把 所有相同 grade 的对象放到一个 Reducer Task 中,但聚合还是要根据相同 key 的,而我们 每个对象都不一样,所以没办法聚合,所以要使用 GroupingComparator 。

    上代码:

    定义成绩信息bean

    public class ScoreBean implements WritableComparable<ScoreBean>{  
        private Text grade;  
        private DoubleWritable score;  
      
        public ScoreBean() {  
        }  
        public ScoreBean(Text grade, DoubleWritable score) {  
            set(grade, score);  
        }  
      
        public void set(Text grade, DoubleWritable score) {  
      
            this.grade = grade;  
            this.score = score;  
      
        }  
      
        public Text getGrade() {  
            return grade;  
        }  
      
        public DoubleWritable getScore() {  
            return score;  
        }  
      
        @Override  
        public int compareTo(ScoreBean o) {  
            int cmp = this.grade.compareTo(o.getGrade());  
            if (cmp == 0) {  
      
                cmp = -this.score.compareTo(o.getScore());  
            }  
            return cmp;  
        }  
      
        @Override  
        public void write(DataOutput out) throws IOException {  
            out.writeUTF(grade.toString());  
            out.writeDouble(score.get());  
              
        }  
      
        @Override  
        public void readFields(DataInput in) throws IOException {  
            String readUTF = in.readUTF();  
            double readDouble = in.readDouble();  
              
            this.grade = new Text(readUTF);  
            this.score= new DoubleWritable(readDouble);  
        }  
      
      
        @Override  
        public String toString() {  
            return grade.toString() + "\t" + score.get();  
        }  
    }  
    

    自定义partation分片:

    public class GradePartitioner extends Partitioner<ScoreBean, NullWritable>{  
      
        @Override  
        public int getPartition(ScoreBean bean, NullWritable value, int numReduceTasks) {  
            //相同grade的成绩bean,会发往相同的partition  
            //而且,产生的分区数,是会跟用户设置的reduce task数保持一致  
            return (bean.getGrade().hashCode() & Integer.MAX_VALUE) % numReduceTasks;    
        }  
    }  
    

    自定义groupingcomparator

    public class GradeGroupingComparator extends WritableComparator {  
      
        protected GradeGroupingComparator() {  
      
            super(ScoreBean.class, true);  
        }  
          
        @Override  
        public int compare(WritableComparable a, WritableComparable b) {  
            ScoreBean abean = (ScoreBean) a;  
            ScoreBean bbean = (ScoreBean) b;  
              
            //将grade相同的bean都视为相同,从而聚合为一组  
            return abean.getGrade().compareTo(bbean.getGrade());  
        }  
    } 
    

    编写mapreduce处理流程

    public class SecondarySort {  
          
        static class SecondarySortMapper extends Mapper<LongWritable, Text, ScoreBean, NullWritable>{  
              
            ScoreBean bean = new ScoreBean();  
              
            @Override  
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
      
                String line = value.toString();  
                String[] fields = StringUtils.split(line, "\t");  
                  
                bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));  
                  
                context.write(bean, NullWritable.get());  
                  
            }  
              
        }  
          
        static class SecondarySortReducer extends Reducer<ScoreBean, NullWritable, ScoreBean, NullWritable>{  
              
              
            //在设置了groupingcomparator以后,这里收到的kv数据 就是:  <1001 87.6>,null  <1001 76.5>,null  ....   
            //此时,reduce方法中的参数key就是上述kv组中的第一个kv的key:<1001 87.6>  
            //要输出同一个grade的所有成绩中最大金额的那一个,就只要输出这个key  
            @Override  
            protected void reduce(ScoreBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {  
                context.write(key, NullWritable.get());  
            }  
        }  
          
          
        public static void main(String[] args) throws Exception {  
              
            Configuration conf = new Configuration();  
            Job job = Job.getInstance(conf);  
              
            job.setJarByClass(SecondarySort.class);  
              
            job.setMapperClass(SecondarySortMapper.class);  
            job.setReducerClass(SecondarySortReducer.class);  
              
              
            job.setOutputKeyClass(ScoreBean.class);  
            job.setOutputValueClass(NullWritable.class);  
              
            FileInputFormat.setInputPaths(job, new Path(args[0]));  
            FileOutputFormat.setOutputPath(job, new Path(args[1]));  
            //指定shuffle所使用的GroupingComparator类  
            job.setGroupingComparatorClass(GradeGroupingComparator.class);  
            //指定shuffle所使用的partitioner类  
            job.setPartitionerClass(GradePartitioner.class);  
              
            job.setNumReduceTasks(3);  
              
            job.waitForCompletion(true);  
        }  
    }  
    

    相关文章

      网友评论

        本文标题:MapReduce 分组 TopN(二次排序)

        本文链接:https://www.haomeiwen.com/subject/zvzcyftx.html