MapReduce对文件的二次排序操作(三)

作者: 小飞牛_666 | 来源:发表于2018-10-08 23:26 被阅读10次
    所谓的二次排序,就是先对文件中的第一个字段排序;如果第一个字段相同,则再对第二个字段进行排序(即先根据键排序,然后再对相同键对应的值进行排序)。先看最基础的效果图就明白了:
    image.png
    一、自定义一个实现 WritableComparable 接口的类型,用于对数据的排序:
    public class SortWritable implements WritableComparable<SortWritable> {
    
        //分别代表第一个字段和第二个字段
        private String first;
        private int second;
    
        public SortWritable() {
        }
    
        public SortWritable(String first, int second) {
            this.set(first,second);
        }
    
        //为方便调用我们创建一个方法
        public void set(String first, int second){
            this.first = first;
            this.second = second;
        }
    
        public String getFirst() {
            return first;
        }
    
        public void setFirst(String first) {
            this.first = first;
        }
    
        public int getSecond() {
            return second;
        }
    
        public void setSecond(int second) {
            this.second = second;
        }
    
        //先根据第一个字段比较排序,如果相同在根据第二个比较排序
        public int compareTo(SortWritable o) {
            int comp = this.getFirst().compareTo(o.getFirst());
            if(0 == comp){
                return Integer.valueOf(this.getSecond()).compareTo(Integer.valueOf(o.getSecond()));
            }
            return comp;
        }
    
        //序列化
        public void write(DataOutput dataOutput) throws IOException {
    
            dataOutput.writeUTF(first);
            dataOutput.writeInt(second);
    
        }
    
        //反序列化
        public void readFields(DataInput dataInput) throws IOException {
    
            this.first = dataInput.readUTF();
            this.second = dataInput.readInt();
    
        }
    
        //一下三个方法都是快捷生成
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
    
            SortWritable that = (SortWritable) o;
            if(second != that.second) return false;
            return first != null ? first.equals(that.first) : that.first == null;
        }
    
        @Override
        public int hashCode() {
            int result = first != null ? first.hashCode() : 0;
            result = 31 * result + second;
            return result;
        }
    
        @Override
        public String toString() {
            return "SortWritable{" +
                    "first='" + first + '\'' +
                    ", second=" + second +
                    '}';
        }
    
    }
    
    
    二、创建驱动类基础框架:
    public class MySecondSortMR extends Configured implements Tool {
    
        /**
         * Configures and submits the secondary-sort job.
         * args[0] = HDFS input path, args[1] = HDFS output path.
         */
        public int run(String[] args) throws Exception {
    
            // 1) Configuration injected by ToolRunner.
            Configuration configuration = this.getConf();
    
            // 2) Create the job and locate the jar by the driver class.
            Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
            job.setJarByClass(this.getClass());
    
            // 3.1) Input
            Path inputPath = new Path(args[0]);
            FileInputFormat.addInputPath(job, inputPath);
    
            // 3.2) Map: the composite SortWritable key makes the framework
            // sort on (first, second) during the shuffle.
            job.setMapperClass(SecondMapper.class);
            job.setMapOutputKeyClass(SortWritable.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // 1. Partitioner: routes records with the same first field to the same reducer.
            //job.setPartitionerClass(FirstPartitioner.class);
    
            // 2. Optional map-output compression.
            //configuration.set("mapreduce.map.output.compress","true");
            //configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
    
            // 3. Grouping comparator: groups reduce input by the first field only.
            //job.setGroupingComparatorClass(FirstGrouping.class);
    
            // 3.3) Reduce
            job.setReducerClass(SecondReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            // Number of reduce tasks (enable together with the partitioner).
            //job.setNumReduceTasks(2);
    
            // 3.4) Output
            Path outputPath = new Path(args[1]);
            FileOutputFormat.setOutputPath(job, outputPath);
    
            // 4) Submit and block until completion.
            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        }
    
        public static void main(String[] args) {
    
            // Fallback paths for IDE runs only; real command-line arguments
            // take precedence, so nothing needs commenting out before
            // packaging the jar.
            if (args.length < 2) {
                args = new String[]{
                        "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/secondsort.txt",
                        "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/output9"
                };
            }
    
            Configuration configuration = new Configuration();
    
            try {
                // Remove a pre-existing output directory, otherwise the job
                // fails with FileAlreadyExistsException.
                Path fileOutPath = new Path(args[1]);
                FileSystem fileSystem = FileSystem.get(configuration);
    
                if (fileSystem.exists(fileOutPath)) {
                    fileSystem.delete(fileOutPath, true);
                }
    
                int status = ToolRunner.run(configuration, new MySecondSortMR(), args);
                System.exit(status);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
    }
    
    
    三、创建一个Mapper的子类,用于对数据的切分及逻辑的的操作(这里值得注意的是输出的键是我们自定义的类型SortWritable):
    public static class SecondMapper extends Mapper<LongWritable, Text, SortWritable, IntWritable> {
    
            // Reused across map() calls to avoid per-record allocation.
            private final SortWritable outputKey = new SortWritable();
            private final IntWritable outputValue = new IntWritable();
    
            /**
             * Parses lines of the form "<first> <second>" and emits
             * (SortWritable(first, second), second). Lines with the wrong
             * column count or a non-numeric second field are skipped instead
             * of failing the whole task.
             */
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
                String[] fields = value.toString().split(" ");
    
                if (2 != fields.length) return;
    
                final int second;
                try {
                    // parseInt: no boxing, and a guard against dirty input.
                    second = Integer.parseInt(fields[1]);
                } catch (NumberFormatException e) {
                    return; // malformed record — skip rather than crash the task
                }
    
                outputKey.set(fields[0], second);
                outputValue.set(second);
                context.write(outputKey, outputValue);
            }
        }
    
    
    四、创建 Reducer 的子类(数据类型的输入要和mapper的输出类型要一致):
    public static class SecondReduce extends Reducer<SortWritable, IntWritable, Text, IntWritable> {
    
            // Reusable output key carrying only the first field of the composite key.
            private Text firstField = new Text();
    
            /**
             * Emits one (first, second) pair per incoming value. Values arrive
             * already ordered by the second field thanks to the composite key's
             * sort order.
             */
            @Override
            protected void reduce(SortWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    
                // The framework mutates the key instance while iterating the
                // values, so the Text is refreshed on every iteration.
                for (IntWritable second : values) {
                    firstField.set(key.getFirst());
                    context.write(firstField, second);
                }
    
            }
        }
    
    
    五、运行程序如果没有问题的话即可直接在网页 /user/hdfs/output9 查看生成的数据,接下来我们使用命令查看排序的结果:
    bin/hdfs dfs -text /user/hdfs/output9/part*
    

    效果图如下:


    image.png
    六、为达到优化效果,我们可做如下设置:
    image.png

    由于键是组合键,为了保持原有的分区与分组结构,我们需要自定义分区类与分组类。

    七、自定义一个实现RawComparator接口的分组类:
    public class FirstGrouping implements RawComparator<SortWritable> {
    
        /**
         * Byte-level comparison used during the shuffle: compares only the
         * serialized "first" string, excluding the trailing 4-byte int
         * ("second"), so all records sharing the same first field fall into
         * one reduce group.
         */
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // Bug fix: start from the supplied buffer offsets s1/s2, not 0 —
            // the serialized key does not necessarily begin at index 0 of the
            // backing buffer. The length excludes the final 4-byte int.
            return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
        }
    
        // Object-level comparison: group by the first field only.
        public int compare(SortWritable o1, SortWritable o2) {
            return o1.getFirst().compareTo(o2.getFirst());
        }
    
    }
    
    
    八、自定义一个继承自Partitioner的分区类:
    public class FirstPartitioner extends Partitioner<SortWritable, IntWritable> {
    
        public int getPartition(SortWritable key, IntWritable intWritable, int i) {
            return (key.getFirst().hashCode() & 2147483647) % i;
        }
    
    }
    
    

    接下来我们再去掉驱动类 run() 方法中分组和分区语句的注释并重新运行程序,同样得到我们所需要的效果。


    image.png

    感谢老师与各位大神的指点,感恩一切。。。

    相关文章

      网友评论

        本文标题:MapReduce对文件的二次排序操作(三)

        本文链接:https://www.haomeiwen.com/subject/pglsaftx.html