InputFormat数据输入
切片与MapTask并行度决定机制
1)问题引出
MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。
思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?那些因素影响了MapTask并行度?
2)MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
- 一个Job的Map阶段并行度由客户端提交job时的切片数决定
- 每一个Split切片分配一个MapTask并行实例处理
- 默认情况下,切片大小=BlockSize
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
本地默认块大小是32M
JOB提交流程源码
1建立连接
- 创建提交Job的代理
- 判断本地环境还是yarn集群运行环境
2提交job
- 创建给集群提交数据的stag路径
- 获取jonid,并创建job路径
- 拷贝jar包到集群
- 计算切片,生成切片规划文件
- 向stag路径写配置文件
- 提交job,返回提交状态
FileInputFormat切片源码解析(input.getSplits(job))
1)程序先找到你数据存储的目录
2)开始遍历处理(规划切片)目录下的每一个文件
3)遍历第一个文件ss.txt
3.1)获取文件大小fs.sizeof(ss.txt)
3.2)计算切片大小
3.3)默认情况下,切片大小=blockSize
3.4)开始切,形成第一个切片(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
3.5)将切片信息写到一个切片规划文件中
3.6)整个切片的核心过程在getSplit()方法中完成
3.7)InputSlplit只记录了切片的元数据信息,比如起始位置,长度以及所在节点列表等
4)提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。
FileInputFormat切片机制
1)简单的按照文件的内容长度进行切片
2)切片大小,默认等于Block大小
3)切片时不考虑数据集整体,而是诸葛针对每一个文件单独切片
源码计算切片大小的公式:
Math.max(minSize,Math.min(MaxSize,BlockSize))
默认情况下,切片大小等于块大小blockSize
CombineTextInputFormat切片机制
框架默认的TextInputFormat切片挤汁时对任务按文件规划切片,不光文件多小,都会是一个单独的切片,都会交给一个MapTask,如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
1)应用场景
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
2)虚拟存储切片最大值设置
4M,虚拟存储切片最大值的设置最好根据实际的小文件大小情况来设置具体的值。
3)切片过程
a)判断虚拟储存的文件大小是否大于SetMaxInputSize值,大于等于则单独形成一个切片
b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片
suffle机制
shuffle机制
map方法之后,reduce方法之前的数据处理过程称之为shuffle。
默认分区时根据key的hashcode对MapTask个数决定的
Partition分区
1)如果ReduceTask的数量》getPartition的结果数,则会多产生几个空的输出文件par-r-oooxx
2)如果1<ReduceTask的数量《getPartition的结果数,则有一部分分区数据无处安放,会Exception
3)如果reduceTask的数量=1,则不管MapTask端出书多少个分区文件,最终结果都会交给这一个ReduceTask,最终也就只会产生一个文件part-r-0000
4)分区好必须从零开始,逐一累加
排序
排序是MapReduce框架中最重要的操作之一
MapTask和ReduceTask均会对数据按照key进行排序。该操作是Hadoop的默认行为。任何应用程序中峰数据均会被排序,而不管逻辑是否需要
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,在对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上虽有文件进行归并排序
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
1)部分排序
MaoReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
2)全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构
3)辅助排序
在reduce端对key进行分组。应用于,在接受的key为bean对象时,想让一个或几个字段相同(全部字段比较会不相同)的key进入到同一reduce方法时,可以采用分组排序。
4)二次排序
在自定义排序过程中,入股compareTo中的判断条件为两个即为二次排序
Combiner合并
1)Combiner是MR程序中Mapper和Reducer之外的一种组件
2)Combiner组件的父类就是Reducer
3)Combiner和Reducer的区别在于运行的位置
Combiner是在每一个MapTask所在的节点运行
Reducer的接受全局所有的Mapper的输出结果
4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输流量
5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv能够跟Reducer的输入kv类型要对应起来
使用的前提是不能影响最后的业务逻辑
OutputFormat数据输出
OutputFormat 接口实现类
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面介绍几种常见的OutputFormat实现类。
-
OutputFormat实现类
-
默认输出格式TextOutpitFormat
-
自定义OutputFormat
-
输出数据到MySQL/HBase/Es等存储框架中
-
自定义步骤
1 自定义一个雷继承FileOutputFormat
2 重写RecordWriter,具体改写输出数据的方法write()
-
MapReduce内核源码解析
MapTask工作机制
- read阶段:mapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个kv
- map阶段:用户自己逻辑阶段
- collect阶段:环形缓冲区,分区排序
- 溢写阶段:达到80%后,溢写到磁盘
- merge阶段:对溢写文件排序
ReduceTask工作机制
- copy阶段:ReduceTask从哥哥MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中
- sort阶段:在远程拷贝数据的同时,ReduceTask启动两个后台县城对内存和磁盘的数据进行合并,以防内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按照key进行狙击的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于哥哥MapTask已经实现了对自己的处理结果进行了局部排序,因此,reduceTask只需对所有数据进行一次归并排序即可。
- reduce阶段:reduce()函数将计算结果写到HDFS上。
ReduceTask并行度决定机制
1)设置ReduceTask并行度
ReduceTask的并行度同样影响整个job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,reduceTask数量的决定是可以直接手动设置
默认是1
job.setNumReduceTask(4);
2)注意事项
- ReduceTask=0,表示没有reduce阶段,输出文件个数和Map个数一致
- reduceTask默认值就是1,所以输出文件个数为1个
- 如果数据分布不均匀,就有可能在reduce阶段发生数据倾斜
- reduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全部汇总结果,就只能由一个reducetask
- 具体多少reduceTask,需要根据集群性能而定
- 如果分区数不是1,但是reduceTask为1,是否执行分区过程。答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断reduceNum个数是否大于1,不大于1肯定不执行。
MapTask & ReduceTask源码解析
Join多种应用
Reduce Join
Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新建的标志作为value,最后进行输出。
Reduce端的阶段:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(Map结算已经打标)分开,最后进行合并就OK了。
Map Join
1)DistributedCacheDriver缓存文件
- 加载缓存数据 job.addCache(new URI("file://usr/pd.txt"))
- Map端join的逻辑不需要reduce阶段,设置reduceTask数量为0。job.setNumReduceTasks(0);
2)读取缓存文件
-
setup()方法中
1 获取缓存的文件
2 循环读取缓存文件一行
3 切割
4 缓存数据到集合
5 关流
数据清洗(ETL)
“ETL”,是英文Extract-Transform——load的缩写,用来描述将数据从来源端经过抽取、转换、加载至目的端。ETL一词较常用在数据仓库,但其对象并不限于数据仓库
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清洗掉不符合用户要求的数据。清理古城往往需要运行Mapper程序,不需要reduce程序
MapReduce开发总结
1)输入数据接口:ImputFormat
(1)默认使用的实现类是:TextInputFormat
(2)TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。
(3)CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率
2)逻辑处理接口:Mapper
用户根据业务需求实现其中三个方法:map、setup、cleanup
3)partition分区
(1)有默认实现HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;
key.hashCode()&Integer.MAXVALUE%numReducers
(2)如果业务上有特别的需求,可以自定义分区
4)Comparable排序
(1)当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的CompareTo方法
(2)部分排序:对最终输出的每一个文件进行内部排序
(3)全排序:对所有数据进行排序,通常只有一个reduce
(4)二次排序:排序的条件有两个,WritableCompare接口
5)Combiner
前提:不影响最终的业务逻辑(求平均值不行,求和可以)
提前聚合map ==》解决数据倾斜的一个方法
6)Reducer
用户的业务逻辑
setup reduce clearup
7)OutputFormat
默认时TextOutputFormat,按行输出到文件
自定输出
网友评论