MapReduce

作者: BlackChen | 来源:发表于2018-01-22 22:50 被阅读17次

    MapReduce原理与特性

    源自于Google的MapReduce论文
    发表与于2014年12月
    Hadoop MapReduce是Goole MapReduce的克隆版

    是一个批处理计算框架
    一个MapRecduce程序分为Map阶段和Reduce 阶段
    MapReduce特性:

    • 易于编程
    • 良好的扩展性
    • 高容错
    • 适合海量数据离线处理

    MapReduce 常用应用场景

    • 数据统计,比如网站的PV、UV统计
    • 搜索引擎索引
    • 海量数据查找
    • 复杂数据分析算法实现
      聚类算法
      分类算法
      推荐算法

    不擅长场景

    • 实时计算
      能够在毫秒或者秒级内返回结果
    • 流式计算
      输入数据集是静态的,不能动态变化
      自身设计特点决定了数据源必须是静态的
    • DAG计算
      多个作业之间存在依赖关系,后一个应用程序输入为前一个应用程序输出

    MapReduce 编程模型

    MapReduce将作业的整个运行过程分为两个阶段Map阶 段和Reduce阶段

    Map阶段由一定数量的Map Task组成

    • 输入数据格式解析:InputFormat (文件切分,分片,整合为key-value)
    • 输入数据处理: Mapper(对数据进行过滤,切分等)
    • 数据分组:Partitioner(hash(key) 取余 ReduceTask,然后由指定的ReduceTask处理)

    Reduce阶段由一定数量的Reduce Task组成

    • 数据远程拷贝
    • 数据按照key排序
    • 数据处理:Reducer
    • 数据输出格式:OutputFormat

    注意: 粗体是可以编程实现修改的.


    image.png

    InputFormat

    • 文件分片(inputSplit) 方法
      处理跨行问题,
    • 将分片数据解析成key/value对
      默认实现时textInputFormat
      -TextInputFormat
      Key是行在文件中的偏移量,Value是行内容
      如果一行被截断,则读取下一个block的前几个字符

    Split与Block

    • Block
      HDFS中最小数据处理单元
      默认128MB
    • Split
      MapReduce中最小的计算单元
      默认与Block一一对应
    • Block与Split
      Split与Block的对应关系是任意的,可由用户控制

    Partitioner

    Partitioner决定了Map Task输出的每条数据交给哪个
    Reduce Task处理

    默认实现:HashPartitioner

    • Hash(key) mod R
    • R是Reduce Task数目
    • 分区计算结果产生的分区号等于Reduce Task号
    • 允许自定义分区

    编程接口

    Java编程接口组成
    • 旧API:所在java包:org.apache.hadoop.mapred
    • 新API:所在java包:org.apache.hadoop.mapreduce
    新API具有更好的扩展性
    两种编程接口只是暴露给用户的形式不同,内部执行
    引擎是一样的

    创建maven工程,导入pom.xml,编写wordcount
    wordcount

    package class3;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCount {
        static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                //拿到一行数据,将输入的序列化数据转换成字符串
                String line = value.toString();
                //将一行数据按照分隔符拆分
                String[] words = line.split("\t");
                //遍历单词数据,输出单词<k,1>
                for (String word : words) {
                    //需要序列化写出
                    context.write(new Text(word), new IntWritable(1));
                }
            }
        }
    
        static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            //reduce方法是针对输入的一组数据,一个key和它的所有value组成一组(k:v1,v2,v3)
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                //定义一个计数器
                int count = 0;
                //遍历一组数据,将key出现次数累加到count
                for (IntWritable value : values) {
                    count += value.get();
                }
                context.write(key, new IntWritable(count));
    
            }
        }
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            String jobName = args[0];
            String inputPath = args[1];
            String outputPath = args[2];
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            //设置作业名称
            job.setJobName(jobName);
            //设置主类
            job.setJarByClass(WordCount.class);
            //设置作业中使用的Mapper和Reducer类
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            //设置Mapper阶段的输出key类型和value类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            //设置reducer阶段的输出key类型和value类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            //设置job的输入路径和输出路径
            FileInputFormat.setInputPaths(job, new Path(inputPath));
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>hadooptest</groupId>
        <artifactId>hadooptest</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>2.7.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.7.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
                <version>2.7.0</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.3.2</version>
                    <executions>
                        <execution>
                            <id>default-compile</id>
                            <phase>compile</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                            <configuration>
                                <encoding>UTF-8</encoding>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    

    打包生成 jar包


    上传到服务器,然后编写input file--->test.txt,上传到hdfs.

    aaa     asdf
    asdf
    asfe
    adsf
    asdfaf
    sadf    sdf     asf     asd
    helo    hello   hello   hello
    
    [hadoop@hadoop0 ~]$ hadoop fs -ls /wordcount/input/
    Found 1 items
    -rw-r--r--   3 hadoop supergroup         72 2018-01-23 21:07 /wordcount/input/test.txt
    

    执行jar

    hadoop jar hadooptest-1.0-SNAPSHOT.jar class3.WordCount WordCount /wordcount/input/ /wordcount/output
    (包名+类名,类名,输入路径,输出路径)

    执行完成!查看output

    [hadoop@hadoop0 ~]$ hadoop fs -ls /wordcount/output
    Found 2 items
    -rw-r--r--   3 hadoop supergroup          0 2018-01-23 21:09 /wordcount/output/_SUCCESS
    -rw-r--r--   3 hadoop supergroup         76 2018-01-23 21:09 /wordcount/output/part-r-00000
    

    hadoop fs -cat /wordcount/output/part-r-00000

    aaa 1
    adsf    1
    asd 1
    asdf    2
    asdfaf  1
    asf 1
    asfe    1
    hello   3
    helo    1
    sadf    1
    sdf 1
    

    注意:输出路径在提交之前要保证不存在,MapReduce运行完会 自动创建,如果存在输出路径在提交的时候会报文件夹存在的错 误提示

    用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运 行mr程序的客户端)

    1. Mapper的输入数据是KV对的形式
    2. Mapper的输出数据是KV对的形式
    3. Mapper中的业务逻辑写在map()方法中
    4. map()方法对每一个<K,V>调用一次
    5. Reducer的输入数据类型对应Mapper的输出数据类型,也是KV对
    6. Reducer的业务逻辑写在reduce()方法中
    7. Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
    8. 用户自定义的Mapper和Reducer都要继承各自的父类
    9. 整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必
      要信息的job对象

    相关文章

      网友评论

          本文标题:MapReduce

          本文链接:https://www.haomeiwen.com/subject/qrvraxtx.html