MapReduce(四):Uber模式

作者: b91cbec6a902 | 来源:发表于2019-02-28 16:06 被阅读1次

概述

基于Hadoop 2.x

当作业输入的数据量非常小,每个Map和Reduce任务处理的数据量也非常小,那么为Map和Reduce任务申请Container并启动Container花的时间比数据处理的时间还长。
MapReduce计算引擎提供了Uber模式来解决这个问题:当作业足够小时,所有Map和Reduce任务在同一个JVM中进行。

Uber作业优化默认关闭,可以在mapred-site.xml配置打开:

<property>
    <name>mapreduce.job.ubertask.enable</name>
    <value>false</value>
</property>

如何判断一个作业是Uber作业

MRAppMaster在初始化Job时会判断该Job是否是一个Uber作业。

# org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl#makeUberDecision

// 最大Map数, 判定是否为uber作业的条件之一
int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
// 最大Reduce数,判定是否为uber作业的条件之一
int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
// 最大数据输入总量(一个文件块大小),判定是否为uber作业的条件之一
long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
    fs.getDefaultBlockSize(this.remoteJobSubmitDir));
// MRAppMaster的内存,判定是否为uber作业的条件之一
long sysMemSizeForUberSlot =
    conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
        MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
// MRAppMaster的CPU核数, 判定是否为uber作业的条件之一
long sysCPUSizeForUberSlot =
        conf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
            MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);

// 配置文件中是否开启uber作业优化
boolean uberEnabled =
        conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
// 是否满足Map任务数小于等于最大Map数
boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
// 是否满足Reduce任务数小于等于最大Reduce数
boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
// 任务所需的内存是否小于等于MRAppMaster的内存
boolean smallMemory =
        (requiredMB <= sysMemSizeForUberSlot)
        || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);
// 任务所需的CPU核数是否小于等于MRAppMaster的CPU核数
boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot;
// 是否为链式作业
boolean notChainJob = !isChainJob(conf);

// 综合以上7个条件判断
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
    && smallInput && smallMemory && smallCpu 
    && notChainJob;

判定一个作业是否为uber作业的条件还是蛮苛刻的,需要同时满足一下条件:
①mapreduce.job.ubertask.enable = true
②Map任务数小于等于9
③Reduce任务数小于等于1
④输入的总数据量小于等于一个文件块大小(128M)
⑤map任务和reduce任务需要的资源量不能大于MRAppMaster的资源量
⑥非链式作业

对Uber作业做了哪些优化

1、禁用推测执行机制。

conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);

2、等到所有的Map任务全部结束以后才会启动Reduce任务。

conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
                        1.0f);

3、Map和Reduce任务只有一个TaskAttempt。

conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);

4、资源分配器为本地模式,申请的Container跟MRAppMaster所在的Container是同一个。

if (job.isUber()) {
    MRApps.setupDistributedCacheLocal(getConfig());
    this.containerAllocator = new LocalContainerAllocator(
        this.clientService, this.context, nmHost, nmPort, nmHttpPort
        , containerID);
}

5、容器启动器为本地模式,Map任务和Reduce任务都是运行在MRAppMaster所在的JVM内。

if (job.isUber()) {
    this.containerLauncher = new LocalContainerLauncher(context,
        (TaskUmbilicalProtocol) taskAttemptListener, jobClassLoader);
    ((LocalContainerLauncher) this.containerLauncher)
            .setEncryptedSpillKey(encryptedSpillKey);
}

总结

Uber模式让小作业的Map和Reduce任务运行在MRAppMaster的JVM之中,避免了Container的申请与分配,让作业由多进程变成了多线程,无疑提高了小作业的效率。

相关文章

网友评论

    本文标题:MapReduce(四):Uber模式

    本文链接:https://www.haomeiwen.com/subject/qkouyqtx.html