美文网首页
MapReduce基本原理及案例代码分享

MapReduce基本原理及案例代码分享

作者: 大数据ZRL | 来源:发表于2020-05-14 16:20 被阅读0次

    1. 背景知识

    1. 起源: Google最早用MapReduce做搜索引擎中的倒排索引,简单来说是: 统计每个词条与文档对应的映射关系

    2. 数据块与数据分片: block是HDFS上的文件块,split是文件的分片(逻辑划分,不包含具体数据,只包含这些数据的位置信息)

    • a. 一个split包含一个或多个block,默认是一对一的关系
    • b. 一个split不包含两个文件的block, 不会不会跨文件进行划分
    • c. 一个Map处理一个Split, Map数量由Split决定(HDFS数据源)
    • d. 数据分片尽量保证Map端的负载均衡

    2. MapReduce数据处理流程(以文本格式文件为例)

    1) Map阶段

    • a. 首先将split中的数据以K/V(key/value)的形式读取出来
    • b. 然后将这些数据交给用户自定义的Map函数进行处理
    • c. Map处理完数据后,同样将结果以K/V的形式交给MR的计算框架
    • d. MR计算框架会将不同的数据划分成不同的partition,不同Map计算的相同partition数据会分到同一个reduce节点上面进行处理,也就是说一类partition对应一个reduce

    2) Map阶段特点:

    • a. 一个Map处理一个split
    • b. Map默认使用Hash算法对输出的key值进行Hash计算,这样保证了相同key值的数据能够划分到相同的partition中,同时也保证了不同的partition之间的数据量时大致相当
    • c. Map的数量是由split的数量决定的

    3) Reduce阶段:

    • a. Map处理完后,reduce处理程序在各个Map节点将属于自己的数据拷贝到自己的内存缓冲区中,也就是说: 相同partition的数据被拷贝至同一个reduce中
    • b. 然后将这些数据合并成一个大的数据集,并且按照key值进行聚合,把聚合后的value值以迭代器的方式作为reduce的输入value
    • c. 用户使用自定义的reduce函数处理完迭代器中的数据后,一般把结果以K/V的格式存储到HDFS上的文件中

    4) Reduce阶段特点:

    • a. Reduce的数量是由partition的数量决定的
    • b. 一个Reduce处理相同Key的所有value值(Iterator迭代器形式)

    3. Shuffle过程

    在上面介绍的MR基本过程中,还存在一个shuffle过程,发生于Map和Reduce之间


    1) Map中的shuffle
    • a. Collec阶段将数据放在环形缓冲区,唤醒缓冲区分为数据区和索引区
    • b. sort阶段对在统一partition内的索引按照key值排序
    • c. spill(溢写)阶段根据排好序的索引将数据按顺序写到小文件中
    • d. Merge阶段将Spill生成的小文件分批合并排序成一个大文件

    2) Reduce中的shuffle

    • a. Copy阶段将Map段的数据分批拷贝到Reduce的缓冲区
    • b. Spill阶段将内存缓冲区的数据按照顺序写到小文件中
    • c. Merge阶段将溢出文件合并成一个排好序的数据集

    4. 案例代码****(文本单词统计)

    1) Map与Reduce

    //                                             输入k1,输入v1,输出k2,输出v2
    static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String[] data = value.toString().split(" ");
                for (int i = 0; i < data.length; i++) {
                        context.write(new Text(data[i]), new IntWritable(1));
    //                                        输出k2:单词,输出v2:计数1次          
                }
          }
    }  
    //                                                输入k3,输入v3,输出k4,输出v4
    static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                 int sum = 0;
                 for (IntWritable v : values) {
                       sum += v.get();
                 }
                 context.write(key, new IntWritable(sum));
    //                  输出k4:单词,输出v4:单词总数量
          }
    }
    

    2) 主程序

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.*getInstance*(new Configuration());
    //      指定主程序类
            job.setJarByClass(WordCountMain.class);
    //      指定Map类及输出key/value类型
            job.setMapperClass(WordCountMap.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    //      指定Reduce类及输出key/value类型
            job.setReducerClass(WordCountReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    //      指定程序输入输出路径(注意输出路径不能已存在)
            FileInputFormat.*addInputPath*(job, new Path(args[0]));
            FileOutputFormat.*setOutputPath*(job, new Path(args[1]));
    //      程序执行并打印Log
            System.*exit*(job.waitForCompletion(true) ? 0 : 1);
    }
    

    3) 自定义数据类型

    • a. 实现Writable接口
    • b. 对字段进行序列化与反序列化
    public class Employee implements Writable {
          private String name;
          private int age;
          private int sal;
    // 序列化
    public void write(DataOutput dataOutput) throws IOException {
          dataOutput.writeUTF(this.name);
          dataOutput.writeInt(this.age);
          dataOutput.writeInt(this.sal);
          }
    //  反序列化
    public void readFields(DataInput dataInput) throws IOException {
          this.name = dataInput.readUTF();
          this.age = dataInput.readInt();
          this.sal = dataInput.readInt();
          }
    

    4) 自定义Partitioner类

    • a. 继承Partitioner抽象类, 重写getPartition方法
    • b. 可以保证Reduce端的负载均衡
    public class MyPartitioner extends Partitioner<IntWritable, Employee> {
          public int getPartition(IntWritable sal, Employee emp, int numReduceTasks) {
              if (sal.get() <= 3333) {
                  return 1 % numReduceTasks;
              } else if (sal.get() <= 6666) {
                  return 2 % numReduceTasks;
              } else {
                  return 3 % numReduceTasks;
              }
    
    //      默认的HashPartitioner实现:
    //      return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
          }
    }
    
    job.setPartitionerClass(MyPartitioner.class);
    job.setNumReduceTasks(3);
    

    5) 自定义Combiner类

    • a. 不是所有情况都可以使用Combiner
    • b. 在Map端进行的特殊ReduceTask, 一般情况下, 与Reduce端代码一样
    • c. Map输出与Combiner输入保持一致,Combiner的输出与Reduce的输入保持一致(执行顺序:MapàCombineràReduce)
    • d. 减少Map与Reduce之间的磁盘IO
    // 与WordCountReducer一样的代码
    public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              int sum = 0;
              for (IntWritable v : values) {
                  sum += v.get();
              }
              context.write(key, new IntWritable(sum));
          }
    }
    
    job.setCombinerClass(MyCombiner.class);
    

    6) 读写ORC格式文件

    //                                                               特定的Orc数据类型
    // 读ORC文件
    public static class ORCMapper extends Mapper<NullWritable, OrcStruct, Text, Text> {
          public void map(NullWritable key, OrcStruct value, Context output) throws IOException, InterruptedException {
              output.write((Text) value.getFieldValue(0),
                           (Text) value.getFieldValue(1));
    //                         获取OrcStruct中的第0和1个元素
    }
    // 写ORC文件
    public static class ORCReducer extends Reducer<Text, Text, NullWritable, OrcStruct> {
    // 定义OrcStruct的类型描述
          private TypeDescription **schema** = TypeDescription.fromString("struct<name:string,mobile:string>");
    
    // 创建OrcStruct对象
          private OrcStruct **pair** = (OrcStruct) OrcStruct.createValue(**schema**);
          public void reduce(Text key, Iterable<Text> values, Context output) throws IOException, InterruptedException {
              for (Text val : values) {
                  pair.setFieldValue(0, key);// 将数值塞到OrcStruct中
                  pair.setFieldValue(1, val);
                  output.write(NullWritable.get(), pair);
              }
          }
    }
    
    // Map类型设置
    job.setInputFormatClass(OrcInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    
    // Reduce类型设置
    job.setOutputFormatClass(OrcOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(OrcStruct.class);
    

    推荐博客:
    大数据新手博客

    相关文章

      网友评论

          本文标题:MapReduce基本原理及案例代码分享

          本文链接:https://www.haomeiwen.com/subject/qhupnhtx.html