MR的优势
MapReduce与SETI(Search fo Extra—Terrestrial Intelligence)搜索外星智慧 的计划不同之处是,SETI的确招募了全球大量的志愿计算机,不过它所发散出去的task是CPU高密度计算任务,广大志愿者贡献的是CPU周期,而不是网络带宽。例如需要处理存储在HDFS上的log,海量的log统计需要切分任务,第一步要把各个任务所需资源传输到所在的计算节点,这里依赖大量的网络带宽。各个计算节点在maptask阶段完全并行,计算结果输出到HDFS上再进行分桶排序,进入到reduce阶段。
- 有大量数据在计算节点和存储节点直接传送,集群需要大量带宽支撑
- 能够依赖HDFS对大规模数据进行处理
- MR编程简单,逻辑清晰,不用考虑数据分流问题
MR过程
- 一个MR(Job)是客户端需要执行的一个工作单元,包括输入数据,MR程序和配置信息,Hadoop将job分成若干个task执行(map or reduce )task。
- 两个节点控制Job之心过程:
Jobtracker以及一些列tasktracker。Jobtracker负责调度tasktracker上的任务以及收集tasktracker的运行状态信息,如果jobfail,jobtracker负责在其它tasktracker上重启任务。 - 输入分片:
Hadoop将MR输入数据划分成小数据块,称为输入分片,一个大的输入分成了N个分片,如果分片规模越小,那么所有计算节点的负载会趋向均衡(集群中各个计算节点的硬件配置不同,当前所承担计算量也不同)。不过分片过小,分片过程就会占用很高的时间,对于大多数作业,一个合理的分片趋向于HDFS的一个块的大小,默认为64M,这样存储节点进行计算很有好处(不用对HDFS块进行切分,也不会占用集群带宽资源)。 - map将结果写入本地磁盘,reduce结果才会写入HDFS.
-
shuffle:后面标题shuffle有详细介绍
官方定义的shuffle
- 启动mr任务:
Streaming 使用Unix标准输入流作为Hadoop和应用程序之间的接口,也就是说可以用任何语言通过标准输入输出编写MR程序。
目前我用的是streaming,很方便配置各项参数。 - 一个没有用过的东西 Hadoop Pipes
Hadoop Pipes 是Hadoop MR的C++接口名称,并没使用标准输入输出实现MR之间Streaming,Hadoop Pipes 使用socket作为tasktracker与C++版本map函数过着reduce函数的进程间的通道
HDFS
HDFS的设计
- 流式数据访问:一次写入,多次读取。
- 商用硬件: Hadoop并不需要运行在高可靠的硬件上,事实上hadoop节点故障率不低,只不过被设计成可以继续运行,用户不易察觉。
- 数据访问延迟高,对于低延迟要求的访问,不适合存储在HDFS上。
- 一个Hadoop集群上所能存储的文件数量是受制于namenode节点内存的,所以不要存储大量小文件到HDFS上。
- 不修改已经存储的文件
HDFS数据块
- 无力磁盘的数据块大小一般为512B,作为读写数据的最小单位,类似的,HDFS也采取(block)的策略,一般默认存储块大小为64M,HDFS上的文件也被划分为块级大小的分块(chunk),作为独立存储单元,但是与其他文件系统不同的是,HDFS中小于块大小的文件不会占据整个块空间。
- 大数据块的目的:减少寻址开销。
- 分块存储的优势:1、某文件所需存储空间可以大于集群中任意磁盘的容量。2、使用块而不是文件作为存储单元,简化存储子系统设计,例如权限信息,并不需要与块一同存储。3、Hadoop的存储容错系统也得益于HDFS的块级存储,备份时只针对块进行稳定存储,即使有文件损坏也只是块级损坏,只需要恢复块级数据。
NAMENODE AND DATANODE
- NameNode 管理文件系统的命名空间,维护整个文件系统树及整棵树的文件和目录,这些信息以文件形式永久保存在本地磁盘上:命名空间镜像文件和编辑日志文件。
如果NameNode节点坏掉,那么整个文件系统也随之坏掉,因为我们丢失了所有文件组织信息,所以NameNode的容错非常重要.
1、第一种机制是备份组成文件系统元数据持久状态的文件,这些操作为实时同步的原子操作,一般是讲持久状态写入本地磁盘的同时,写入一个远程挂在的网络文件系统(NFS)
2、另一种机制是运行一个辅助的NameNode,定期编辑日志,合并空间镜像。辅助节点保存数据必然落后于NameNode,主节点失效必然丢失数据,一般把存储在NFS上的元数据回复到辅助节点上作为新的NameNode。 - DataNode同样维护每个文件中各个块所在的数据节点信息,但是并不永久保存,这些信息在系统reboot时有数据节点重建。它是文件系统的工作节点,需要检索并且存储数据块,定期向NameNode所存储的发送数据块列表。
- client代表用户与NameNode 和DataNode交互访问整个文件系统。
client 与HDFS、NameNode、DataNode之间的数据流:
- client通过调用open方法打开希望读取的文件,HDFS通过RPC调用namenode ,namenode返回文件每个块的序列+datanode地址。
- client通过create方法创建新文件,HDFS通过RPC调用namenode,在文件系统命名空间中新建一个文件,namenode检查文件不存在且用户权限符合标准,创建文件并添加记录,否则跑出IOException异常。数据通过DFSoutputstream组织成一个个数据包,组成数据队列,在一组(一般是三个)datanode中通过管道线传送,第一份副本优先选择client所在的datanode,如果client不在本地,那么随机挑选一个存储不太满,不繁忙的datanode。第二份副本会切换机架选取一个datanode,第三份副本与第二副本同机架不同节点。
- distcp应用场景
典型应用场景是在两个HDFS集群之间传送数据
Hadoop的IO操作
- 数据完整性:
数据第一次引入HDFS前计算校验和,在通过不可靠的通道进行传输之后再次计算校验和。其中写数据的client将数据及其校验和发送到一系列datanode组成的管道线,管道线中的最后一个datanode负责验证校验和。
不仅如此,每个datanode也会在后台线程运行一个datablockscanner,定期验证存储在这个datanode上的所有数据块。
-
压缩:
为了节省存储空间,节约带宽,有时会需要对文件进行压缩处理。
压缩格式总结.JPG
可切分的压缩格式包括bzip2 ,预处理过程中被索引过的LZO。
-
codec
e3afd245-0fe3-4d5c-b643-8558fc87c175.JPG
某实例
- 可切分:
可切分就是说可以从压缩刘数据的任意位置读取数据。
假如一个压缩文件为1G,在64M一块的格式下分为16块。如果是可切分的压缩格式,这16块可以作为16个MAP的输入,反之,虽然存储了16个数据块,这16块有只能以一定顺序进入一个MAP。
序列化
- 序列化定义:序列化(serialization)是指将结构化对象转化为字节流一般在网络上传输或者写入磁盘永久存储的过程。反序列化(deserialization)是指将字节流转回结构化对象的逆过程。
- 应用场景:进程间通信和永久存储 。 Hadoop节点进程间通信就是通过RPC(remote procedure call)实现的。
*Hadoop序列化格式:使用自己的序列化格式writable,它够紧凑,速度快(可谓Hadoop的核心),但是不易被java意外的语言进行拓展或者使用。
很明显,writable接口有两个方法,一个DataOutput 将结构化数据写到二进制流,一个DataInput方法读取二进制流数据。
HIVE VS HBASE
- HIVE 数据仓库存储,一次存储,多次访问。可用partition分桶存储,一般适用于全盘扫描的需求,例如分桶统计数据需要访问某桶内全部数据,如果想要查询某跳数据,显然不是Hive擅长的工作,因为其没有索引。
- HBASE Key,Value形式按列存储
HBase作为面向列的数据库运行在HDFS之上,HDFS缺乏随即读写操作,HBase正是为此而出现。HBase以Google BigTable为蓝本,以键值对的形式存储。项目的目标就是快速在主机内数十亿行数据中定位所需的数据并访问它。
HBase是一个数据库,一个NoSql的数据库,像其他数据库一样提供随即读写功能,Hadoop不能满足实时需要,HBase正可以满足。如果你需要实时访问一些数据,就把它存入HBase。
你可以用Hadoop作为静态数据仓库,HBase作为数据存储,放那些进行一些操作会改变的数据。
调度框架背景
在目前版本以及0。20版本系列中,mapred.job.tracker决定了执行MR程序的方式,如果被设置为local,则在单个JVM上运行整个作业,小数据集单机测试,如果被设置成主机+端口号,运行器将作业交给该地址的jobtracker。hadoop2.0引入了一种新的执行机制,称为MR2,建立在一个名为YARN的系统上。目前执行框架通过mapreduce.framework.name属性进行设置,local表示本地作业运行器,classic表示经典MR框架MR1,使用一个jobtracker和多个tasktracker,yarn表示新的框架。
经典MR框架组成
- 客户端:
1、提交MR作业,检查输出目录是否存在,计算作业的输入分片(因为输入路径已经给定,根据文件大小计算分片个数)
2、将作业所需资源(作业JAR文件,配置文件和计算所得的输入分片)复制到一个以作业ID明明的目录下jobtracker的文件系统中,告知jobtracker作业准备执行(通过调用Jobtracker的submitJob()图中步骤4)
- Jobtracker:
协调作业运行。jobtracker是一个java应用程序,主类是JobTracker。
1、接到submitjob后,将调用放入内部队列,交由job scheduler调度,并对其封装初始化
2、为了创建任务运行列表,job scheduler首先从HDFS中获取已经计算好的输入分片,为每个分片创建任务 - tasktracker:
运行作业划分后的任务,是一个java应用程序,主类是tasktracker。
1、运行一个简单的讯黄来定期发送heartbeat给jobtracker,一为了表明tracker仍然存活,二充当消息通道。
2、对于map任务和reduce任务,tasktracker有固定的数量和任务槽,二者独立设置,准确数量有tasktracker核心数和内存大小决定。默认调度器在处理reduce任务槽之前会填满map任务槽。
3、为了选择reduce任务,jobtracker并不考虑数据本地化,直接从reduce任务列表中选取下一个,而对于map任务,则优先考虑tasktracker的网络位置,选取一个最近的输入分片文件,理想情况任务是数据本地化的(不过也可能是机架本地化)。
4、现在tasktracker已经被分配了一个任务,第一步,从HDFS中把作业JAR文件与应用程序所需的全部文件复制到tasktracker所在的文件系统,实现文件本地化。第二步,为任务新建一个本地工作目录,解压JAR文件。第三部,创建taskrunner实例运行该任务
5、 taskrunner启动一个新的JVM来运行每个任务,以便用户定义的mr函数的任何软件问题都不会影响到tasktracker(例如导致崩溃或者挂起),单杀不同任务之间重用JVM还是可能的。
6、Streaming任务使用标准输入输出流与进程进行通信
![Uploading 7ccf8386-6c9d-4bf6-bf5d-624f8419ae94_830239.JPG . . .]
-
HDFS
经典MR1作业启动过程
YARN(MR2)
对于节点数量超出4000的大型集群,MR1系统开始面临拓展性的瓶颈,因此YARN(Yet Another Resource Nefotiator)
- YARN将jobtracker只能划分为多个独立的实体,改善classicMR面临拓展瓶颈的问题。Jobtracker负责作业调度和任务进度见识,追踪任务、重启失败或者国漫的任务和进行任务等级等等。
Yarn将这梁总角色划分为两个独立的守护进程:管理集群上资源使用的资源管理器(RM)和管理集群上运行任务生命周期的应用管理器(AM)。
基本思路是: AM与RM协商集群上的计算资源:容器(每个容器都有特定的内存上限),在这些容器上运行特定的应用程序进程。
容器由集群节点上运行的节点管理器(NodeManager)监视,确保应用程序使用的资源不会超过分配给它的资源。 -
每个实例(这里指MR Job)有一个专用的应用master
![Uploading aa5479ab-1ee4-4eec-899c-f685eb09335e_962818.JPG . . .]
- 一次任务的执行过程:
作业提交:MR2中作业提交使用与MR1相同的API,MR2实现了ClientProtocol,当mapreduce.framework.name=yarn 启动时,从RM而不是JOBTRACKER获得Job_Id,然后计算输入分片,在集群上产生分片,并将作业资源(作业JAR,配置信息,分片信息辅助到HDFS),最后通过调用RM上的submitApplication()方法提交作业
作业初始化:RM接收到submitApplication(),将消息传递给调度器(scheduler),调度器分配一个容器,RM在节点管理器的管理下在容器中启动应用程序的master进程。
MR作业的AppMaster是一个java应用程序,它对作业进行初始化,通过创建多个簿记对象以保持对作业进度的跟踪,因为它接受来自任务的进度和完成报告(步骤6)。接下来,他接受来自共享文件系统的已经被计算好的输入分片(步骤7)然后AM决定如何运行构成MR作业的各个任务,如果作业很小,就选择在与它同一个JVM上运行任务。判定在一个节点上顺序运行它们的开销小于在新容器中分配运行的开销的大,就会发生这种情况,称为uberized,或者作为uber任务运行。MR1中不会在单tasktracker上运行小作业(例如10个mapper+1 reducer)。
任务分配
如果任务不适合作为uber任务运行,那么AM为所有Map Reduce任务向RM请求容器(步骤8),并且指定内存需求,默认1G,通过mapreduce.map.memory.mb 和mapreduce.reduce.memory.mb设置。
在内存分配方面,MR2对比与MR1,可以自行申请1G的整数倍的内存,而MR1只能是每个MR任务占用固定的MR槽,导致小任务内存利用率低。
任务执行
一旦RM为任务分配了容器,AM就通过节点管理器通信来启动容器(步骤9ab),在它运行之前,首先将任务所需的资源本地化(包括作业配置JAR文件和所有DFS上的文件),最后运行Map or Reduce任务。
进度和状态更新
Yarn运行时,任务每3s通过umbilical API想AM回报进度和状态(包括计数器),相比之下MR1 通过TT 到JT实现进度更新。客户端每秒查询一次AM接受进度更新,通常向用户显示
在MR1中,作业跟踪器的WEB UI展示运行作业列表及其进度,在YARN中,RM的Web UI 展示了正在运行的APP以及连接到对应的AM,每个AM展示MR作业进度等进一步细节。
MR2系统中状态更新的传播
作业完成
出了向AM 查询进度之外,客户端每5s通过Job的waitForCompletion()检查作业是否完成。
作业完成后AM和任务容器清理其工作状态,OutPutCommitter的作业清理方法会被调用,作业历史服务器保存作业信息以便需要时查询。
失败
作业的失败处理目前还不是很能影响到自身,所以优先将时间用在其它地方,此处留空
shuffle
- 更详细的shuffle
官方shuffle定义图
详细的MAP过程
- 在Map阶段 MAP的输出首先存放在内存中一般默认有环状100M的缓冲区,当缓冲区内数据使用量达到80%,那么开始写入tasknode本地磁盘。写入磁盘过程:1、首先将数据最终要传入的reducer分桶(partition),然后在每个分区中,按照键值进行排序(sort排序,combiner如果有的话,对map输出进一步处理,减少向磁盘写入的数据量)
- Reduce阶段,假如有M 个MAP任务, N个reduce任务, 每当有一个MAP任务报告完成, reduce任务就会立即访问MAP产生的数据,并且按Key值拉取对应的partition 数据,所以说,reduce任务在最初阶段就是在不断的拉取所有Map任务的输出。 reduce task node在不断拉取Map输出的同时,不断的merge本地的map数据以达到有序(如果文件大,那么将存在磁盘山个,否则可以存储在reduce node 的本地内存中),当最后一个Map任务结束之时,所有的reduce任务将完成最后一次本地数据merge,将所有数据排序后生成一个文件,然后开始执行用户自定义的reducer程序。
网友评论