利用MapReduce实现WordCount

作者: howard0103 | 来源:发表于2019-08-22 16:24 被阅读21次

需求

需求很Easy
在整个文件数据里面统计每个单词出现的总次数

MapReduce基本原理

MapReduce的实现流程

MapReduce分为两个阶段,map阶段和reduce阶段,每个阶段都是以键值对的形式作为输入和输出。
所以我们需要实现一个map函数和reduce函数,然后提供一个main函数,在main函数里面进行作业控制和文件输入输出。

具体实现

  • 准备好数据,格式如下
hadoop apache spark
hadoop linux linux spark
hadoop apache hadoop spark
linux linux spark spark
  • 把准备好的数据上传到hdfs上
    hadoop dfs -mkdir -p /wordcount/input //在hdfs上创建wordcount/input目录
    hadoop dfs -put /root/data/test.txt /wordcount/input //将test.txt文件上传到hdfs上
    上传成功后,可以通过浏览器访问ip:50070查看数据

  • 创建Maven工程,并在pom.xml中配置好对应的依赖包

  • 创建WordCountMapper类

    //mapreduce计算框架,把每一条数据,以键值对形式存储
    //KEYIN  输入KEY的类型
    //VALUEIN  输入VALUE的类型
    //KEYOUT  传递给下一个阶段的key类型
    //VALUEOUT 传递给下一个阶段的value类型
    public class WordCountMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {
        private  LongWritable mapValue = new LongWritable();
        private  Text mapKey = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //以空格键进行分割
            String[] split = value.toString().split(" ");
            for (String tmp:split) {
                mapKey.set(tmp);
                mapValue.set(1);
                //把数据传输到下一个阶段
                context.write(mapKey, mapValue);
            }
        }
    }

经过map处理后的数据变成

(hadoop,1)
(apache,1)
(spark,1)
(hadoop,1)
  ...
  • 创建WordCountReducer类
    //经过shuffle之后,到了reduce阶段,所有key的value都会被传递到values
    public class WordCountReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {
        private  LongWritable reduceValue = new LongWritable();
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long total = 0;
            for (LongWritable tmp:values) {
                total += tmp.get();
            }
            reduceValue.set(total);
            context.write(key, reduceValue);
        }
    }

经过reduce处理后的数据变成

(apache,[1,1])
(hadoop,[1,1,1,1])
(linux,[1,1,1,1])
(spark,[1,1,1,1,1])
  ...
  • 创建WordCountRunner类,提供一个main函数
public class WordCountRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //注册任务
        Configuration conf = new Configuration();
        //hdfs地址
        conf.set("fs.defaultFS", "hdfs://howard:9000");
        //配置Job
        Job job = Job.getInstance(conf);
        //设置任务名称,可以在后台监控(yarn资源管理器)
        job.setJobName("wordcount");
        //设置job所在的主类
        job.setJarByClass(WordCountDemo.class);
        //input
        Path path = new Path("/wordcount/input");
        FileInputFormat.addInputPath(job, path);
        //配置Map
        //map所在的类
        job.setMapperClass(WordCountMapper.class);
        //map阶段  key输出类型
        job.setMapOutputKeyClass(Text.class);
        //map阶段  Value输出类型
        job.setMapOutputValueClass(LongWritable.class);

        //shuffle是系统执行的无需配置

        //配置reduce
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //output
        Path output = new Path("/wordcount/output");
        FileOutputFormat.setOutputPath(job, output);

        //执行任务
        job.waitForCompletion(true);
    }
}
  • 运行WordCountRunner,执行结束后。通过浏览器,访问ip:50070去查看输出的结果吧。
    输出结果如下:
    apache,2
    hadoop,4
    linux,4
    spark,5

MapReduce为什么处理效率低下

其它方面暂且不论,处理大数据最重要的一个指标就是效率。MapReduce处理数据的效率是很低下的,很多企业是需要实时看到数据结果的,显然MapReduce是无法满足企业需求的。那么MapReduce为什么效率低下呢,其中有个最重要的原因,就是MapReduce中间有个shuffle的环节,这个环节会对数据进行全局排序,这个非常的耗时。
那么问题来了,为什么MapReduce不去掉shuffle呢?这样不就ok了嘛。
因为shuffle阶段是在为reduce做准备,要保证相同的key在同一个节点上。如果不进行shuffle,那么reduce阶段,聚合每一个key就会去遍历所有的节点。

还有一个重要的原因,就是MapReduce处理数据时,会把中间结果数据保存到磁盘中去。这样做虽然减少了对内存的消耗,但同时会进行大量的io操作,大大降低了处理效率。

总结

代码不难,重要的是要把原理搞清楚。
如果一些地方理解的不透彻,有可能就会出现处理效率低下,容灾性差,网络带宽资源浪费,甚至out of memory等
学习任何技术最重要是学习它的思想和原理,很多架构都是基于最基础的架构进行扩展和优化的。不同的架构之间,也可以相互的借鉴和学习。这样同样会浪费大量的时间。

如果有任何问题,或者有什么想法,随时联系我,大家一起交流,共同进步。
我的邮箱 344185723@qq.com

相关文章

网友评论

    本文标题:利用MapReduce实现WordCount

    本文链接:https://www.haomeiwen.com/subject/lepnsctx.html