美文网首页非技术型IT女民工Hadoop程序员
MapReduce工作机制——Word Count实例(一)

MapReduce工作机制——Word Count实例(一)

作者: ccc66e39fb8e | 来源:发表于2017-07-14 21:10 被阅读100次

    MapReduce工作机制——Word Count实例(一)

    MapReduce的思想是分布式计算,也就是分而治之,并行计算提高速度。

    编程思想

    首先,要将数据抽象为键值对的形式,map函数输入键值对,处理后,产生新的键值对作为中间结果输出。接着,MapReduce框架自动将中间结果按键做聚合处理,发给reduce函数处理。最后,reduce函数以键和对应的值的集合作为输入,处理后,产生另一系列键值对作为最终输出。后面会结合实例介绍整个过程。

    运行环境

    先不考虑采用YARN的情况,那个时候MapReduce的运行环境就是YARN,此处我们讨论的是上一代环境。

    TaskTracker

    slave的角色,负责汇报心跳和执行命令。一个集群有多个TaskTracker,但一个节点只有一个,TaskTracker和DataNode运行在同一节点。

    JobTracker

    master的角色,负责任务调度和集群资源监控,不参与计算。根据TaskTracker周期性发来的心跳信息,考虑TaskTracker的资源剩余量、作业优先级等等,为其分配合适的任务。

    Word Count实例

    环境

    • Java 1.7
    • Hadoop 2.7
    • Maven 3.3
    • Intellij IDEA 2016.3
    • Windows 10

    题主在集成开发环境下写了Word Count程序,配置的pom.xml如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>WordCount</groupId>
        <artifactId>Hadoop</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>2.7.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.7.0</version>
            </dependency>
    
        </dependencies>
    
    </project>
    

    编码

    1. Mapper类

      Mapper类的4个泛型分别代表:map函数输入键值对的键的类,map函数输入键值对的值的类,map函数输出键值对的键的类,map函数输出键值对的值的类。

      map函数部分,key是LongWritable类型,表示该行;value是Text类型,表示行的内容;Context类的write(Text key, IntWritable value)将中间结果输出。

      package com.hellohadoop;
      
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      
      import java.io.IOException;
      import java.util.StringTokenizer;
      
      /**
       * Created by duyue on 2017/7/13.
       */
      public class TokenizerMapper  extends Mapper<LongWritable, Text, Text, IntWritable> {
      
          // 直接把单词的个数设置成1, 认为出现了1次
          private final static IntWritable one = new IntWritable(1);
          private Text word = new Text();
      
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // 每行文本拆分成单个单词
              StringTokenizer itr = new StringTokenizer(value.toString());
              while (itr.hasMoreTokens()) {
                  word.set(itr.nextToken());
                  // 每个单词(忽略重复)的个数都为1
                  // 即,出现两次"good"会写入两次"good",而不会认为"good"出现了2次
                  context.write(word, one);
              }
          }
      }
      
    2. Reducer类

      Reducer类的4个泛型表示:reduce函数输入键值对的键的类,reduce函数输入键值对的值的类(与map函数输出对应),reduce函数输出键值对的键的类,reduce函数输出键值对的值的类。

      reduce函数部分:接收到的参数形如:<key, List<value>>,因为map函数把key值相同(同一单词)的所有value都发送给reduce函数,统计后输出结果。

      package com.hellohadoop;
      
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;
      
      import java.io.IOException;
      import java.util.Iterator;
      
      /**
       * Created by duyue on 2017/7/13.
       */
      public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      
          private IntWritable result = new IntWritable();
      
          public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              int sum = 0;
              for (IntWritable val : values) {
                  sum += val.get();
              }
              result.set(sum);
              context.write(key, result);
          }
      }
      
    3. 编写main函数

      package com.hellohadoop;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      
      import java.io.IOException;
      
      /**
       * Created by duyue on 2017/7/13.
       */
      public class WordCount {
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              Configuration conf = new Configuration();
              Job job = new Job(conf, "word count");
              job.setJarByClass(WordCount.class);
              job.setMapperClass(TokenizerMapper.class);
              job.setReducerClass(IntSumReducer.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              FileInputFormat.addInputPath(job, new Path(args[0]));
              FileOutputFormat.setOutputPath(job, new Path(args[1]));
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
      

    运行程序

    此处主要依赖于之前Maven依赖的包,为了成功显示日志文件,需要在resources包中添加log4j.properties文件,位置如下图:

    log4j.properties配置.PNG

    文件配置:

    log4j.rootLogger=debug, stdout, R
    
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    
    log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
    
    log4j.appender.R=org.apache.log4j.RollingFileAppender
    log4j.appender.R.File=firestorm.log
    
    log4j.appender.R.MaxFileSize=100KB
    log4j.appender.R.MaxBackupIndex=1
    
    log4j.appender.R.layout=org.apache.log4j.PatternLayout
    log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n
    
    log4j.logger.com.codefutures=DEBUG
    

    配置Configuration如图:

    Configuration配置.png

    自己要创建input文件夹,并将在Project Structure中设置为Excluded类型。在input文件夹下创建需要统计单词数的文件,位置如下图:

    Input文件夹位置

    题主统计的是莎士比亚有名的十四行诗的Sonnet 18,运行程序后:

    运行后的状态图

    其中part-r-00000中保存了统计结果,图太长,截了一部分:

    统计结果

    以上就是Word Count实例在Idea下运行的情况

    下期预告:MapReduce编程涉及到的API

    相关文章

      网友评论

        本文标题:MapReduce工作机制——Word Count实例(一)

        本文链接:https://www.haomeiwen.com/subject/neiohxtx.html