Map-Reduce处理过程(分析气象数据的map-reduce过程)
MapReduce初探
1,调用标准的input类(默认TextInputFormat),将输入数据转化成标准的样式,成为map的输入。
2,map过程,提取出需要的数据,这里提取出年份、温度这两个数据,输出成key-value对(标准样式)
3,系统自动进行shuffle过程。把key相同的行聚合在一起,输出为key-数据集合(或者数组),成为reduce的输入
4,reduce过程,对数据集合处理,求得温度最大值。
5,reduce输出,调用output类(默认TextOutputFormat)写入HDFS文件。
一般完成Map-Reduce最基本需要3段程序:Map、Reduce、作业调度器。
Map-Reduce编程模型
MapReduce初探
分片的问题:
1,分片非常接近物理块边界
2,通常每个分片对应一个task
3,通过分片实现计算数据本地化,减少数据的网络传输
4,分片包含的数据未必都在本地,所以计算任务可能存在少量的数据网络传输
MapReduce初探
Mapper
1,Map-reduce的思想就是“分而治之”
2,Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”执行
3,“简单的任务”有几个含义:1 数据或计算规模相对于原任务要大大缩小;2 就近计算,即会被分配到存放了所需数据的节点进行计算;3 这些小任务可以并行计算,彼此间几乎没有依赖关系
Reducer
1,对map阶段的结果进行汇总
2,Reducer的数目由mapred-site.xml配置文件里的项目mapred.reduce.tasks决定。缺省值为1,用户可以覆盖之
性能调优
1,究竟需要多少个reducer?
2,输入:大文件优于小文件
3,减少网络传输:压缩map的输出。(combiner)
4,优化每个节点能运行的任务数:mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum (缺省值均为2)
Map-Reduce的工作机制
MapReduce初探
调度机制
缺省为先入先出作业队列调度
支持公平调度器
支持容量调度器
任务执行优化
1,推测式执行:即如果jobtracker发现有拖后腿的任务,会再启动一个相同的备份任务,然后哪个先执行完就会kill去另外一个。因此在监控网页上经常能看到正常执行完的作业有被kill掉的任务
2,推测式执行缺省打开,但如果是代码问题,并不能解决问题,而且会使集群更慢,通过在mapred-site.xml配置文件中设置mapred.map.tasks.speculative.execution和mapred.reduce.tasks.speculative.execution可为map任务或reduce任务开启或关闭推测式执行
3,重用JVM,可以省去启动新的JVM消耗的时间,在mapred-site.xml配置文件中设置mapred.job.reuse.jvm.num.tasks设置单个JVM上运行的最大任务数(1,>1或-1表示没有限制)
4,忽略模式,任务在读取数据失败2次后,会把数据位置告诉jobtracker,后者重新启动该任务并且在遇到所记录的坏数据时直接跳过(缺省关闭,用SkipBadRecord方法打开)
错误处理机制:硬件故障
1,硬件故障是指jobtracker故障或tasktracker故障
2,jobtracker是单点,若发生故障目前hadoop还无法处理,唯有选择最牢靠的硬件作为jobtracker
3,Jobtracker通过心跳(周期1分钟)信号了解tasktracker是否发生故障或负载过于严重
4,Jobtracker将从任务节点列表中移除发生故障的tasktracker
5,如果故障节点在执行map任务并且尚未完成,jobtracker会要求其它节点重新执行此map任务
6,如果故障节点在执行reduce任务并且尚未完成,jobtracker会要求其它节点继续执行尚未完成的reduce任务
错误处理机制:任务失败
1,由于代码缺陷或进程崩溃引起任务失败
2,Jvm自动退出,向tasktracker父进程发送方错误信息,错误信息也会写入到日志
3,Tasktracker监听程序会发现进程退出,或进程很久没有更新信息送回,将任务标记为失败
4,标记失败任务后,任务计数器减去1以便接受新任务,并通过心跳信号告诉jobtracker任务失败的信息
5,Jobtrack获悉任务失败后,将把该任务重新放入调度队列,重新分配出去再执行
6,如果一个任务失败超过4次(可以设置),将不会再被执行,同时作业也宣布失败
一些技巧
1,选择reducer的个数(权威指南中文版第二版第195页)
默认为1,可以在代码中JobConf.setNumReducerTaks来设置reducer的个数。
reducer的最优个数与集群中可用的reducer任务槽数相关,总槽数由集群中节点数与每个节点的任务槽数相乘得到。
假如节点有8个处理器,计划在每个处理器上跑2个进程,则可将mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum的值分别设置为7(考虑还有datanode和tasktracker这2个进程,这2项值不可以设置为8)
2,Hadoop流中的key, value和分隔符(权威指南中文版第二版第197页)
3,控制分片的大小(权威指南中文版第二版第202页)
4,避免切分(权威指南中文版第二版第205页)
5,原始数据使用分隔符区分key-value时的输入(权威指南中文版第二版第211页)
KeyValueTextInputFormat,默认使用制表符来划分key-value,可以使用job.set("key.value.separator.in.input.line",",")来改变key-value分隔符
6,XML文件的输入(权威指南中文版第二版第213页)
7, 二进制数据输入(权威指南中文版第二版第213页)
8, Mapper输出多种不同类型的value
9,统计作业运行信息——计数器使用(权威指南中文版第二版第225页)
Map-Reduce开发场景
1,数据去重(Hadoop实战104页)
利用系统自身的shuffle过程,自动对key合并,在reduce中只需把key提取出来即可。
2,排序(Hadoop实战107页)
同上,key已经就是排过序的
3,单表关联(自连接)(Hadoop实战109页)
利用连接列作为key,实现连接列相等匹配的目的,每个Map产生2行输出,这2行中有连接条件,用数字1标识左表,数字2标识右表,经过shuffle过程,key成为连接条件。
MapReduce初探
网友评论