MapReduce (5): The Shuffle Mechanism


Author: codeMover | Published 2021-12-14 23:27

    The Shuffle Mechanism

    The data processing that happens after the map method and before the reduce method is called the Shuffle.

    (Figure 2.3: The Shuffle mechanism)

    Partitioning (Partitioner)

    To route output records to different files (partitions) by some condition, MapReduce provides the Partitioner. The default implementation partitions by hash value:

    public class HashPartitioner<K, V> extends Partitioner<K, V> {
    
      /** Use {@link Object#hashCode()} to partition. */
      public int getPartition(K key, V value,
                              int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    
    }
    
    

    The default partition is the key's hashCode modulo the number of ReduceTasks, so the user has no control over which key is stored in which partition.
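
    As a quick check (a standalone sketch, not from the original article; it only needs hadoop-common on the classpath), the same formula can be run outside a job to see where sample keys would land:

    import org.apache.hadoop.io.Text;
    
    public class HashPartitionDemo {
        public static void main(String[] args) {
            int numReduceTasks = 3;
            for (String k : new String[]{"hadoop", "hive", "spark"}) {
                // same formula as HashPartitioner: clear the sign bit, then take the modulus
                int partition = (new Text(k).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
                System.out.println(k + " -> partition " + partition);
            }
        }
    }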

    Steps for a custom Partitioner

    1) Define a class that extends Partitioner and override the getPartition() method

    public class ProvincePartitioner extends Partitioner<Text,FlowBean> {
    
        @Override
        public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
            // the first three characters of the phone number identify the prefix
            String prefix = text.toString().substring(0, 3);
            if ("135".equals(prefix)) {
                return 0;
            }
            return 1;
        }
    }
    

    2) In the Job driver, register the custom Partitioner

            job.setPartitionerClass(ProvincePartitioner.class);
    

    3) After customizing the Partitioner, set a number of ReduceTasks that matches the partitioner's logic.

            job.setNumReduceTasks(2);
    

    Partitioning summary

    1) If the number of ReduceTasks > the number of values getPartition can return, the extra ReduceTasks produce empty output files part-r-000xx;

    2) If 1 < number of ReduceTasks < number of values getPartition can return, some partitioned data has no ReduceTask to go to and the job fails with an Exception;

    3) If the number of ReduceTasks = 1, then no matter how many partition files the MapTasks emit, everything goes to that single ReduceTask, and only one result file, part-r-00000, is produced;

    4) Partition numbers must start at zero and increase consecutively. (The sketch below illustrates these cases.)
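
    To make cases 1) to 3) concrete, here is a hypothetical partitioner (not from the article) whose getPartition can return 0..4, with the effect of different ReduceTask counts noted in comments:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    // Hypothetical example: five possible partition numbers, 0..4
    public class FivePartitioner extends Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            return (key.hashCode() & Integer.MAX_VALUE) % 5;
        }
    }
    
    // In the driver:
    // job.setNumReduceTasks(6); // case 1: runs, but part-r-00005 is empty
    // job.setNumReduceTasks(5); // exact match: five output files, one per partition
    // job.setNumReduceTasks(3); // case 2: keys routed to partitions 3 or 4 have no
    //                           //         ReduceTask, so the job fails with an exception
    // job.setNumReduceTasks(1); // case 3: the partitioner is bypassed, one part-r-00000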

    Hands-on code

    FlowBean.java

    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class FlowBean implements Writable {
    
        private long upFlow;
        private long downFlow;
        private long sumFlow;
    
        public FlowBean() {
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        // derived value: the total is always upFlow + downFlow
        public void setSumFlow() {
            this.sumFlow = this.upFlow + this.downFlow;
        }
    
        @Override
        public String toString() {
            return upFlow +
                "\t" + downFlow +
                "\t" + sumFlow;
        }
    }
    

    FlowMapper.java

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        private Text outK = new Text();
        private FlowBean outV = new FlowBean();
        @Override
        protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
            // 1 Read one line
            String line = value.toString();
            // 2 Split on the field delimiter
            String[] split = line.split(" ");
            // 3 Grab the fields we need: phone number, upstream flow, downstream flow
            String phone = split[0];
            String upFlow = split[split.length-3];
            String downFlow = split[split.length-2];
            // 4 Populate the output key/value
            outK.set(phone);
            outV.setUpFlow(Long.parseLong(upFlow));
            outV.setDownFlow(Long.parseLong(downFlow));
            outV.setSumFlow();
    
            context.write(outK,outV);
        }
    }
    

    FlowReducer.java

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        private FlowBean outV = new FlowBean();
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values,
            Reducer<Text, FlowBean, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
            // 1 Iterate over the group and accumulate the values
            long totalUp = 0;
            long totalDown = 0;
            for (FlowBean value : values) {
                totalUp += value.getUpFlow();
                totalDown += value.getDownFlow();
            }
            // 2 Populate the output bean
            outV.setUpFlow(totalUp);
            outV.setDownFlow(totalDown);
            outV.setSumFlow();
            // 3 Write out
            context.write(key,outV);
    
        }
    }
    

    ProvincePartitioner.java

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class ProvincePartitioner extends Partitioner<Text,FlowBean> {
    
        @Override
        public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
            // the first three characters of the phone number identify the prefix
            String prefix = text.toString().substring(0, 3);
            if ("135".equals(prefix)) {
                return 0;
            }
            return 1;
        }
    }
    

    FlowDriver.java

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class FlowDriver {
    
        public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
            // 1 Get the job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // 2 Set the jar
            job.setJarByClass(FlowDriver.class);
            // 3 Wire up the mapper and reducer
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);
            // 4 Set the mapper output key and value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            // 5 Set the final output key and value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            job.setPartitionerClass(ProvincePartitioner.class);
            job.setNumReduceTasks(2);
            // 6 Set the input and output paths
            FileInputFormat.setInputPaths(job,new Path(System.getProperty("user.dir")+"/input/partitioner2"));
            FileOutputFormat.setOutputPath(job,new Path(System.getProperty("user.dir")+"/output/partitioner2"));
            // 7 Submit the job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
    

    Sorting with WritableComparable

    Sorting is one of the most important operations in the MapReduce framework.

    Both MapTask and ReduceTask sort their data by key. This is default Hadoop behavior: data in any application gets sorted, whether or not the logic needs it.

    The default sort is dictionary (lexicographic) order, implemented with quicksort.
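
    Note that dictionary order is not numeric order; with Text keys, "10" sorts before "9". A standalone sketch (not from the article) makes this visible:

    import org.apache.hadoop.io.Text;
    
    public class LexOrderDemo {
        public static void main(String[] args) {
            // Text compares byte by byte, so '1' < '9' decides the comparison
            System.out.println(new Text("10").compareTo(new Text("9"))); // negative: "10" first
            System.out.println(new Text("a").compareTo(new Text("b")));  // negative: "a" first
        }
    }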

    Sorting overview

    A MapTask first places its results in a ring (circular) buffer in memory. When the buffer's usage reaches a threshold, the buffered data is quick-sorted (entirely in memory) and the sorted data is spilled to disk. Once all input has been processed, the MapTask merge-sorts all of its spill files on disk.

    A ReduceTask remotely copies its slice of data from every MapTask. Each copied file is spilled to disk if it exceeds a size threshold, otherwise kept in memory. When the number of on-disk files reaches a threshold, they are merge-sorted into one larger file; when the size or count of in-memory files exceeds a threshold, they are merged and spilled to disk. After all data has been copied, the ReduceTask performs one final merge sort over everything in memory and on disk.
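
    The buffer sizes and thresholds mentioned above are configurable. A sketch of the commonly cited knobs (property names from Hadoop 2.x/3.x; the values shown are the usual defaults, so treat them as illustrative rather than recommendations):

    import org.apache.hadoop.conf.Configuration;
    
    public class ShuffleTuning {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // map-side ring buffer size in MB; spilling starts at the percentage below
            conf.setInt("mapreduce.task.io.sort.mb", 100);
            conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);
            // how many spill files are merged in a single pass
            conf.setInt("mapreduce.task.io.sort.factor", 10);
        }
    }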

    Kinds of sorting

    1) Partial sort

    MapReduce sorts the dataset by the keys of the input records, guaranteeing that each output file is internally sorted.

    2) Total sort

    The final output is a single, internally sorted file. This is achieved by using only one ReduceTask, but the approach is extremely inefficient for large files: one machine processes everything, forfeiting the parallelism MapReduce provides.

    3) Auxiliary sort (grouping)

    Groups keys on the Reduce side. Use case: when the key is a bean object and you want keys that share one or a few fields (while differing in the rest) to enter the same reduce() call, grouping can be applied; see the sketch after this list.

    4) Secondary sort

    A custom sort is a secondary sort when compareTo uses two comparison criteria.
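
    The article itself shows no auxiliary-sort code, so here is a minimal sketch with hypothetical names (it assumes an OrderBean key class with a getOrderId() accessor that defines the group):

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    // Hypothetical grouping comparator: keys with equal orderId fields form one
    // group, so their values are handed to a single reduce() call.
    public class OrderGroupingComparator extends WritableComparator {
        protected OrderGroupingComparator() {
            super(OrderBean.class, true); // true: let the comparator instantiate keys
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            OrderBean oa = (OrderBean) a;
            OrderBean ob = (OrderBean) b;
            return Long.compare(oa.getOrderId(), ob.getOrderId());
        }
    }
    
    // registered in the driver with:
    // job.setGroupingComparatorClass(OrderGroupingComparator.class);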

    How custom sorting with WritableComparable works

    To transmit a bean object as the key, implement the WritableComparable interface and override compareTo; this is all it takes to get custom sorting.

    FlowBean.java

    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class FlowBean implements WritableComparable<FlowBean> {
    
        private long upFlow;
        private long downFlow;
        private long sumFlow;
    
        public FlowBean() {
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        // derived value: the total is always upFlow + downFlow
        public void setSumFlow() {
            this.sumFlow = this.upFlow + this.downFlow;
        }
    
        @Override
        public String toString() {
            return upFlow +
                "\t" + downFlow +
                "\t" + sumFlow;
        }
    
        @Override
        public int compareTo(FlowBean o) {
            // primary criterion: descending by total flow
            if (this.sumFlow > o.sumFlow) {
                return -1;
            } else if (this.sumFlow < o.sumFlow) {
                return 1;
            } else {
                // secondary criterion: ascending by upstream flow
                if (this.upFlow > o.upFlow) {
                    return 1;
                } else if (this.upFlow < o.upFlow) {
                    return -1;
                } else {
                    return 0;
                }
            }
        }
    }
    

    FlowMapper.java

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
        private FlowBean outK = new FlowBean();
        private Text outV = new Text();
    
        @Override
        protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, FlowBean, Text>.Context context)
            throws IOException, InterruptedException {
            // Read one line
            String line = value.toString();
            // Split
            String[] split = line.split(" ");
            // Populate: the bean is the key now, the phone number the value
            outV.set(split[0]);
            outK.setUpFlow(Long.parseLong(split[1]));
            outK.setDownFlow(Long.parseLong(split[2]));
            outK.setSumFlow(); // without this, sumFlow stays 0 and the sort is meaningless
            // Write out
            context.write(outK,outV);
        }
    }
    

    FlowReducer.java

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values,
            Reducer<FlowBean, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
            // keys arrive already sorted by compareTo; swap back to (phone, bean) on output
            for (Text value : values) {
                context.write(value,key);
            }
    
        }
    }
    

    FlowDriver.java

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class FlowDriver {
    
        public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
            // 1 Get the job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // 2 Set the jar
            job.setJarByClass(FlowDriver.class);
            // 3 Wire up the mapper and reducer
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);
            // 4 Set the mapper output key and value types (FlowBean is the key so it gets sorted)
            job.setMapOutputKeyClass(FlowBean.class);
            job.setMapOutputValueClass(Text.class);
            // 5 Set the final output key and value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            // 6 Set the input and output paths
            FileInputFormat.setInputPaths(job, new Path(System.getProperty("user.dir")+"/input/writeableComparable"));
            FileOutputFormat.setOutputPath(job, new Path(System.getProperty("user.dir")+"/output/writeableComparable"));
            // 7 Submit the job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
    

    Combiner


    1) A Combiner is a component of an MR program besides the Mapper and Reducer.

    2) The Combiner's parent class is Reducer.

    3) A Combiner differs from a Reducer in where it runs:

    the Combiner runs on the node of each MapTask;

    the Reducer receives the output of all Mappers globally.

    4) The purpose of a Combiner is to locally aggregate each MapTask's output, reducing network traffic.

    5) A Combiner may only be used when it does not change the final business logic, and its output KV types must match the Reducer's input KV types (see the worked example after this list).

    6) When the Combiner logic is identical to the Reducer logic, the Reducer class itself can be registered as the Combiner.
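
    A concrete counter-example for point 5 (an illustration, not from the article): suppose the job computes an average, one MapTask emits (a, 3) and (a, 5), and another emits (a, 7). A sum-style Combiner is safe, but reusing an average-style Reducer as the Combiner is not: the Reducer would receive avg(3, 5) = 4 and avg(7) = 7 and output avg(4, 7) = 5.5, while the true average is (3 + 5 + 7) / 3 = 5.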

    Example

    WordCountMapper.java

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        private Text outKey = new Text();
        private IntWritable outV = new IntWritable(1);
    
        @Override
        public void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
            // 1 Read one line
            String lineStr = value.toString();
            // 2 Split into words
            String[] words = lineStr.split(" ");
            // 3 Write out each word with a count of 1
            for (String word : words) {
                // populate the output key
                outKey.set(word);
                // write out
                context.write(outKey, outV);
            }
        }
    }
    

    WordCountReducer.java

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable outV = new IntWritable();
    
        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
            int sum = 0;
            // accumulate the counts
            for (IntWritable value : values) {
                sum += value.get();
            }
            outV.set(sum);
            // write out
            context.write(key,outV);
        }
    }
    

    WordCountCombiner.java

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    // Same logic as WordCountReducer, but it runs on each MapTask's node for local aggregation
    public class WordCountCombiner extends Reducer<Text, IntWritable,Text, IntWritable> {
    
        private IntWritable outV = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outV.set(sum);
            context.write(key,outV);
    
        }
    }
    

    WordCountDriver.java

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCountDriver {
    
        public static void main(String[] args) throws Exception {
            //1 Get the job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            //2 Set the jar path
            job.setJarByClass(WordCountDriver.class);
            //3 Wire up the mapper and reducer
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            //4 Set the mapper output KV types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            //5 Set the final output KV types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            job.setCombinerClass(WordCountCombiner.class);
            // The Reducer can be registered directly as the Combiner, since the two are identical here
            // job.setCombinerClass(WordCountReducer.class);
            //6 Set the input and output paths
            FileInputFormat.setInputPaths(job, new Path(System.getProperty("user.dir")+"/input/combiner"));
            FileOutputFormat.setOutputPath(job, new Path(System.getProperty("user.dir")+"/output/combiner"));
            //7 Submit the job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
    

    Summary

    This section is the key one! It described the Shuffle mechanism (everything between the mapper and the reducer; note that without a reducer the combiner does not run) and walked through partitioning, sorting, and local aggregation in detail. Make sure you understand them well.
