美文网首页
Hadoop-3.1.3(六)MapReduce 机制

Hadoop-3.1.3(六)MapReduce 机制

作者: _大叔_ | 来源:发表于2021-07-23 09:49 被阅读0次

MapReduce 简介

MapReduce分布式并行计算框架是一种可用于数据处理的编程模型,可运行由个中语言编写的MapReduce程序:java、Ruby、Python、R、C++等语言。它用于处理超大规模数据的计算,同时具有可并行计算的特性,因此可以将大规模的数据分析任务交给任何一个拥有足够多机器的集群。并采用函数式编程的思想,在各函数之间串行计算(Map执行完毕,才会开始执行Reduce任务)。

hadoop1.x job运行机制

在讲 YARN 后会说 hadoop2.0 job运行机制

  1. run job 由客户端来完成,底层有一个JobClient类来做具体实现。run job会做如下几件事
  • 做job环境信息的收集,比如各个组件类,输出KV类型,检测是否合法。
  • 检测输入路径的合法性,以及输出结果路径的合法性。
    如果检测未通过,直接报错返回,不会做后续的job提交动作。
  1. run job检测通过,JobClient 会向 JobTracker为当前job申请jobid,jobid是全局唯一的,用于标识一个Job。

  2. copy job resource阶段,JobClient把Job的运算资源上传到HDFS。路径为 /tmp/hadoop-yarn/staging/用户/* 。运算资源包括如下:

  • jar包
  • 文件的切片信息
  • job.xml 整个job的环境参数
  1. 当jobClient将运算资源上传到HDFS之后,提交job(submit job)

5.6. 初始化job环境信息(init job)以及获取job的切片数量(get split data)目的是获取整个Job的MapTask任务数量和ReduceTask的任务数量。MapTask任务数量=切片数量。ReduceTask的任务数量在代码设置。

  1. TaskTracker 会根据心跳去JobTracker 获取 job任务。TaskTracker 在领取任务时,要满足数据本地话策略(TaskTracker 属于 datanode,datanode存的是 块数据,JobTracker 属于 namenode,拥有块的切片信息,TaskTracker 去领取任务的时候最好领取块相对应的切片信息,可以节省带宽,否则还要去其他 datanode 找属于自己的切片信息)。切块包含的是数据(文件的真是数据),切片包含的是块的描述信息,如:
  • Path 文件所在分布式文件系统路径
  • Start 数据开始位置
  • Length 数据的长度
  1. TaskTracker 去HDFS下载 job(xxx.jar) 的运算资源

9.10. 启动JVM进程

输入输出机制

以下是写一个 单词统计的 job 的输入和输出


  1. MapTask 读取文件根据代码逻辑进行输出
  2. ReduceTask 读取 MapTask的输出做为输入,会得到 一个迭代器,迭代器包含了相同单词的 value(最后图中的 hello 1 1 1 1 1 1),迭代器中包含6个元素,每个元素的值都是1.

分区机制

分区需要在代码中设置分几个区,并且可以控制分区条件

合并组件(Combiner机制)

我们可以看到 合并组件 提前把MapTask的的输出做为输入合并后再输出,引入Combiner的作用就是为了降低ReduceTask的负载。

MapTask工作机制

注意
  1. Spill 过程不一定会发生,当MapTask输出的数据流<溢写缓冲大小*溢写阈值
  2. 当发生了Spill过程,最后溢写缓冲区残留的数据会 flush 到最后生成的Spill文件中
  3. Spill理论上是80MB,但是需要考虑序列化的因素
  4. 不能凭一个MapTask处理的切片数据大小,来衡量输出数据的大小,这得取决于业务
  5. 有一个MapTask,就会有一个对应的溢写缓冲区
  6. 溢写缓冲区本质上就是一个字节数组,即内存中一块连续的地址空间
  7. 溢写缓冲区又叫环写(环形)缓冲区,可以重复利用同一块地址空间。
  8. Spill 因为是环形缓冲区,设置80%的写入,是为了不去阻塞后续的写入。
  9. Merge 过程不一定发生,原因是没有发送Spill 或 发生Spill但只输出一个文件。
优化
  1. 适当调整 溢写缓冲 区大小,建议范围250M~350M
  2. 加入Combine(合并组件),会在缓冲区和Merge过程进行合并,减少Spill 溢写次数,减少IO次数,减少网络数据传输。Merge过程Combine不一定发生,当Spill 文件 < 3
  3. 对最后的结果文件进行压缩

ReduceTask 工作机制

ReduceTask 会把 MapTask 输出的文件按分区读取再合并排序,该分区和 ReduceTask 的分区是两回事,MapTask 是对文件内容分区,ReduceTask 是对输出结果到不同的文件分区。
ReduceTask 在合并过程中,如果有大量的分区文件,按照一定的数量(可设置)合并,直到合并到少量文件到ReduceTask 工作。

优化
  1. Fetch 线程数可调整,默认是5个。
  2. ReduceTask 启动阈值 5%,ReduceTask 不是等所有的 MapTask 完成才工作,而是根据 5% 的比例。

Hadoop 常见参数调优

hdfs-site.xml

namenode是否允许被格式化,默认为true,生产系统要设置 false,组织任何格式化操作再一个运行的DFS上。格式指令:hadoop namenode -format

dfs.namenode.support.allow.format=true

datamode的心跳间隔,默认3秒,在集群网络状态不好的情况下,建议调大

dfs.heartbeat.interval=3

切块大小,默认是128MB,必须得是1024(page size)的整数倍。

dfs.blocksize=134217728

edits和fsimage文件合并周期阈值,默认1小时

dfs.namenode.checkpoint.period=3600

文件流缓存大小。需要是硬件page大小的整数倍。在读写操作时,数据缓存大小,必须是1024整数倍

dfs.stream-buffer-size=4096

maperd-site.xml

任务内部排序缓冲区大小,默认时100MB,调大能减少Spill溢写次数,减少磁盘IO,建议250~400MB

mapreduce.task.io.sort.mb=100

Map阶段溢写文件的阈值。不建议修改此值

mapreduce.map.sort.spill.percent=0.8

ReduceTask启动的并发copy数据的线程数,建议尽可能等于或接近Map任务数量,达到并行抓取的效果

mapreduce.reduce.shuffle.parallelcopies=5

当Map任务数量完成率再5%时,Reduce任务启动,这个参数不建议修改

mapreduce.job.reduce.slowstart.completedmaps=0,05

文件合并(Merge)影子,如果文件数量太多,可以适当调大,减少IO次数

io.sort.factor=10

是否对Map的输出结果文件进行压缩,开启后CPU利用率会提高,但节省带宽

mapred.compress.map.output=false

启动 map/reduce 任务的推测执行机制,对拖后腿的任务一种优化机制。当一个作业的某些任务运行速度明显慢于同作业的其他任务时,Hadoop会在另一个节点上为慢任务启动一个备份任务,这样两个任务同时处理一份数据,而Hadoop最终会将优先完成的那个任务的结果做为最终结果,并将另一个任务杀掉。

mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true

注意

  1. 只会在被分配的 文件的block 上启动Map任务
  2. 一个目录有多少个文件,就会分多少个Map任务
  3. Map 读文件默认是一行一行读取,该条件可以设置。
  4. Map 会按照Key做排序,自定义排序可以实现 WritableComparable 接口
  5. Reducer 主要做聚合操作
  6. Reduce 会把数据分区(Hash),ReduceTask(1) 的数据不会 ReduceTask(2) 的结果中出现,输出的文件根据设置的工作数有关,可以使用 hadoop fs -getmerge /目录/所有文件 /目录/新得文件.txt 进行整合。
  7. ReduceTask 有分区的话会先分区再聚合。
  8. 分区可能会造成数据倾斜现象

相关文章

网友评论

      本文标题:Hadoop-3.1.3(六)MapReduce 机制

      本文链接:https://www.haomeiwen.com/subject/zjjgdltx.html