目录
input split
map
shuffle
reduce
架构设计
一个单词统计的例子引入
单词统计.png
input split
在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组
输入分片(input split)往往和hdfs的block(块)关系很密切,假如我们设定hdfs的块的大小是64mb,如果我们输入有三个文件,大小分别是3mb、65mb和127mb,那么mapreduce会把3mb文件分为一个输入分片(input split),65mb则是两个输入分片(input split)而127mb也是两个输入分片(input split),
换句话说我们如果在map计算前做输入分片调整,例如合并小文件,那么就不会有5个map任务将执行,而且每个map执行的数据大小不均,这个也是mapreduce优化计算的一个关键点。
map
就是编写好的map函数,因此map函数效率相对好控制,而且一般map操作都是本地化操作也就是在数据存储节点上进行
shuffle
Shuffler.pngShuffle.png
-
map在做输出时候会在内存里开启一个环形内存缓冲区,这个缓冲区专门用来输出的,默认100MB,当数据达到一定阈值就会把内容写到磁盘上,这个过程叫spill,这个阈值一般是0.8,也就是80MB,另外的20%内存可以继续写入要写进磁盘的数据,写入磁盘和写入内存操作是互不干扰的*,如果缓存区被撑满了,那么map就会阻塞写入内存的操作,让写入磁盘操作完成后再继续执行写入内存操作。
-
Partitioner,将map输出数据到本地磁盘过程中进行分区操作,目的是将reduce对应起来,一个Partitioner对应一个reduce作业,同时还对其进行对key值排序sort。简单理解就是按照统计结果按照条件输入到不同文件当中(分区)。比如将手机号按照 135 137 187开头的分别放到一个独立的文件当中 ,其他的放到一个文件夹。
-
如果我们定义了combiner函数,那么排序前还会执行combiner操作。combiner阶段是程序员可以选择的,combiner其实也是一种reduce操作,因此我们看见WordCount类里是用reduce进行加载的。Combiner主要是在map计算出中间文件前做一个简单的合并重复key值的操作。
例如我们对文件里的单词频率做统计,map计算时候如果碰到一个hadoop的单词就会记录为1,但是这篇文章里hadoop可能会出现n多次,那么map输出文件冗余就会很多,因此在reduce计算前对相同的key做一个合并操作,那么文件会变小,这样就提高了宽带的传输效率,毕竟hadoop计算力宽带资源往往是计算的瓶颈也是最为宝贵的资源。
但是combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终输入,例如:如果计算只是求总数,最大值,最小值可以使用combiner,但是做平均值计算使用combiner的话,最终的reduce计算结果就会出错。 -
合并merge,对于spill产生的许多小文件进行分区排序合并成大文件,存放在Map Task运行的机器的本地磁盘,临时缓存job结束会删除。
-
拷贝copy,将Map Task运行的机器上,copy数据到Reduce Task机器上。
-
合并merge,将从不同Map Task机器上的copy出来的数据进行合并排序。
-
分组group,将相同的Key的value放在一起
reduce
和map函数一样也是程序员编写的,最终结果是存储在hdfs上的。
MR1.0 运行架构
MR1.0 运行架构.pngMapReduce1 的工作机制中,角色主要包括客户端, Jobtracker,Tasktracker
Jobtracker 主要是协调作业的运行
Tasktracker 是负责运行作业划分之后的任务
JobTracker
- 核心,主,单点
- 调度所有作业
- 监控整个集群的资源负载
Tasktracker
- 从,自身节点资源管理
- 和JobTracker 汇报资源,获取Task
MR1.0弊端
- JobTracker 负载过重,存在单点故障
- 不同框架对资源不能进行全局管理
Client
- 以作业为单位
- 规划作业计算分布
- 提交作业资源到HDFS
- 最终提交作业到JobTracker
MapReduce 过程(了解):
-
首先是由客户端向 Jobtracker 请求一个新的作业,Jobtracker 会检查作业的 输出路径是否存在。若存在则抛出异常。若不存在的话,Jobtracker 会向客户端返回 job 相关资源的提交路径以及 jobID。
-
接下来就是客户端会将 job 所需的资源(jar 文件,配置文件)交到共享文件系统。并告知 Jobtracker 已将 job 复制到共享文件系统,准备执行。
-
Jobtracker 将提交的 Job 放入内部的任务队列,由作业调度器进行调度,并进行初始化(包括创建一个表示正在运行作业的容器,用于封装任务和记录信息)
-
之后 jobtracker 的作业调度器从共享文件系统获取客户端计算好的输入切 片,以创建任务运行列表
-
Tasktracker 通过心跳与 Jobtracker 保持通信,报告自己的状态,以及是否准 备好运行一个 task,若准备好 ,Jobtracker 则通过一定的调度算法从 Jobtracker 中获得一个 task 分配给 Tasktracker。
-
Tasktracker 在共享文件系统中获得任务相关资源,实现 jar 本地化,并创建 响应的文件夹以及一个 taskrunner 运行该任务。
-
taskrunnr 会启动一个新的 JVM,在新启动的 JVM 中运行任务。
进度与状态的更新:有一个独立的线程向 tasktracker 报告当前任务状态。同时 Tasktracker 每隔 5s 向 Jobtracker 通过心跳发送状态。JobTracker 将这些更新 合并,发送给客户端。
网友评论