【大数据学习】第十一篇-MapReduce简介

作者: irving_yuan | 来源:发表于2019-07-14 13:05 被阅读4次

    MapReduce定义

    MapReduce是一个分布式计算的框架,是用户开发机遇hadoop的数据分析应用的核心框架。

    MapReduce的优缺点

    • 优点
    1. 易于编程 只要实现一些简单的接口即可实现功能,且编写程序类似串行
    2. 良好的扩展性 支持扩展计算服务器的数量
    3. 高容错性 可以在价格低廉的机器上运行,即便集群中某些节点宕机,也可以正常使用
    4. 适合PB级离线计算
    • 缺点
      不擅长实时计算、流式计算、DAT计算

    MapReduce的编程思想

    MapReduce的编程思想

    MapReduce主要包括两个部分 Map阶段 + Reduce阶段,每个阶段中的输入输出都是key-value的形式存在
    已文本词数统计为例,两个阶段的流程如下:

    1. Map阶段读取Hadoop分片的数据,按行读取自动进行一次map操作,得到输入key-value对应为 “偏移量-本行数据”。
      偏移量实际是该行起始的数据长度索引,可以理解为行号,例如第一行偏移量为0,数据10byte,则第二行偏移量为11。
    2. Map阶段第二步执行我们实现的接口算法,并将结果的key-value(单词-每行的词频 如 java - 1 2 1 4 1)输出都磁盘上。整个Map阶段都是完全并行执行的。
    3. Reduce阶段读取Map的结果,执行实现的接口,对每个分片的结果进行初次的汇总
    4. Reduce阶段对每个分片的结果再次进行汇总成为一个最终结果
      注:通常一个分片对应hadoop中存储的一个块,即128M,这也可以避免载入内存文件过大,撑爆内存

    实现wordcount编码

    Map类,继承Mapper,重写其map方法实现对每个单词的统计,范型是根据自己业务需要定义的类型

    package com.irving.wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * map执行类
     * 四个范型是map输入和输出的类型
     * @LongWritable 字符偏移量
     * @Author yuanyc
     * @Date 15:17 2019-07-11
     */
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        /**
         * 重写map方法
         * @Author yuanyc
         * @Date 15:19 2019-07-11
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 每行起始的偏移量
            System.out.println("-------");
            System.out.println("偏移量" + key.get());
            // 按行读取的数据
            String line = value.toString();
            // 根据空格切分str
            String[] arr = line.split(" ");
            // 对字符传标记1
            for (String str : arr) {
                context.write(new Text(str), new IntWritable(1));
            }
        }
    }
    

    Reduce类,继承Reducer类,重写reduce方法,实现对map结果的汇总

    package com.irving.wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * reduce类
     * @Author yuanyc
     * @Date 11:17 2019-07-14
     */
    public class WordCountRecuder extends Reducer<Text, IntWritable, Text, IntWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // values 形如 1, 2,2,1,1
            // 对词频进行统计
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }
    

    编写启动类,执行mapreduce算法
    注:输入输出路径应当为hdfs的目录,但是本地调试阶段可以使用linux文件系统目录

    package com.irving.wordcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * 启动类
     * @Author yuanyc
     * @Date 15:39 2019-07-11
     */
    public class WordCountMain {
        public static void main(String[] args) {
    
            Configuration configuration = new Configuration();
    //        args = new String[]{"/Users/yuanyc/Documents/workspace/hdfs/test.txt", "/Users/yuanyc/Documents/workspace/hdfs/out"};
    
            try {
                // 创建job
                Job job = Job.getInstance(configuration);
                job.setJarByClass(WordCountMain.class);
    
                // 指定map类
                job.setMapperClass(WordCountMapper.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(IntWritable.class);
    
                // 指定reduce
                job.setReducerClass(WordCountRecuder.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
    
                // 指定输入输出路径
                FileInputFormat.setInputPaths(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
                // 提交任务
                job.waitForCompletion(true);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
    }
    

    测试数据


    测试数据

    本地执行结果


    分片为1
    执行结果

    注:输出目录不能重复存在,要重新执行时需要删除现有目录

    Hadoop的序列化

    通过上面代码可以看出,MR在编码过程中使用的输入输出对象类型都是不是自定义的类型。
    使用的这些类型是Hadoop定义的基础类型,由于mapreduce过程中伴随大量的IO操作,因此需要针对序列化进行性能优化。
    Java常用类型与Hadoop序列化类型的对照表

    JDK的类型 Hadoop序列化类型
    int IntWritable
    long LongWritable
    float FloatWritable
    double DoubleWritable
    byte ByteWritable
    boolean BooleanWritable
    String Text
    Map MapWritable
    Array ArrayWritable

    自定义Java对象的序列化

    自定义的Bean需要实现writable接口,重写序列化和反序列化的方法

    package com.irving.wordcount;
    
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * 自定义bean的hadoop序列化
     * @Author yuanyc
     * @Date 12:37 2019-07-14
     */
    public class BeanWritable implements Writable, Comparable {
    
        private String name;
        private int age;
    
        /**
         * 序列化方法
         * @Author yuanyc
         * @Date 12:53 2019-07-14
         */
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeChars(name);
            dataOutput.writeInt(age);
        }
    
        /**
         * 反序列化,顺序要与序列化一致
         * @Author yuanyc
         * @Date 12:53 2019-07-14
         */
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            name = dataInput.readUTF();
            age = dataInput.readInt();
        }
    
        /**
         * 自定义对象用key时,需要重写compareto方法,用于shuffle阶段的排序
         * @Author yuanyc
         * @Date 12:52 2019-07-14
         */
        @Override
        public int compareTo(Object o) {
            return 0;
        }
    }
    

    相关文章

      网友评论

        本文标题:【大数据学习】第十一篇-MapReduce简介

        本文链接:https://www.haomeiwen.com/subject/zalbkctx.html