美文网首页
WordCount小结

WordCount小结

作者: ebf9089931f8 | 来源:发表于2018-01-29 21:37 被阅读9次

一. 代码如下

public class CombinerWordCountApp {

    public static final int VALUE = 1;
    public static final String WORD_CONNT = "wordConnt";

    /**
     * 封装了mapper和reduceer的方法
     *
     * @param agrs
     */
    public static void main(String[] agrs) throws IOException, ClassNotFoundException, InterruptedException {

        //1.创建配置
        Configuration conf = new Configuration();

        //如果输出目录已经存在,那么将其删除掉
        Path path = new Path(agrs[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
            System.out.println("输出目录是存在的,但是已经被删掉了");
        }

        //2.创建job
        Job job = Job.getInstance(conf, WORD_CONNT);

        //3.绑定job需要执行的类
        job.setJarByClass(CombinerWordCountApp.class);

        //4.设置job输入路径
        FileInputFormat.setInputPaths(job, new Path(agrs[0]));

        //5.设置job输出路径
        FileOutputFormat.setOutputPath(job, new Path(agrs[1]));

        //增加combine方式,在本地先进行一次reduce操作,减少网络数据传输量
        job.setCombinerClass(MyReducer.class);

        //6.设置job对应的map类及map的输入和输出参数类型
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //7.设置job对应的reduce类及reduce的输入和输出参数类型
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //8.提交作业
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] lineText = value.toString().split(" ");

            for (String text : lineText) {
                context.write(new Text(text), new LongWritable(VALUE));
            }

        }


    }

    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (LongWritable value : values) {
                sum++;
            }
            context.write(key, new LongWritable(sum));
        }
    }

}

二.小结

使用MapReduce实现WordCount,代码可分为3个部分

  1. map阶段
    1.1 map功能需要实现Mapper类,一般有setup/map/cleanup三个方法可以重写,setup只在map开始前调用一次,类似于Android中Activity的onCreate方法,可用于做初始化
    1.2 cleanup在map结束后调用一次,类似于Android中Activity的onDestory方法,可用于销毁资源
    1.3 map阶段具体的逻辑需要在map实现,map方法在每从前一阶段传过来一个键值对时,都会调用一次,这个方法回调了4个参数,参数类型是实现Mapper类时需要确定的泛型,第一个参数时从前面阶段传过来的map中的键类型,一般是LongWrite类型,表示这一行字符串的第一位字符在整个文档中的索引位置,第二个参数是从前面split阶段传过来的map的值的类型,一般是Text类型,表示需要处理的文档中的一行文字,第三个参数指的是输出阶段的键的类型,第四个参数值的是map输出阶段值的类型
  2. reduce阶段
    2.1 跟map类似,不同的是reduce方法里面的第二个参数,据我理解,是map阶段传过来的值有可能是一个数组,如果map阶段采用了combine的方式,那么会将相同的键的对应的值放到一个数组里传过来,可以减少数据传输量
  3. job阶段
    大概分为3个阶段
  • 创建job
    • 创建job需要Configuration对象,hdfs的很多属性可以在这里配置
  • 定义job的属性
    • job需要执行的类
      • job的文件输入路径
      • job的文件结果输出路径
      • job执行的mapper类
      • job的map输出键类型
      • job的map输出值类型
      • job的reducer类
      • job的reduce输出键类型
      • job的recude的输出值类型
  • 启动job

相关文章

网友评论

      本文标题:WordCount小结

      本文链接:https://www.haomeiwen.com/subject/daufzxtx.html