MapReduce(一)---入门

作者: Coding小聪 | 来源:发表于2019-06-22 22:55 被阅读0次

    1. MapReduce概述

    MapReduce是一个分布式计算的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
    MapReduce核心功能是将用户编写的业务逻辑代码并发地运行在一个Hadoop集群上

    1.1 MapReduce进程

    一个完整的MapReduce程序在运行时有3种进程:

    1. MrAppMaster:负责整个MapReduce程序的调度和状态协调;
    2. MapTask:负责Map阶段的数据处理流程;
    3. ReduceTask:负责Reduce阶段的数据处理流程。

    2. MapReduce编程

    用户需要按照一定的规范来进行MapReduce程序的开发。

    2.1编程规范

    用户编写的MapReduce程序由三个部分组成:Mapper、Reducer和Driver.

    Mapper编写
    1. 用户自定义的Mapper继承特定的Mapper类;
    2. 定义Mapper的输入数据类型(K-V对形式的两个参数,都需要指定);
    3. 定义Mapper的输出数据类型(K-V对形式的两个参数,都需要指定);
    4. 重写map()方法,将业务逻辑写在其中。

    对于每个输入的<K,V>参数,map()方法均会执行一次。

    Reduce编写
    1. 用户自定义Reducer继承特定的Reducer父类;
    2. 定义Reducer的输入数据类型,其也是K-V对,并对应Mapper的输出数据类型;
    3. 重写reduce()方法,将业务逻辑写入其中。

    ReduceTask进程会对每一组相同k的<k,v>输入参数只调用一次reduce()方法。

    Driver编写

    相当于YARN集群的客户端,用于提交MapReduce程序到YARN集群,提交的是一个job对象,该job对象封装了MapReduce程序相关的运行参数。

    2.1 常用数据类型

    Hadoop的数据类型都要实现Writable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储。

    Hadoop类型 Java类型
    BooleanWritable boolean
    ByteWritable byte
    IntWritable int
    FloatWritable float
    LongWritable long
    DoubleWritable double
    Text String
    MapWritable map
    ArrayWritable array

    自定义MapReduce数据类型

    自定义数据类型有两种方式:

    1. 实现Writable接口
      重写 write()和readFields()方法
    2. 实现WritableComparable接口
      重写 write(),readFields()和compareTo()方法。

    3. MapReduce的优缺点

    优点

    1. MapReduce编程简单。只需要实现一些接口,就可以完成一个分布式计算程序,并分不到大量廉价的PC机器上运行;
    2. 良好的扩展性。当计算资源不足时,可以通过简单的增加机器来扩展计算能力;
    3. 高容错性。MapReduce程序运行在廉价的PC机器上,当其中一台机器宕机了,它可以自动将计算任务转移到另外一个节点上运行,而不需要人工参与;
    4. PB级以上海量数据的计算

    缺点

    1. 不擅长实时计算。MapReduce一般用来做数据的离线处理,它没法像MySQL一样,在毫秒或者秒级别内返回结果;
    2. 不擅长流式计算。流式计算的输入数据是动态的,而MapReduce的输入数据是静态的,不能动态变化;
    3. 不擅长DAG(有向图)计算。DAG计算指的是,多个应用存在依赖关系,后一个应用的输入为前一个应用的输出。使用MapReduce进行此种计算,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常低下。

    4. MapReduce工作流程


    1)默认HDFS中一个存储块的大小是128M,所以需要将200M拆成128+72;
    2)maptask并行运行,互不影响;
    3)reducetask也是并行运行,互不影响,但是reducetask的开始要依赖于所以的maptask都运行结束。

    5. WordCount案例

    这里实现一个和Hadoop官方提供的wordcount类似功能,需求如下:
    输入:文本文件,文件中包含一些单词。
    输出:各个单词输出的次数。

    对照“2.1编程规范”,我们需要编写Mapper、Reducer、Driver。

    5.1 Mapper

    public class WordcountMapper extends Mapper<LongWritable,Text, Text, IntWritable> {
        Text outKey = new Text();
        IntWritable outValue = new IntWritable(1);
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words =  line.split(" ");
            for (String word : words) {
                outKey.set(word);
                context.write(outKey,outValue);
            }
        }
    }
    

    5.2 Reducer

    public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable outValue = new IntWritable();
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
    
            outValue.set(sum);
            context.write(key, outValue);
        }
    }
    

    5.3 Driver

    public class WordcountDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 获取配置信息以及封装任务
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
    
            // 设置jar加载路径
            job.setJarByClass(WordcountDriver.class);
    
            // 设置map和reduce类
            job.setMapperClass(WordcountMapper.class);
            job.setReducerClass(WordcountReducer.class);
    
            // 设置map输出kv类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // 设置最终输出kv类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            // 设置输入和输出路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 提交
            job.submit();
        }
    }
    

    需要注意一点:导包的时候很多类在org.apache.hadoop.mapreduce包和org.apache.hadoop.mapred中存在同名的情况,一般导入org.apache.hadoop.mapreduce包。

    5.4 打包运行

    在pom.xml文件中配置maven-assembly-plugin,然后通过mvn install指令对应用进行打包, 最后在target目录中可以看到打好的包

    其中with-dependencies.jar中多了依赖,如果是在集群中运行的话,集群中包含mapreduce运行所需的jar包,所以使用不带依赖的jar包即可。

    将wordcount-1.0-SNAPSHOT.jar上传到hadoop服务器,然后运行hadoop jar即可。

    [hadoop@hadoop01 software]$ hadoop jar wordcount-1.0-SNAPSHOT.jar com.zgc.mapreduce.wordcount.WordcountDriver /usr/hadoop/input /usr/hadoop/output
    

    其中,/usr/hadoop/input是一个hdfs的目录,它下面含有需要统计单词次数的文件。

    执行完成之后,可以通过hdfs dfs -cat /usr/hadoop/output/part-r-00000指令查看统计结果。


    参考

    1. MapReduce的核心编程思想

    相关文章

      网友评论

        本文标题:MapReduce(一)---入门

        本文链接:https://www.haomeiwen.com/subject/wjyvqctx.html