MapReduce 编写 wordcount (二)

作者: 小飞牛_666 | 来源:发表于2018-10-08 00:29 被阅读7次
    今天我们开始对 mapreduce 进行操作,实现一个功能:编写 mapreduce 实现单词的统计功能,并使用 jar 在 yarn 上统计单词数。闲话少讲,接下来我们开始对 mapreduce 的编写。
    为方便,我们先创建一个类:MyWordCountUp并使其继承自 Configured 以及实现 Tool 接口,结构如下:
    public class MyWordCountUp extends Configured implements Tool{
        public int run(String[] args) throws Exception{
        }
    }
    
    一、在类中创建一个继承自 WordCountMapper 的类,用于切割数据并实现相关的逻辑(这里为方便是以内部类的形式创建的,看个人的需求):
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    
            private Text mapKey = new Text();
            private IntWritable mapValue  = new IntWritable(1); //初始值赋值为1
    
            //对我们独立元素中的每一个元素进行并行计算操作的函数
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                //以空格切割成字符串数组
                String[] lines = line.split(" ");
                for(String str : lines){
                    mapKey.set(str); //设值
                    context.write(mapKey, mapValue);
                }
            }
        }
    
    
    二、同理,创建一个继承自 Reducer 的内部类,用于统计单词的个数即聚合:
    public static class WrodCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
    
            private IntWritable reduceValue = new IntWritable();
    
            //对我们独立元素中的数据进行合并
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    
                List<IntWritable> list = Lists.newArrayList(values);
                int sum = 0;
                for(IntWritable reduceValue : list){
                    sum += reduceValue.get(); //累加
                }
    
                reduceValue.set(sum);//设值
                context.write(key, reduceValue);
            }
        }
    
    
    三、在run 方法中操作 driver (组装所有的过程到job):
    //run
        public int run(String[] args) throws Exception {
    
            //driver
            //1) 获取 conf
            Configuration configuration = this.getConf(); //由于类已继承 Configured ,因此直接调用其方法
    
            //2) 创建 job
            Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
            job.setJarByClass(this.getClass());
    
            //3.1) 输入
            Path path = new Path(args[0]);
            FileInputFormat.addInputPath(job, path);
    
            //3.2 设置 map
            job.setMapperClass(WordCountMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // 5 -> 在这里可以设置:分区、排序、分组、设置 reduce 的个数等
    
            //3.3 设置 reduce
            job.setReducerClass(WrodCountReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            //3.4 输出
            Path outPath = new Path(args[1]);
            FileOutputFormat.setOutputPath(job, outPath);
    
            //4. 提交
            boolean sucess = job.waitForCompletion(true);
            return sucess ? 0 : 1;
        }
    
    
    四、主方法的实现:
    public static void main(String[] args) {
    
            //当打包成 jar 之前 记得注释掉
    //        args = new String[]{
    //                "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/wordcount.txt",
    //                "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/output"
    //        };
    
            Configuration configuration = new Configuration();
    
            try {
                //先判断文件夹是否存在
                Path fileOutPath = new Path(args[1]);
                FileSystem fileSystem = FileSystem.get(configuration);
    
                if(fileSystem.exists(fileOutPath)){
                    fileSystem.delete(fileOutPath, true); //删除
                }
    
                int status = ToolRunner.run(configuration, new MyWordCountUp(), args);
                System.exit(status);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
    

    五、到这一步我们的程序已经弄好了,接下来我们可以先在本地运行一下,注意main方法中路径要去掉注释,运行没有问题我们再打包成 jar ,(温馨提示:要测试必须要先启动集群服务:1. sbin/start-dfs.sh 2.sbin/start-yarn.sh):
    六、我们将打包好的 jar 放在系统的 jars 文件夹下,然后进入 hadoop 目录下执行如下命令进行统计:
    bin/hadoop jar /opt/jars/workcount.jar com.lcy.mr.MyWordCountUp hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/wordcount.txt hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/output6
    效果如下:

    image.png
    打开网页浏览效果:http://bigdata-pro01.lcy.com:50070
    image.png
    接下来我们再使用命令进行查询:
    bin/hdfs dfs -text /user/hdfs/output6/part*
    

    效果如下:


    image.png

    最后注意的是,此项目的配置是建立在第一篇项目之上的,即 pom.xml 已经配置好,还有,统计的数据文件已经在 hdfs 上。。。

    相关文章

      网友评论

        本文标题:MapReduce 编写 wordcount (二)

        本文链接:https://www.haomeiwen.com/subject/hjzraftx.html