概述
基于Hadoop 2.x
当作业输入的数据量非常小,每个Map和Reduce任务处理的数据量也非常小,那么为Map和Reduce任务申请Container并启动Container花的时间比数据处理的时间还长。
MapReduce计算引擎提供了Uber模式来解决这个问题:当作业足够小时,所有Map和Reduce任务在同一个JVM中进行。
Uber作业优化默认关闭,可以在mapred-site.xml配置打开:
<property>
<name>mapreduce.job.ubertask.enable</name>
<value>false</value>
</property>
如何判断一个作业是Uber作业
MRAppMaster在初始化Job时会判断该Job是否是一个Uber作业。
# org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl#makeUberDecision
// 最大Map数, 判定是否为uber作业的条件之一
int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
// 最大Reduce数,判定是否为uber作业的条件之一
int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
// 最大数据输入总量(一个文件块大小),判定是否为uber作业的条件之一
long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
fs.getDefaultBlockSize(this.remoteJobSubmitDir));
// MRAppMaster的内存,判定是否为uber作业的条件之一
long sysMemSizeForUberSlot =
conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
// MRAppMaster的CPU核数, 判定是否为uber作业的条件之一
long sysCPUSizeForUberSlot =
conf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
// 配置文件中是否开启uber作业优化
boolean uberEnabled =
conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
// 是否满足Map任务数小于等于最大Map数
boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
// 是否满足Reduce任务数小于等于最大Reduce数
boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
// 任务所需的内存是否小于等于MRAppMaster的内存
boolean smallMemory =
(requiredMB <= sysMemSizeForUberSlot)
|| (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);
// 任务所需的CPU核数是否小于等于MRAppMaster的CPU核数
boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot;
// 是否为链式作业
boolean notChainJob = !isChainJob(conf);
// 综合以上7个条件判断
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
&& smallInput && smallMemory && smallCpu
&& notChainJob;
判定一个作业是否为uber作业的条件还是蛮苛刻的,需要同时满足一下条件:
①mapreduce.job.ubertask.enable = true
②Map任务数小于等于9
③Reduce任务数小于等于1
④输入的总数据量小于等于一个文件块大小(128M)
⑤map任务和reduce任务需要的资源量不能大于MRAppMaster的资源量
⑥非链式作业
对Uber作业做了哪些优化
1、禁用推测执行机制。
conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
2、等到所有的Map任务全部结束以后才会启动Reduce任务。
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
1.0f);
3、Map和Reduce任务只有一个TaskAttempt。
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
4、资源分配器为本地模式,申请的Container跟MRAppMaster所在的Container是同一个。
if (job.isUber()) {
MRApps.setupDistributedCacheLocal(getConfig());
this.containerAllocator = new LocalContainerAllocator(
this.clientService, this.context, nmHost, nmPort, nmHttpPort
, containerID);
}
5、容器启动器为本地模式,Map任务和Reduce任务都是运行在MRAppMaster所在的JVM内。
if (job.isUber()) {
this.containerLauncher = new LocalContainerLauncher(context,
(TaskUmbilicalProtocol) taskAttemptListener, jobClassLoader);
((LocalContainerLauncher) this.containerLauncher)
.setEncryptedSpillKey(encryptedSpillKey);
}
总结
Uber模式让小作业的Map和Reduce任务运行在MRAppMaster的JVM之中,避免了Container的申请与分配,让作业由多进程变成了多线程,无疑提高了小作业的效率。
网友评论