MapReduce实现二次排序

作者: 心_的方向 | 来源:发表于2016-10-26 16:23 被阅读3588次

    二次排序的需求说明

    在mapreduce操作时,shuffle阶段会多次根据key值排序。但是在shuffle分组后,相同key值的values序列的顺序是不确定的(如下图)。如果想要此时value值也是排序好的,这种需求就是二次排序。


    sort1.png

    测试的文件数据

    a 1
    a 5
    a 7
    a 9
    b 3
    b 8
    b 10

    未经过二次排序的输出结果

    a   9
    a   7
    a   5
    a   1
    b   10
    b   8
    b   3
    

    第一种实现思路

    直接在reduce端对分组后的values进行排序。

    • reduce关键代码
            @Override
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                
                 List<Integer> valuesList = new ArrayList<Integer>();
    
                 // 取出value
                 for(IntWritable value : values) {
                     valuesList.add(value.get());
                 }
                 // 进行排序
                 Collections.sort(valuesList);
                
                 for(Integer value : valuesList) {
                    context.write(key, new IntWritable(value));
                 }
                
            }
    
    • 输出结果
    a   1
    a   5
    a   7
    a   9
    b   3
    b   8
    b   10
    

    很容易发现,这样把排序工作都放到reduce端完成,当values序列长度非常大的时候,会对CPU和内存造成极大的负载。

    • 注意的地方(容易被“坑”)
      在reduce端对values进行迭代的时候,不要直接直接存储value值或者key值,因为reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。需要用相应的数据类型.get()取出后再存储。

    第二种实现思路

    将map端输出的<key,value>中的key和value组合成一个新的key(称为newKey),value值不变。这里就变成<(key,value),value>,在针对newKey排序的时候,如果key相同,就再对value进行排序。

    • 需要自定义的地方
    1. 自定义数据类型实现组合key
      实现方式:继承WritableComparable
    2. 自定义partioner,形成newKey后保持分区规则任然按照key进行。保证不打乱原来的分区。
      实现方式:继承partitioner
    3. 自动以分组,保持分组规则任然按照key进行。不打乱原来的分组
      实现方式:继承RawComparator
    • 自定义数据类型关键代码
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableComparable;
    
     public class PairWritable implements WritableComparable<PairWritable> {
        // 组合key
          private String first;
          private int second;
    
        public PairWritable() {
        }
    
        public PairWritable(String first, int second) {
            this.set(first, second);
        }
    
        /**
         * 方便设置字段
         */
        public void set(String first, int second) {
            this.first = first;
            this.second = second;
        }
        
        /**
         * 反序列化
         */
        @Override
        public void readFields(DataInput arg0) throws IOException {
            this.first = arg0.readUTF();
            this.second = arg0.readInt();
        }
        /**
         * 序列化
         */
        @Override
        public void write(DataOutput arg0) throws IOException {
            arg0.writeUTF(first);
            arg0.writeInt(second);
        }
    
        /*
         * 重写比较器
         */
        public int compareTo(PairWritable o) {
            int comp = this.first.compareTo(o.first);
            
            if(comp != 0) {
                return comp;
            } else { // 若第一个字段相等,则比较第二个字段
                return Integer.valueOf(this.second).compareTo(
                        Integer.valueOf(o.getSecond()));
            }
        }
        
        public int getSecond() {
            return second;
        }
        public void setSecond(int second) {
            this.second = second;
        }
        public String getFirst() {
            return first;
        }
        public void setFirst(String first) {
            this.first = first;
        }
    
    • 自定义分区规则
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class SecondPartitioner extends Partitioner<PairWritable, IntWritable> {
    
        @Override
        public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
            /* 
             * 默认的实现 (key.hashCode() & Integer.MAX_VALUE) % numPartitions
             * 让key中first字段作为分区依据
             */
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; 
        }
    }
    
    
    • 自定义分组比较器
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparator;
    
    public class SecondGroupComparator implements RawComparator<PairWritable> {
    
        /*
         * 对象比较
         */
        public int compare(PairWritable o1, PairWritable o2) {
            return o1.getFirst().compareTo(o2.getFirst());
        }
    
        /*
         * 字节比较
         * arg0,arg3为要比较的两个字节数组
         * arg1,arg2表示第一个字节数组要进行比较的收尾位置,arg4,arg5表示第二个
         * 从第一个字节比到组合key中second的前一个字节,因为second为int型,所以长度为4
         */
        public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
            return WritableComparator.compareBytes(arg0, 0, arg2-4, arg3, 0, arg5-4);
        }
    }
    
    • map关键代码
            private PairWritable mapOutKey = new PairWritable();
            private IntWritable mapOutValue = new IntWritable();
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String lineValue = value.toString();
                String[] strs = lineValue.split("\t");
                
                //设置组合key和value ==> <(key,value),value>
                mapOutKey.set(strs[0], Integer.valueOf(strs[1]));
                mapOutValue.set(Integer.valueOf(strs[1]));
                
                context.write(mapOutKey, mapOutValue);
            }
    
    • reduce关键代码
            private Text outPutKey = new Text(); 
            public void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                //迭代输出
                for(IntWritable value : values) {
                    outPutKey.set(key.getFirst());
                    context.write(outPutKey, value);
                }
                
            }
    
    • 输出结果
    a   1
    a   5
    a   7
    a   9
    b   3
    b   8
    b   10
    

    相关文章

      网友评论

        本文标题:MapReduce实现二次排序

        本文链接:https://www.haomeiwen.com/subject/emfduttx.html