需求
需求很Easy
在整个文件数据里面统计每个单词出现的总次数
MapReduce基本原理
MapReduce的实现流程
MapReduce分为两个阶段,map阶段和reduce阶段,每个阶段都是以键值对的形式作为输入和输出。
所以我们需要实现一个map函数和reduce函数,然后提供一个main函数,在main函数里面进行作业控制和文件输入输出。
具体实现
- 准备好数据,格式如下
hadoop apache spark
hadoop linux linux spark
hadoop apache hadoop spark
linux linux spark spark
-
把准备好的数据上传到hdfs上
hadoop dfs -mkdir -p /wordcount/input //在hdfs上创建wordcount/input目录
hadoop dfs -put /root/data/test.txt /wordcount/input //将test.txt文件上传到hdfs上
上传成功后,可以通过浏览器访问ip:50070查看数据 -
创建Maven工程,并在pom.xml中配置好对应的依赖包
-
创建WordCountMapper类
//mapreduce计算框架,把每一条数据,以键值对形式存储
//KEYIN 输入KEY的类型
//VALUEIN 输入VALUE的类型
//KEYOUT 传递给下一个阶段的key类型
//VALUEOUT 传递给下一个阶段的value类型
public class WordCountMapper
extends Mapper<LongWritable, Text, Text, LongWritable> {
private LongWritable mapValue = new LongWritable();
private Text mapKey = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//以空格键进行分割
String[] split = value.toString().split(" ");
for (String tmp:split) {
mapKey.set(tmp);
mapValue.set(1);
//把数据传输到下一个阶段
context.write(mapKey, mapValue);
}
}
}
经过map处理后的数据变成
(hadoop,1)
(apache,1)
(spark,1)
(hadoop,1)
...
- 创建WordCountReducer类
//经过shuffle之后,到了reduce阶段,所有key的value都会被传递到values
public class WordCountReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
private LongWritable reduceValue = new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long total = 0;
for (LongWritable tmp:values) {
total += tmp.get();
}
reduceValue.set(total);
context.write(key, reduceValue);
}
}
经过reduce处理后的数据变成
(apache,[1,1])
(hadoop,[1,1,1,1])
(linux,[1,1,1,1])
(spark,[1,1,1,1,1])
...
- 创建WordCountRunner类,提供一个main函数
public class WordCountRunner {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//注册任务
Configuration conf = new Configuration();
//hdfs地址
conf.set("fs.defaultFS", "hdfs://howard:9000");
//配置Job
Job job = Job.getInstance(conf);
//设置任务名称,可以在后台监控(yarn资源管理器)
job.setJobName("wordcount");
//设置job所在的主类
job.setJarByClass(WordCountDemo.class);
//input
Path path = new Path("/wordcount/input");
FileInputFormat.addInputPath(job, path);
//配置Map
//map所在的类
job.setMapperClass(WordCountMapper.class);
//map阶段 key输出类型
job.setMapOutputKeyClass(Text.class);
//map阶段 Value输出类型
job.setMapOutputValueClass(LongWritable.class);
//shuffle是系统执行的无需配置
//配置reduce
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//output
Path output = new Path("/wordcount/output");
FileOutputFormat.setOutputPath(job, output);
//执行任务
job.waitForCompletion(true);
}
}
- 运行WordCountRunner,执行结束后。通过浏览器,访问ip:50070去查看输出的结果吧。
输出结果如下:
apache,2
hadoop,4
linux,4
spark,5
MapReduce为什么处理效率低下
其它方面暂且不论,处理大数据最重要的一个指标就是效率。MapReduce处理数据的效率是很低下的,很多企业是需要实时看到数据结果的,显然MapReduce是无法满足企业需求的。那么MapReduce为什么效率低下呢,其中有个最重要的原因,就是MapReduce中间有个shuffle的环节,这个环节会对数据进行全局排序,这个非常的耗时。
那么问题来了,为什么MapReduce不去掉shuffle呢?这样不就ok了嘛。
因为shuffle阶段是在为reduce做准备,要保证相同的key在同一个节点上。如果不进行shuffle,那么reduce阶段,聚合每一个key就会去遍历所有的节点。
还有一个重要的原因,就是MapReduce处理数据时,会把中间结果数据保存到磁盘中去。这样做虽然减少了对内存的消耗,但同时会进行大量的io操作,大大降低了处理效率。
总结
代码不难,重要的是要把原理搞清楚。
如果一些地方理解的不透彻,有可能就会出现处理效率低下,容灾性差,网络带宽资源浪费,甚至out of memory等
学习任何技术最重要是学习它的思想和原理,很多架构都是基于最基础的架构进行扩展和优化的。不同的架构之间,也可以相互的借鉴和学习。这样同样会浪费大量的时间。
如果有任何问题,或者有什么想法,随时联系我,大家一起交流,共同进步。
我的邮箱 344185723@qq.com
网友评论