-
程序执行流程图如下:
-
programmer编写程序,打包
-
JobClient向JobTracker申请可用JobID,JobTracker返回可用ID给JobClient
-
JobClient复制相关资源到HDFS上
◇
程序jar包,作业配置文件*.xml
◇
输入划分信息,比如需要多少个map完成任务
◇
本地文件、第三方依赖的jar包、依赖的归档文件、普通文件 -
资源准备后,JobClient向JobTracker提供Job
-
JobTracker初始化Job
-
JobTracker从HDFS中获取第3步输入的splits
-
TaskTracker不断向JobTracker发送汇报心跳信息,并且返回要执行的任务、完成度
-
TaskTracker从HDFS中获取完成作业需要的resource
-
获得资源后,TaskTracker会开启JVM子进程完成作业
MapReduce端分析
- map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件
- 在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。分区是对数据进行hash,然后对每个分区中的数据进行排序。如果设置了Combiner,将排序后的结果进行Combianer操作,目的是让尽可能少的数据写入到磁盘。
- 当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和combiner操作,目的有两个:1、尽量减少每次写入磁盘的数据量;2、尽量减少下一复制阶段网络传输的数据量
- 将分区中的数据拷贝给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和obTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就OK了。
Shuffle端

- MapReduce里的Shuffle:描述着数据从map task输出到reduce task输入的这段过程。
- 每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘。
- 在map task执行时,它的输入数据来源于HDFS的block,当然在MapReduce概念中,map task只读取split。
- MapReduce提供Partitioner接口,作用就是根据key或value及reduce的数量来决定当前的输出数据最终应该交由哪个reduce task处理。
- 内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为spill,中文可理解为溢写。
- 每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当map task真正完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件
Reduce端
- reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接收的数据量相当小,则直接存储在内存中,如果数据量超过了该缓冲区大小的一定比例,则对数据合并后溢写到磁盘中。
- 随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件。
- 合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。
- Reducer的输入文件,不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDSF上。
What is MapReduce?
The first is the map operation, takes a set of data and converts it into another set of data, where individual elements are broken down into tuples (key/value pairs). The reduce operation combines those data tuples based on the key and accordingly modifies the value of the key.

Thus the output of the node will be three key, value pairs with three distinct keys and value set to one. The mapping process remains the same in all the nodes. These tuples are then passed to the reduce nodes. A partitioner comes into action which carries out shuffling so that all the tuples with same key are sent to same node.

Important Commands
Usage: hadoop [--configure confdir ] COMMAND
Options | Description |
---|---|
namenode -format |
Formats the DFS filessytem |
secondarynamenode |
Runs a DFS secondary namenode |
namenode |
Runs a DFS namenode |
datanode |
Runs a DFS namenode |
dfsadmin |
Runs a DFS admin client |
mradmin |
Runs a Map-Reduce admin client |
fsck |
Runs a DFS filesystem checking utility |
fs |
Runs a generic filesystem user client |
balancer |
Runs a cluster balancer utility |
oiv |
Applies the offline fsimage viewer to an fsimage |
fetchdt |
Fetches a delegation token from the NameNode. |
jobtracker |
Runs the MapReduce job Tracker node. |
pipes |
Runs a Pipes job |
tasktracker |
Runs a MapReduce task Tracker node. |
historyserver |
Runs job history servers as a standalone daemon. |
job |
Manipulates the MapReduce jobs. |
queue |
Gets information regarding JobQueues. |
version |
Prints the version. |
jar <jar> |
Runs a jar file |
distcp <srcurl> <desturl> |
Copies file or directories recursively. |
archive -archiveName NAME -p |
Creates a hadoop archive. |
daemonlog |
Get/Set the log level for each daemon |
How to Interact With MapReduce Job
**Usage: ** hadoop job [Generic_options]GENERIC_OPTIONS Description
GENERIC_OPTIONS | Description |
---|---|
-submit <job-file> |
Submits the job. |
-status <job-id> |
Prints the map and reduce completion percentage and all job counters. |
-counter <job-id> <group-name> <countername> |
Prints the counter value. |
-kill <job-id> |
Kills the job. |
-events <job-id> <fromevent-#> <#-of-events> |
Prints the events' details received by jobtracker for the given range. |
-history [all] <jobOutputDir> - history < jobOutputDir> |
Prints job details, failed and killed tip details. More details about the job such as successful tasks and task attempts made for each task can be viewed by specifying the [all] option. |
-list[all] |
Displays all jobs. -list displays only jobs which are yet to complete. |
-kill-task <task-id> |
Kills the task. Killed tasks are NOT counted against failed attempts. |
-fail-task <task-id> |
Fails the task. Failed tasks are counted against failed attempts. |
-set-priority <job-id> <priority> |
Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW |
网友评论