好程序员大数据培训分享之mapreduce概述,mapreduce:分布式并行离线计算框架,是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上;
与HDFS解决问题的原理类似,HDFS是将大的文件切分成若干小文件,然后将它们分别存储到集群中各个主机中。
同样原理,mapreduce是将一个复杂的运算切分成若个子运算,然后将它们分别交给集群中各个主机,由各个主机并行运算。
1.1 mapreduce产生的背景
海量数据在单机上处理因为硬件资源限制,无法胜任。
而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度。
引入mapreduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。
1.2 mapreduce编程模型
一种分布式计算模型。
MapReduce将这个并行计算过程抽象到两个函数。
Map(映射):对一些独立元素组成的列表的每一个元素进行指定的操作,可以高度并行。
Reduce(化简 归约):对一个列表的元素进行合并。
一个简单的MapReduce程序只需要指定map()、reduce()、input和output,剩下的事由框架完成。
Mapreduce的几个关键名词
Job :用户的每一个计算请求称为一个作业。
Task:每一个作业,都需要拆分开了,交由多个主机来完成,拆分出来的执行单位就是任务。
Task又分为如下三种类型的任务:
Map:负责map阶段的整个数据处理流程
Reduce:负责reduce阶段的整个数据处理流程
MRAppMaster:负责整个程序的过程调度及状态协调
1.4 mapreduce程序运行流程
具体流程说明:
一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程
maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:
– 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对。
– 将输入KV(k是文件的行号,v是文件一行的数据)对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存。
– 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件
MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)
Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储
1.5 编写MapReduce程序
• 基于MapReduce 计算模型编写分布式并行程序非常简单,程序员的主要编码工作就是实现Map 和Reduce函数。
• 其它的并行编程中的种种复杂问题,如分布式存储,工作调度,负载平衡,容错处理,网络通信等,均由YARN框架负责处理。
• MapReduce中,map和reduce函数遵循如下常规格式:
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
• Mapper的接口:
protectedvoid map(KEY key, VALUE value, Context context)
throwsIOException, InterruptedException {
}
• Reduce的接口:
protectedvoid reduce(KEY key, Iterable values,
Context context)throwsIOException, InterruptedException {
}
• Mapreduce程序代码基本结构
maprecue实例开发
2.1 编程步骤
用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
Mapper的输入数据是KV对的形式(KV的类型可自定义)
Mapper的输出数据是KV对的形式(KV的类型可自定义)
Mapper中的业务逻辑写在map()方法中
map()方法(maptask进程)对每一个<K,V>调用一次
Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
Reducer的业务逻辑写在reduce()方法中
Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
用户自定义的Mapper和Reducer都要继承各自的父类
整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象
2.2 经典的wordcount程序编写
需求:有一批文件(规模为TB级或者PB级),如何统计这些文件中所有单词出现次数
如有三个文件,文件名是qfcourse.txt、qfstu.txt 和 qf_teacher
qf_course.txt内容:
php java linux
bigdata VR
C C++ java web
linux shell
qf_stu.txt内容:
tom jim lucy
lily sally
andy
tom jim sally
qf_teacher内容:
jerry Lucy tom
jim
方案
– 分别统计每个文件中单词出现次数 - map()
– 累加不同文件中同一个单词出现次数 - reduce()
实现代码
– 创建一个简单的maven项目
– 添加hadoop client依赖的jar,pom.xml主要内容如下:
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
– 编写代码
– 自定义一个mapper类
importjava.io.IOException;
importorg.apache.hadoop.io.IntWritable;
importorg.apache.hadoop.io.LongWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.Mapper;
/**
*Maper里面的泛型的四个类型从左到右依次是:
*
*LongWritable KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,类似于行号但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable
*Text VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text
*
*Text KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String,同上,用Text
*IntWritable VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable
*/
publicclassWordcountMapperextendsMapper{
/**
*map阶段的业务逻辑就写在自定义的map()方法中
*maptask会对每一行输入数据调用一次我们自定义的map()方法
*/
@Override
protectedvoid map(LongWritable key, Text value, Context context)throwsIOException, InterruptedException {
//将maptask传给我们的一行的文本内容先转换成String
String line = value.toString();
//根据空格将这一行切分成单词
String[] words = line.split(" ");
/**
*将单词输出为<单词,1>
*如<lily,1><lucy,1><c,1><c++,1><tom,1>
*/
for(String word:words){
//将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reduce task
context.write(newText(word),newIntWritable(1));
}
}
}
– 自定义一个reduce类
importjava.io.IOException;
importorg.apache.hadoop.io.IntWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.Reducer;
/**
*Reducer里面的泛型的四个类型从左到右依次是:
*Text KEYIN:对应mapper输出的KEYOUT
*IntWritable VALUEIN:对应mapper输出的VALUEOUT
*
*KEYOUT,是单词
*VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型,是总次数
*/
publicclassWordcountReducerextendsReducer{
/**
*<tom,1>
*<tom,1>
*<linux,1>
*<banana,1>
*<banana,1>
*<banana,1>
*入参key,是一组相同单词kv对的key
*values是若干相同key的value集合
*如<tom,[1,1]><linux,[1]><banana,[1,1,1]>
*/
@Override
protectedvoid reduce(Text key, Iterable values, Context context)throwsIOException, InterruptedException {
int count=0;//累加单词的出现的次数
for(IntWritable value:values){
count += value.get();
}
context.write(key,newIntWritable(count));
}
}
– 编写一个Driver类
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.io.IntWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
*相当于一个yarn集群的客户端
*需要在此封装我们的mr程序的相关运行参数,指定jar包
*最后提交给yarn
*/
publicclassWordcountDriver {
/**
*该类是运行在hadoop客户端的,main一运行,yarn客户端就启动起来了,与yarn服务器端通信
*yarn服务器端负责启动mapreduce程序并使用WordcountMapper和WordcountReducer类
*/
publicstatic void main(String[] args)throwsException {
//此代码需要两个输入参数 第一个参数支持要处理的源文件;第二个参数是处理结果的输出路径
if(args ==null|| args.length == 0) {
args =newString[2];
//路径都是 hdfs系统的文件路径
args[0] = "hdfs://192.168.18.64:9000/wordcount/input/";
args[1] = "hdfs://192.168.18.64:9000/wordcount/output";
}
/**
*什么也不设置时,如果在安装了hadoop的机器上运行时,自动读取
*/home/hadoop/app/hadoop-2.7.1/etc/hadoop/core-site.xml
*文件放入Configuration中
*/
Configuration conf =newConfiguration();
Job job = Job.getInstance(conf);
//指定本程序的jar包所在的本地路径
job.setJarByClass(WordcountDriver.class);
//指定本业务job要使用的mapper业务类
job.setMapperClass(WordcountMapper.class);
//指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定本业务job要使用的Reducer业务类
job.setReducerClass(WordcountReducer.class);
//指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job,newPath(args[0]));
//指定job的输出结果所在目录
FileOutputFormat.setOutputPath(job,newPath(args[1]));
//将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
/*job.submit();*/
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
wordcount处理过程
将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,下图所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows/Linux环境不同)。
将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对,下图所示。
得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。下图所示。
Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果,下图所示。
网友评论