在大数据领域,MapReduce的计算模式,至今来说仍然具有典型的代表意义,以Hadoop、Spark为首的大数据技术框架,依然以MapReduce的计算思想为主流。MapReduce计算涉及到一个重要的概念,就是分片,今天的大数据开发学习分享,我们就主要来讲讲MapReduce分片的问题。
MapReduce计算,具体来说,就是分为"Map–Sort–Combine–Shuffle–Reduce"几个阶段,而分片主要是在Map阶段。
首先是Map端,整个Mapreduce过程是从InputFormat类开始的,可以在客户端指定该类的实现方式(如:TextInputFormat),他提供了两个主要的方法:getSplit和getRecordReader。
getSplit用来对数据进行分片,getRecordReader提供一个迭代器用来读取每个split的每条记录。InputFormat决定了分片的大小,例如文本文件默认的大小是hdfs(由hdfs.blocksize指定)中block的大小,对于gzip压缩的文件则分片的大小是整个文件大大小(因为gzip压缩不可切割)。每个Map处理一个split,在大多数情况下分片的大小一般是hdfs中block的大小即128M。
Map执行split中从RecordReader返回的每个键值对,并利用Context对象输出。Map输出后的每条记录的处理类可以使用参数Mapreduce.job.Map.output.collector.class来指定,该参数的默认值为org.apache.hadoop.Mapred.MapTask$MapOutputBuffer,也是目前hadoop唯一的实现。
Map函数的输出首先由Partitioner的getPartition函数来处理。利用key和reduce的数量(即partition的数量)计算出该键值对所属的分区。getPartition的默认实现如下:
public class HashPartitioner<K,V>extends Partitioner<K,V>{
public int getPartition(K key,V value,int numReduceTasks){
return(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks;
}
}
分区之后,Map输出和分区号被写入内存中的环形缓冲区,缓冲区的大小由参数Mapreduce.task.io.sort.mb来控制(默认100M)。当该缓冲区内容大小达到一定的比例(由Mapreduce.Map.sort.spill.percent来控制,默认是0.8),会有一个单独的线程把数据溢写(Spilling)到磁盘。因为溢写在单独的线程中执行,所以写缓存和溢写动作可以同时进行,当写缓存速度大于溢写速度且缓存区已满的时候,写缓存线程会阻塞,直到溢写动作使缓存区有空余空间。
溢写动作是在执行Map的本地结点完成的,写入本地文件系统。写入的目录名称可以由参数Mapreduce.job.local.dir(Mapreduce本地临时文件夹)指定。在溢写之前还有两个动作到做,首先根据分区号进行排序,然后分区内部根据键进行排序(快速排序)。在排序结束后Combiner操作开始执行,Combiner操作可以减少写入磁盘的数据量。
每个溢写的文件都有一个索引文件,用来记录每个partition的开始和结束位置。索引文件存储在内存中,由参数Mapreduce.task.index.cache.limit.bytes来指定该内存的大小,默认1M,该内存满后后续的溢写文件的索引会一起被写入磁盘。
当Map和最后一次溢写结束后,溢写线程会终止。开始进行溢写文件的合并。在合并阶段所有的溢写文件会被合并为一个文件。每次合并操作合并的文件个数由参数Mapreduce.task.io.sort.factor指定,当溢写的文件个数大于该参数指定值时,合并会进行多次,最终生成一个文件。在合并文件的时候如果文件个数大于等于参数min.num.spills.for.combine指定的值(默认3)写磁盘前会进行combine操作。
至此,MapTask已经准备好了包含Map所有输出的一个输出文件和一个索引文件,方便ReduceTask获取对应分区的数据。
关于大数据开发学习,MapReduce分片,以上就为大家做了详细的介绍了。MapReduce的计算模式,包括后来的Spark其实也是同样延续的是MapReduce的计算思想,搞懂内部的分片机制,是很有必要的。
网友评论