hadoop (9) Shuffle: Sorting

Author: Tomy_Jx_Li | Published 2018-10-20 16:58

    1 Sorting in MapReduce

    As mentioned earlier, map output is sorted and grouped before it is handed to reduce, and hooking into that sort is simple. Map output is always sorted by key, so for a key to be sortable it has to implement the JDK's Comparable interface. The key also has to be serializable, which means implementing Hadoop's own Writable interface. For convenience, Hadoop provides the WritableComparable interface, which extends both.
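
    For reference, the Hadoop interface is essentially just the combination of the two:

    public interface WritableComparable<T> extends Writable, Comparable<T> {
    }
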
    This setup raises a question: what happens when the value we sort on does not uniquely identify a record? In other words, the key needs a custom sort order, but the sort field alone cannot pin down a single row.
    Do we need two MapReduce jobs to handle that, or can it be done in one?
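
    It can be done in one job: the trick is to make compareTo never report two distinct records as equal, typically by falling back to a second field when the sort field ties. A minimal sketch of just that method, for a hypothetical FlowKey bean with the same fields and getters as the DataBean in section 2:

    @Override
    public int compareTo(FlowKey o) {
        int byTotal = Long.compare(this.totalFlow, o.getTotalFlow());
        // when the totals tie, fall back to the phone number so two distinct records stay distinct keys
        return byTotal != 0 ? byTotal : Long.compare(this.phoneNum, o.getPhoneNum());
    }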

    2 Implementation

    The input data has already been aggregated (columns: phone number, upstream flow, downstream flow, total flow):

    13480253104 180 180 360
    13502468823 7335    110349  117684
    13560436666 1116    954 2070
    13560439658 2034    5892    7926
    13602846565 1938    2910    4848
    13660577991 6960    690 7650
    13719199419 240 0   240
    13726230503 2481    24681   27162
    13726238888 2481    24681   27162
    13760778710 120 120 240
    13826544101 264 0   264
    13922314466 3008    3720    6728
    13925057413 11058   48243   59301
    13926251106 240 0   240
    13926435656 132 1512    1644
    15013685858 3659    3538    7197
    15920133257 3156    2936    6092
    15989002119 1938    180 2118
    18211575961 1527    2106    3633
    18320173382 9531    2412    11943
    84138413    4116    1432    5548
    

    map:

    package com.jiyx.test.mapred.sort;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * The key is a custom JavaBean: as long as it implements WritableComparable,
     * the shuffle phase will sort records by the user-defined compareTo method.
     * @author jiyx
     * @create 2018-10-20-16:04
     */
    public class SortMapper extends Mapper<LongWritable, Text, DataBean, NullWritable> {
    
        // a single reusable key bean: context.write serializes it right away, so reuse is safe
        private DataBean k = new DataBean();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // each input line is: phone \t upFlow \t downFlow \t totalFlow (already aggregated)
            String[] datas = value.toString().split("\t");
            long phoneNum = Long.parseLong(datas[0]);
            long upFlow = Long.parseLong(datas[1]);
            long downFlow = Long.parseLong(datas[2]);
            // the bean itself is the sort key; there is no value payload, so emit NullWritable
            context.write(k.set(phoneNum, upFlow, downFlow), NullWritable.get());
        }
    }
    

    reduce:

    package com.jiyx.test.mapred.sort;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Keys arrive already sorted by the shuffle, so the reducer just writes each one back out.
     * @author jiyx
     * @create 2018-10-20-16:19
     */
    public class SortReducer extends Reducer<DataBean, NullWritable, DataBean, NullWritable> {
        @Override
        protected void reduce(DataBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
    

    Custom key:

    package com.jiyx.test.mapred.sort;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * @author jiyx
     * @create 2018-10-15-19:22
     */
    public class DataBean implements WritableComparable<DataBean> {
    
        private long phoneNum;
    
        private long upFlow;
    
        private long downFlow;
    
        private long totalFlow;
    
        public DataBean() {
        }
    
        public DataBean(long phoneNum, long upFlow, long downFlow) {
            this.set(phoneNum, upFlow, downFlow);
        }
    
        /**
         * Serialization: write the fields out in a fixed order.
         * @param dataOutput
         * @throws IOException
         */
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(phoneNum);
            dataOutput.writeLong(upFlow);
            dataOutput.writeLong(downFlow);
            dataOutput.writeLong(totalFlow);
        }
    
        public DataBean set(long phoneNum, long upFlow, long downFlow) {
            this.phoneNum = phoneNum;
            this.downFlow = downFlow;
            this.upFlow = upFlow;
            this.totalFlow = upFlow + downFlow;
            return this;
        }
    
        /**
         * Deserialization: read the fields back in exactly the order they were written.
         * @param dataInput
         * @throws IOException
         */
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            phoneNum = dataInput.readLong();
            upFlow = dataInput.readLong();
            downFlow = dataInput.readLong();
            totalFlow = dataInput.readLong();
        }
    
        /**
         * Sort ascending by totalFlow.
         * @param o
         * @return
         */
        @Override
        public int compareTo(DataBean o) {
            // Never return 0 here. The input has already been aggregated, so two records with the
            // same totalFlow are still different rows; returning 0 would make them equal keys and
            // the default grouping would collapse them into one output record. Handle ties with
            // whatever logic fits your data.
            if (this.totalFlow == o.getTotalFlow()) {
                return 1;
            }
            return this.totalFlow > o.getTotalFlow() ? 1 : -1;
        }
    
        /**
         * toString is overridden because it determines how each record is written to the output file.
         * @return
         */
        @Override
        public String toString() {
            return this.phoneNum + "\t" + this.upFlow + "\t" + this.downFlow + "\t" + this.totalFlow;
        }
    
        public long getPhoneNum() {
            return phoneNum;
        }
    
        public void setPhoneNum(long phoneNum) {
            this.phoneNum = phoneNum;
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getTotalFlow() {
            return totalFlow;
        }
    
        public void setTotalFlow(long totalFlow) {
            this.totalFlow = totalFlow;
        }
    
    }
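
    As a side note (not part of the original code), if you would rather keep the ordering out of the bean itself, Hadoop also accepts an external sort comparator registered with job.setSortComparatorClass(TotalFlowComparator.class). A rough sketch under that assumption:

    package com.jiyx.test.mapred.sort;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Hypothetical alternative: an external comparator that sorts by totalFlow
     * and breaks ties on phoneNum, keeping DataBean's compareTo simple.
     */
    public class TotalFlowComparator extends WritableComparator {
    
        public TotalFlowComparator() {
            // true -> let the parent class create DataBean instances for deserializing keys
            super(DataBean.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            DataBean left = (DataBean) a;
            DataBean right = (DataBean) b;
            int byTotal = Long.compare(left.getTotalFlow(), right.getTotalFlow());
            // break ties on the phone number so distinct records never compare as equal
            return byTotal != 0 ? byTotal : Long.compare(left.getPhoneNum(), right.getPhoneNum());
        }
    }

    Unless a separate grouping comparator is set, the same comparator is also used for grouping in the reduce phase, so the tie-break keeps records with equal totals from being merged.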
    

    job:

    package com.jiyx.test.mapred.sort;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * @author jiyx
     * @create 2018-10-20-16:35
     */
    public class SortJob {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
    
            job.setJarByClass(SortJob.class);
    
            job.setMapperClass(SortMapper.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
    
            job.setReducerClass(SortReducer.class);
            // The map output types match the final output types here, so setting the job output
            // classes below also covers the map output (no separate setMapOutputKeyClass call is needed).
            // Careful with the order, though: I originally swapped the key and value classes, which failed with
            // java.io.IOException: Initialization of all the collectors failed. Error in last collector was:java.lang.ClassCastException: class com.jiyx.test.mapred.flowStatistics.bo.DataBean
            // so double-check this part.
            job.setOutputKeyClass(DataBean.class);
            job.setOutputValueClass(NullWritable.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.waitForCompletion(true);
        }
    }
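
    Assuming the classes are packaged into a jar (hypothetical name sort.jar) and the merged input already sits in HDFS (hypothetical paths), the job can be launched like this; note that FileOutputFormat requires the output directory not to exist yet:

    hadoop jar sort.jar com.jiyx.test.mapred.sort.SortJob /flow/merged /flow/sorted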
    

    The sorted output (note that both records with total 27162 survive, because compareTo never returns 0):

    13760778710 120 120 240
    13719199419 240 0   240
    13926251106 240 0   240
    13826544101 264 0   264
    13480253104 180 180 360
    13926435656 132 1512    1644
    13560436666 1116    954 2070
    15989002119 1938    180 2118
    18211575961 1527    2106    3633
    13602846565 1938    2910    4848
    84138413    4116    1432    5548
    15920133257 3156    2936    6092
    13922314466 3008    3720    6728
    15013685858 3659    3538    7197
    13660577991 6960    690 7650
    13560439658 2034    5892    7926
    18320173382 9531    2412    11943
    13726230503 2481    24681   27162
    13726238888 2481    24681   27162
    13925057413 11058   48243   59301
    13502468823 7335    110349  117684
    
