4.MapReduce模板开发

作者: 奉先 | 来源:发表于2017-01-04 11:18 被阅读110次

    1. mapreduce程序介绍:

    mapreduce是Google提出的一种计算框架。框架处理key、value对;数据处理过程中数据流向格式。

    mapreduce程序开发包含三个部分:

    (1)map:分布到很多节点上去执行;

    (2)reduce:合并map的输出结果

    (3)Job: job调度配置。

    Input -->  Map  -->  Reduce --> Output

    2. wordcount思路:

    假设对下边这个文件wc.txt,进行词频统计(分隔符是制表符):

    SF LUNA LION

    LUNA NEC LION

    SF SF POM

    将该文件上传到 HDFS 上:

    $cd $HADOOP_HOME

    $bin/hdfs dfs -put /home/natty.ma/bigdata/hadoop/files/wordcountfiles.txt /user/natty.ma/hdfsapi/

    $bin/hdfs dfs -text /user/natty.ma/hdfsapi/wordcountfiles.txt

    (1)在Input阶段,MapReduce内部的FileInputFormat会将文件处理成“<偏移量,本行内容>”这种key value的格式。

    那么Input阶段会生成下边的结果:

    <13,SF LUNA LION> 

    <26,LUNA NEC LION>

    <37,SF SF POM>

    (2)在Map阶段,输入是第(1)步的输出。将每行记录内容按照制表符拆分出单词,并把拆分出的单词作为key,value给1。

    这样得到的key value对是:

    <SF,1>

    <LUNA,1>

    <LION,1>

    <LUNA,1>

    ...

    (3)在Reduce阶段,合并每个key的多个值。

    在MR内部,会将相同的Map阶段的key 的value值合并成list(Iterable)。得到下面的key value对:

    <LION,(1,1)>

    <LUNA,(1,1)>

    <NEC,(1)>

    <POM,(1)>

    <SF,(1,1,1)>

    上边的集合作为Reduce阶段的输入,在reduce阶段,将value值的list的值合并(sum)。Reduce的输出是:

    <LION,2>

    <LUNA,2>

    <NEC,1>

    <POM,1>

    <SF,3>

    3.程序实现:

    下面是按照规范实现的示例程序:

    WordCountMapReduce.java

    上边程序实现了WordCount的逻辑。主要包含4个部分:创建 Mapper类、创建Reducer类、配置Job、提交Job

    4.提炼模板:

    MapReduce也是“八股文”变成模式,最终要的是分析MapReduce要实现的业务逻辑,从而确定Map和Reduce阶段的业务逻辑。

    确定Map阶段的输入输出、Reduce阶段的输入和输出。确定Map和Reduce的参数(LongWritable、IntWritable、Text等)。

    提炼了模板需要修改以下地方:

    (1)Mapper和Reducer的类名(根据业务实际含义)

    (2)Mapper类的输入、输出类型(输入的key、value;输出的key、value)。

    (3)Reducer类的输入、输出类型(输入的key、value;输出的key、value)。

    (4)Mapper类重写的map方法输入类型。map()方法的前2个参数是map的输入key、value。输出key、value写入context中。

    (5)Reducer类重写的reduce方法的输入类型。reduce()方法的前2个参数是reduce的输入key、value。输出key、value写入context中。

    (6)job设置mapper和reducer类 及其key、value类型。

    下面是整理的模板类:

    ModuleWordCountMapReduce

    5. WebPV事例程序研究:

    该事例使用一个样本文件,以TAB键分割数据。数据字典:

    WebPV数据文件数据字典

    计数每一个省份代码(provinceId)有效的URL数量。首先,清洗每条记录,保留合法记录(例如,url不能为空,必须要有provinceId字段,provinceId字段需要是数字等条件)。清洗出所有合法的记录后,统计每个省份代码出现的次数,也就是该省份PV数量了。

    下面WebPV的MapReduce程序实现了上述逻辑:

    WebPVMapReduce.java

    6. Shuffle 阶段:

    Shuffle描述的是从Map输出到Reduce输入的中间过程。Shuffle包含Map端ShuffleReduce端Shuffle。这中间的过程是:

    Input  -->  map()阶段 -->  map阶段output --> map端Shuffle --> Reduce端Shuffle --> Reduce阶段Input 

    MapReduce中的shuffle和sort

    6.1 Map端Shuffle处理:

    map()函数产生的map的Output不是直接写到磁盘,而是在内存中缓存并进行预排序。每个map任务有环形内存缓冲来保存map输出。这个内存空间默认是100MB(通过mapreduce.task.io.sort.mb来配置),但是如果这个缓冲内存占满了,就需要往磁盘上写了,这个过程叫做spill。但是一般情况下,达到一个阈值就会开始spill(溢出),默认是80%,通过mapreduce.map.sort.spill.percent 来配置。 写在运行map的节点的本地,目录有下边属性配置:mapreduce.cluster.local.dir。

    备注:(当缓冲区快满时,会写到磁盘生成一个临时文件。当这个map task结束后,会对磁盘上这个map task产生的所有临时文件进行排序、合并,并生成最终的输出文件,等待reduce task来拉取数据。)

    Partition、Sort、Combine、Merge、Compress:

    在写入磁盘前,会将数据分为不同的分区,每个分区对应一个reducer(也就是说分配到同一个partition的数据会由同一个reducer来处理)。在同一个分区内,数据会按照key来排序,如果有combiner函数的话,也会执行在sort的输出结果上。Combiner函数会先对map结果进行合并(例如sum),这样使得临时文件中的数据大大减少,临时文件也就会变小,进而减少在集群中数据转移量,减少I/O,提高效率。由于每次缓冲器达到阈值时,都会生成一个新的临时文件,所以很可能有很多临时文件,这些文件最终会合并成一个单独的分区的、排序的output文件。

    生成的spill文件数量如果大于3个,会再次调用combiner。如果只有1个或者2个spill文件,那么combiner就不再值得使用来降低map output文件的大小,所以combiner就不再执行了。

    在map output写入磁盘前,做压缩可以优化性能。文件压缩后变小,写入磁盘更快,文件在拷贝到其他reducer的节点的时间也会缩短。默认是不做压缩的。可以修改属性 :mapreduce.map.output.compress

    6.2 Reduce端Shuffle处理:

    1.Copy阶段:

    Map阶段的mapoutput放在执行map任务的机器的本地目录(local dir)上。如果mapoutput分了多个partition。假如,mapoutput输出文件中分区到partition1的数据,就需要拷贝到reducer1执行所在的机器。并且,reducer1需要的文件,会来自多个不同的map tasks(同时也就来自多个节点),这些map tasks完成时间不同,只要map task完成,reducer就会立即copy数据。

    Reduce Task在copy data时,如果数据较小会直接copy在内存中,如果超过阈值,存在磁盘上。当拷贝在磁盘上累积时,后台程序会做merge和sort。(当然,这个是在同一个Reduce Task中做的处理,也肯定是同一个分区的数据)

    2. Sort阶段:

    map outputs拷贝完成后,进入sort阶段。该阶段合并来自不同 map tasks的 map outputs并保持排序。Sort阶段分多轮进行 ,假如有50个map outputs,并且 merge参数设为10(通过mapreduce.task.io.sort.factor 属性设定)。那么会有5个中间临时文件。Merge阶段不会把这些文件合成为一个文件传递给reduce阶段,而是把这5个中间临时文件直接传给reduce,reduce阶段再对这5个中间临时文件执行reduce函数。

    3. Reduce阶段:

    在Reduce阶段,Reduce函数在每个已排序的output文件上的每个key使用,reduce的output直接写到HDFS。该阶段进行group,相同key的value组合在一起 ,例如:<hadoop,(1,1,1)>。

    6.3 mapreduce优化:

    shuffle是mapreduce的核心,优化mapreduce程序也就是优化shuffle阶段,优化一些shuffle阶段使用的参数。在优化时,可以重点优化shuffle过程中的下面几个阶段:

    partition

    sort

    combine

    compress

    group

    7. 自定义数据类型

    Writable接口(org.apache.hadoop.io.Writable):一个实现简单、高效、序列化协议的序列化对象。

    在MapReduce中,value必须实现Writable接口。

    在MapReduce中,key必须实现WritableComparable接口。 WritableComparable接口继承Writable和Comparable<T>。

    下面的类自定义了数据类型 CustomDataWritable。包含5个属性:上下行数据包、上下行流量、电话号码个数计数。

    CustomDataWritable.java

    8. 手机流量案例统计

    手机流量案例(日志文件),数据字典:

    日志文件数据字典

    如上边数据文件的数据字典,统计每一个手机号的 上行数据包总数、下行数据包总数、上行总流量数、下行总流量数。 

    那么其实思路非常清晰,我们筛选出符合条件的数据记录。按照手机号码(msisdn)来sum上边4个指标就可以了。非常简单! 在用mapreduce来实现时,我们需要创建一个“自定义数据类型” , 这个类型包含以下四个属性:

    upPackNum;

    downPackNum;

    upPayLoad;

    downPayLoad;

    这样一简化之后,我们就可以按照key(手机号码),Reduce我们的自定义数据类型就可以了。

    下面程序实现了按手机号统计流量的逻辑(最后一位是该手机号出现的次数):

    PhoneStatisticsMapReduce.java

    相关文章

      网友评论

        本文标题:4.MapReduce模板开发

        本文链接:https://www.haomeiwen.com/subject/yedovttx.html