美文网首页
MapReduce程序WordCount业务实现

MapReduce程序WordCount业务实现

作者: 羋学僧 | 来源:发表于2020-08-03 16:41 被阅读0次

    WordCount的业务逻辑:

    1、mapTask 阶段处理每个数据分块的单词统计分析,思路是每遇到一个单词则把其转换成一个 key,value 对,比如单词 hello,就转换成 <’hello’,1> 发送给 reduceTask 去汇总
    2、reduceTask 阶段将接受 mapTask 的结果,来做汇总计数


    结果:

    hdfs 1
    hive 1
    flink 1
    hadoop 1
    hello 5
    spark 1

    首先看Mapper:WordMapper.java

    /**
    *  Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    *      KEYIN:是指框架读取到的数据的key的类型,在默认的InputFormat下,读取的key就是一行文本的起始的偏移量。所以key的类型Long
    *      VALUEIN:是指框架读取到的数据的value的类型,在默认的InputFormat下,读取的value就是一行文本。所以value的类型String
    *      KEYOUT:是指用户自定义的逻辑方法返回的数据中key的类型,由用户根据业务逻辑自己决定,在我们wordcount程序中,我们输出的key是单词,所以类型是String
    *      VALUEOUT:是指用户自定义的逻辑方法返回的数据中value的类型,由用户根据业务逻辑自己决定,在我们wordcount程序中,我们输出的value是次数,所以类型是Long
    *
    *      但是,String、Long 是jdk中自带的数据类型,在序列化的时候,效率低。
    *      hadoop为了提高效率,自定义了一套序列化的框架
    *      在hadoop程序当中,如果要进行序列化(写磁盘、网络传输数据),一定要用hadoop实现的序列化的数据类型
    *
    *      Long        ====》 LongWritable
    *      String      ====》 Text
    *      Integer     ====》 IntWritable
    *      Null        ====》 NullWritable
    *
    */
    public class WordMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
    
       /**
        *
        * @param key 就是偏移量
        * @param value  一行文本
        * @param context
        * @throws IOException
        * @throws InterruptedException
        */
       @Override
       protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
           //1、单词的切分
           String[] words = value.toString().split(" ");
    
           //2、计数一次,帮单词转换成<hello,1>这样的key value类型对外输出
           for (String word : words) {
               //3、写入到上下文中
               context.write(new Text(word),new LongWritable(1));
           }
    
       }
    }
    

    其次看Reducer:WordReducer.java

    /**
     * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     *     KEYIN:map阶段的一个输出的key
     *     VALUEIN:LongWritable类型的数字
     *     KEYOUT:最终的结果单词,Text类型
     *     VALUEOUT:最终的单词的次数,LongWritable类型
     */
    public class WordReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
    
        /**
         *
         * @param key map阶段的一个输出的key
         * @param values  hello <1,1>
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            //1、定义一个统计的变量
            long count = 0;
    
            //2、迭代
            for (LongWritable value : values) {
                count += value.get();
            }
    
            //3、写入到上下文
            context.write(key,new LongWritable(count));
        }
    }
    
    

    再看Driver:JobMain.java

    /**
     * 主类:将map和reduce串接起来,并提供了运行入口
     */
    public class JobMain {
        /**
         * 这个main方法是WordCount程序运行的入口,其中用一个Job类对象来管理程序运行的很多参数:
         * 指定用哪个类作为Mapper阶段的业务逻辑类,指定哪个类作为Reducer阶段的业务逻辑类
         * ......其他的各种需要的参数
         * @param args
         */
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            Configuration configuration = new Configuration();
            //一、初始化一个Job对象
            Job job = Job.getInstance(configuration, "wordcount");
    
            //二、设置Job对象的相关信息  里面包含了8个小步骤
            //1、设置输入的路径,让程序找到源文件的位置
            job.setInputFormatClass(TextInputFormat.class);
            //运行本地文件
            //TextInputFormat.addInputPath(job,new Path("D://input/test2.txt"));
            //在服务器运行运行
            TextInputFormat.addInputPath(job,new Path("hdfs://bigdata02:9000/wordcount.txt"));
    
            //2、设置Mapper类,并设置k2 v2
            job.setMapperClass(WordMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
    
            // 3、分区 4、排序 5、局部合并 6、分组 四个步骤,都是Shuffle阶段,使用默认的配置。
    
            //7、设置Reducer类,并设置k3 v3的类型
            job.setReducerClass(WordReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
    
            //8、设置输出的路径
            job.setOutputValueClass(TextOutputFormat.class);
            //运行本地文件
            //TextOutputFormat.setOutputPath(job,new Path("D://word_out"));
            //在服务器运行运行
            TextOutputFormat.setOutputPath(job,new Path("hdfs://bigdata02:9000/word_out"));
    
            //三、等待完成
            boolean b = job.waitForCompletion(true);
            System.out.println(b);
            System.exit(b ? 0 : 1);
        }
    }
    
    

    相关文章

      网友评论

          本文标题:MapReduce程序WordCount业务实现

          本文链接:https://www.haomeiwen.com/subject/waqurktx.html