美文网首页
05. MapReduce

05. MapReduce

作者: 哈哈大圣 | 来源:发表于2019-11-17 18:09 被阅读0次

    MapReduce

    一、概述

    1). 特点

    1. 优点:海量数据离线处理&易开发&易运行
    2. 不适用场景:实时流计算

    2). 编程模型案例简单介绍


    分布式计算框架MapReduce.png

    以上是一个词频统计案例,我们需要实现的就是Mapping和Reducing阶段

    3). MapReduce的过程

    1. 将作业拆分成Map阶段和Reduce阶段
    2. Map阶段:Map Tasks
    3. Reduce阶段:Reduce Tasks

    4). MapReduce编程模型的执行步骤

    1. 准备map处理输入数据
    2. Mapper处理
    3. Shuffle
    4. Reduce处理
    5. 结果输出

    MapReduce执行步骤.png

    InputFormat类 需要好好研究

    4). MapReduce编程模型核心概念

    1. Split
    2. InputFormat
    3. OutputFormat
    4. Combiner
    5. Partitioner

    二、单词统计案例(实战一)

    统计一个文件中以空格或者其他分割的不区分大小写单词的个数

    1). 实现Mapper接口

    1. 注意实现Mapper的包包名

    2. 抽象类中的泛型

      1. KEYIN: Map任务读数据的key类型,offset,是每行数据起始位置的偏移量,一般为Long
      2. VALUEIN: Map任务读数据的value类型,其实就是一行行的字符串,例如String
      3. KEYOUT: map方法自定义实现输出的key的类型,例如String
      4. VALUEOUT: map方法自定义实现输出的value的类型,例如Integer
    3. Hadoop分布式文件系统,数据交互经过网络,必然涉及到【序列化】【反序列化】,它有自己的数据类型支持处理这些要求(实现对应的接口org.apache.hadoop.io.Writable),框架会调用接口中的write方法进行序列化对象写入、readFields方法进行对象读取(要保持写入和读取的顺序一致)

    4. 案例:词频统计:相同单词的次数(word, 1) 注意,这里不需要计算!!只是中间转换!!

    5. Long,String,Integer是Java里面的数据类型,Hadoop中可以对应使用自带的LongWritable,Text,IntWritable(当然可以自己实现对应的接口)

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * @author Liucheng
     * @since 2019-11-12
     */
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        /** 【模板设计模式】
         * 重写这个方法,进行中间转换并放入【上下文环境】
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 把value对应的行数据按照指定的分隔符拆开
            final String[] words = value.toString().split("[^a-zA-Z0-9]+");
    
            for (String word : words) {
                // (hello, 1) (word, 1)
                // 不区分大小写!
                // 注意,这里只是转换,不计算;
                context.write(new Text(word.toLowerCase()), new IntWritable(1));
            }
        }
    }
    

    2). 实现Reducer接口

    注意:map中的values迭代器每次迭代的对象是同一个,每次迭代此对象封装了下一个数据,所以读取出来的数据必须再次封装然后交给其他逻辑再处理,否则最后拿到的内容为迭代器迭代的最后一个值!

    1. 同样注意包名
    2. 实现的泛型
      1. KEYIN: 对应Mapper或者Combiner输出的key
      2. VALUEIN: 对应Mapper或者Combiner输出的value
      3. KEYOUT: 处理结果key
      4. VALUEOUT: 处理结果value
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    /**
     * @author Liucheng
     * @since 2019-11-12
     */
    public class WordCountReducer extends Reducer<Text, IntWritable,Text, IntWritable> {
    
        /** 【模板设计模式】
         *  map的输出到reduce端,是按照相同的key分发到一个reduce上去执行
         *  此时的数据关系类似以下的举例                      key    values
         *    (hello,1)(hello,1)(hello,1) ==> reduce1: (hello, <1,1,1>)
         *    (world,1)(world,1)(world,1) ==> reduce2: (world, <1,1,1>)
         *    (welcome,1)                 ==> reduce3: (welcome, <1>)
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    
            int count = 0;
            // 不管是迭代器还是forEach,如果读取的为封装的对象,必须再次封装然后使用,应为迭代器迭代的对象永远只有一个,只是内容变化了而已!
            Iterator<IntWritable> iterator = values.iterator();
    
            // <1,1,1>
            while (iterator.hasNext()) {
                IntWritable value = iterator.next();
                count += value.get();
            }
    
            context.write(key, new IntWritable(count));
        }
    }
    

    3). 客户端测试

    1. 本地文件系统测试
    import com.hahadasheng.bigdata.hadooplearning.utils.FileUtilsLocal;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    /**
     * 本地任务测试:
     * 【强烈推荐】在本地测试,方便调试代码,代码没得问题后再打包丢到服务器进行调优!
     * @author Liucheng
     * @since 2019-11-14
     */
    public class WordCountLocalApp {
    
        public static void main(String[] args) throws Exception {
    
            // 系统配置
            Configuration configuration = new Configuration();
    
            // 创建一个Job
            Job job = Job.getInstance(configuration);
    
            // 设置Job对应的参数:主类
            job.setJarByClass(WordCountLocalApp.class);
    
            // 设置Job对应的参数:设置自定义的Mapper和Reducer处理类
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            // 设置Job对应参数:Mapper输出的key和value类型(泛型中后面两个的类型)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // 设置Job对应的参数:Reduce输出key和value的类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            String pathIn = "E:\\ImprovementWorkingSpace\\hadoop-learning\\src\\main\\resources\\localtest\\wc.txt";
            String pathOut = "E:\\ImprovementWorkingSpace\\hadoop-learning\\src\\main\\resources\\localtest\\count";
            // 本地递归删除文件夹(代码略)
            FileUtilsLocal.removeFileRecursion(pathOut);
    
            // 设置Job对应的参数:作业输入和输出的路径
            FileInputFormat.setInputPaths(job, new Path(pathIn));
            FileOutputFormat.setOutputPath(job,  new Path(pathOut));
    
            // 提交job
            boolean result = job.waitForCompletion(true);
    
            System.exit(result ? 0 : -1);
        }
    }
    
    1. 本地程序,hdfs文件系统测试
    package com.hahadasheng.bigdata.hadooplearning.mapreducerlearning.wordcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.net.URI;
    
    /**
     * 使用MR统计HDFS上的文件对应的词频
     * Driver: 配置Mapper, Reducer的相关属性
     * 提交到本地运行:开发过程使用
     * Win环境下的注意事项:(针对2.6.0-cdh5.15.1版本); 否则启动会报错
     *      java.lang.UnsatisfiedLinkError:
     *      org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Ljava/lang/String;I)V
     *      或者其他错误!!!!
     *
     * 1. 在 https://github.com/steveloughran/winutils 下的 hadoop-3.0.0 / bin中
     *    将hadoop.dll和winutils.exe下来放入本地,例如 D:\hadoop\bin
     * 2. 配置系统环境变量
     *    HADOOP_HOME = D:\hadoop
     *    Path 添加 %HADOOP_HOME%\bin
     *
     * @author Liucheng
     * @since 2019-11-13
     */
    public class WordCountApp {
    
        public static void main(String[] args) throws Exception {
    
            // 设置系统“环境变量”; 用于hadoop程序读取配置
            System.setProperty("HADOOP_USER_NAME", "hadoop");
    
            // 系统配置
            Configuration configuration = new Configuration();
            configuration.set("fs.defaultFS","hdfs://192.168.10.188:8020");
    
            // 创建一个Job
            Job job = Job.getInstance(configuration);
    
            // 设置Job对应的参数:主类
            job.setJarByClass(WordCountApp.class);
    
            // 设置Job对应的参数:设置自定义的Mapper和Reducer处理类
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            // 设置Job对应参数:Mapper输出的key和value类型(泛型中后面两个的类型)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // 设置Job对应的参数:Reduce输出key和value的类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            // 获取hdfs句柄:如果目录已经存在,则先删除,否则会报错!
            FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.10.188:8020"), configuration, "hadoop");
            Path outputPath = new Path("/wordcount/output");
            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true);
            }
    
            // 设置Job对应的参数:作业输入和输出的路径
            FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
            FileOutputFormat.setOutputPath(job, outputPath);
    
            // 提交job
            boolean result = job.waitForCompletion(true);
    
            System.exit(result ? 0 : -1);
        }
    }
    

    4). Combiner操作


    Combiner操作.png
    1. Combiner肯定是用了反射的技术
    2. Combiner操作之前应该也执行了类似shuffle的操作,拿到转换成key以及对应的value迭代器,处理后交给上下文,然后再shuffle,转换成key以及对应的value迭代器丢给下一个Reducer处理
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.net.URI;
    
    /**
     * @author Liucheng
     * @since 2019-11-14
     */
    public class WordCountCombinerApp {
    
        public static void main(String[] args) throws Exception {
    
            // 设置系统“环境变量”; 用于hadoop程序读取配置
            System.setProperty("HADOOP_USER_NAME", "hadoop");
    
            // 系统配置
            Configuration configuration = new Configuration();
            configuration.set("fs.defaultFS","hdfs://192.168.10.188:8020");
    
            // 创建一个Job
            Job job = Job.getInstance(configuration);
    
            // 设置Job对应的参数:主类
            job.setJarByClass(WordCountCombinerApp.class);
    
            // 添加Combiner的设置@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@2
            job.setCombinerClass(WordCountReducer.class);
    
            // 设置Job对应的参数:设置自定义的Mapper和Reducer处理类
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            // 设置Job对应参数:Mapper输出的key和value类型(泛型中后面两个的类型)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // 设置Job对应的参数:Reduce输出key和value的类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            // 获取hdfs句柄:如果目录已经存在,则先删除,否则会报错!
            FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.10.188:8020"), configuration, "hadoop");
            Path outputPath = new Path("/wordcount/output");
            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true);
            }
    
            // 设置Job对应的参数:作业输入和输出的路径
            FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
            FileOutputFormat.setOutputPath(job, outputPath);
    
            // 提交job
            boolean result = job.waitForCompletion(true);
    
            System.exit(result ? 0 : -1);
        }
    }
    

    相关文章

      网友评论

          本文标题:05. MapReduce

          本文链接:https://www.haomeiwen.com/subject/uaspictx.html