MapReduce(四):Uber模式

作者: b91cbec6a902 | 来源:发表于2019-02-28 16:06 被阅读1次

    概述

    基于Hadoop 2.x

    当作业输入的数据量非常小,每个Map和Reduce任务处理的数据量也非常小,那么为Map和Reduce任务申请Container并启动Container花的时间比数据处理的时间还长。
    MapReduce计算引擎提供了Uber模式来解决这个问题:当作业足够小时,所有Map和Reduce任务在同一个JVM中进行。

    Uber作业优化默认关闭,可以在mapred-site.xml配置打开:

    <property>
        <name>mapreduce.job.ubertask.enable</name>
        <value>false</value>
    </property>
    

    如何判断一个作业是Uber作业

    MRAppMaster在初始化Job时会判断该Job是否是一个Uber作业。

    # org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl#makeUberDecision
    
    // 最大Map数, 判定是否为uber作业的条件之一
    int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
    // 最大Reduce数,判定是否为uber作业的条件之一
    int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
    // 最大数据输入总量(一个文件块大小),判定是否为uber作业的条件之一
    long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
        fs.getDefaultBlockSize(this.remoteJobSubmitDir));
    // MRAppMaster的内存,判定是否为uber作业的条件之一
    long sysMemSizeForUberSlot =
        conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
            MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
    // MRAppMaster的CPU核数, 判定是否为uber作业的条件之一
    long sysCPUSizeForUberSlot =
            conf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
                MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
    
    // 配置文件中是否开启uber作业优化
    boolean uberEnabled =
            conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
    // 是否满足Map任务数小于等于最大Map数
    boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
    // 是否满足Reduce任务数小于等于最大Reduce数
    boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
    // 任务所需的内存是否小于等于MRAppMaster的内存
    boolean smallMemory =
            (requiredMB <= sysMemSizeForUberSlot)
            || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);
    // 任务所需的CPU核数是否小于等于MRAppMaster的CPU核数
    boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot;
    // 是否为链式作业
    boolean notChainJob = !isChainJob(conf);
    
    // 综合以上7个条件判断
    isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
        && smallInput && smallMemory && smallCpu 
        && notChainJob;
    

    判定一个作业是否为uber作业的条件还是蛮苛刻的,需要同时满足一下条件:
    ①mapreduce.job.ubertask.enable = true
    ②Map任务数小于等于9
    ③Reduce任务数小于等于1
    ④输入的总数据量小于等于一个文件块大小(128M)
    ⑤map任务和reduce任务需要的资源量不能大于MRAppMaster的资源量
    ⑥非链式作业

    对Uber作业做了哪些优化

    1、禁用推测执行机制。

    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
    

    2、等到所有的Map任务全部结束以后才会启动Reduce任务。

    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
                            1.0f);
    

    3、Map和Reduce任务只有一个TaskAttempt。

    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
    

    4、资源分配器为本地模式,申请的Container跟MRAppMaster所在的Container是同一个。

    if (job.isUber()) {
        MRApps.setupDistributedCacheLocal(getConfig());
        this.containerAllocator = new LocalContainerAllocator(
            this.clientService, this.context, nmHost, nmPort, nmHttpPort
            , containerID);
    }
    

    5、容器启动器为本地模式,Map任务和Reduce任务都是运行在MRAppMaster所在的JVM内。

    if (job.isUber()) {
        this.containerLauncher = new LocalContainerLauncher(context,
            (TaskUmbilicalProtocol) taskAttemptListener, jobClassLoader);
        ((LocalContainerLauncher) this.containerLauncher)
                .setEncryptedSpillKey(encryptedSpillKey);
    }
    

    总结

    Uber模式让小作业的Map和Reduce任务运行在MRAppMaster的JVM之中,避免了Container的申请与分配,让作业由多进程变成了多线程,无疑提高了小作业的效率。

    相关文章

      网友评论

        本文标题:MapReduce(四):Uber模式

        本文链接:https://www.haomeiwen.com/subject/qkouyqtx.html