美文网首页
CDH版本的YARN默认调度Fair scheduler 和DR

CDH版本的YARN默认调度Fair scheduler 和DR

作者: 润土1030 | 来源:发表于2018-08-10 16:51 被阅读550次

    工作中这个问题碰到好几次了,好多同事对这地方也有点模糊,啥是调度规则(scheduling rule),啥是调度策略(scheduling policy),加上CDH本身有个动态资源池的概念。所以把这个东西简单说一下。

    什么是资源

    对于一个资源管理系统,首先要确定什么是资源,然后将每种资源量化,最后对量化的资源进行管理。

    YARN对资源的抽象很简单,只有内存和vcore,这两种资源。每个NodeManager节点贡献一定的内存和vcore,由ResourceManager统一管理。

    YARN的三种调度规则(scheduling rule)

    • FIFO
    • CapacityScheduler
    • FairScheduler

    这几种调度规则具体是干啥的,不详细介绍了,那不是本文的重点,想了解的可以去官网查看。

    FairScheduler是干啥的

    Fair scheduling is a method of assigning resources to applications such that all apps get, on average, an equal share of resources over time. Hadoop NextGen is capable of scheduling multiple resource types. By default, the Fair Scheduler bases scheduling fairness decisions only on memory. It can be configured to schedule with both memory and CPU, using the notion of Dominant Resource Fairness developed by Ghodsi et al. When there is a single app running, that app uses the entire cluster. When other apps are submitted, resources that free up are assigned to the new apps, so that each app eventually on gets roughly the same amount of resources. Unlike the default Hadoop scheduler, which forms a queue of apps, this lets short apps finish in reasonable time while not starving long-lived apps. It is also a reasonable way to share a cluster between a number of users. Finally, fair sharing can also work with app priorities - the priorities are used as weights to determine the fraction of total resources that each app should get.

    上面是Apache官网的介绍,主要就是几点:

    1. Fair-公平: 这个调度规则主要就是保证公平,每个应用有一定的资源使用。
    2. Fair Scheduler默认的调度策略(scheduling policy)是基于内存的,但是可以选择基于内存和vcore的调度策略,即DRF(Dominant Resource Fairness)

    启用Fair Scheduler需要在yarn-site.xml设置

    <property>
      <name>yarn.resourcemanager.scheduler.class</name>
      <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
    </property>
    

    Fair Scheduler支持的调度策略

    Additionally, the fair scheduler allows setting a different custom policy for each queue to allow sharing the queue’s resources in any which way the user wants. A custom policy can be built by extending org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy. FifoPolicy, FairSharePolicy (default), and DominantResourceFairnessPolicy are built-in and can be readily used.

    对于CDH版本来说有些不同,CDH版本的Fair Scheduler默认是采用DRF 策略

    Dominant Resource Fairness (DRF) (default) - An extension of fair scheduling for more than one resource. DRF determines CPU and memory resource shares based on the availability of those resources and the job requirements.

    也就是说CDH版本的YARN默认采用的调度策略是Fair Scheduler的DRF策略,即基于vcore和内存的策略,而不是只基于内存的调度策略。

    我们看下FairScheduler的代码

     public FairScheduler() {
        super(FairScheduler.class.getName());
        clock = new SystemClock();
        context = new FSContext(this);
        allocsLoader = new AllocationFileLoaderService();
        queueMgr = new QueueManager(this);
        maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
      }
    

    可以看到在FairScheduler的构造函数中有个 allocsLoader = new AllocationFileLoaderService(); 这段代码初始化了一个类叫AllocationFileLoaderService

    AllocationFileLoaderService 这个类是用来加载fair-scheduler.xml的内容的,即所谓的Allocation file

    其中解析调度策略(schedulingPolicy)的代码逻辑如下

    schedulingPolicy: to set the scheduling policy of any queue. The allowed values are “fifo”/“fair”/“drf” or any class that extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy. Defaults to “fair”. If “fifo”, apps with earlier submit times are given preference for containers, but apps submitted later may run concurrently if there is leftover space on the cluster after satisfying the earlier app’s requests.

     String text = ((Text)field.getFirstChild()).getData().trim();
     SchedulingPolicy policy = SchedulingPolicy.parse(text);
     queuePolicies.put(queueName, policy);
    

    SchedulingPolicy.parse这个方法代码如下

    public static SchedulingPolicy parse(String policy)
          throws AllocationConfigurationException {
        @SuppressWarnings("rawtypes")
        Class clazz;
        String text = policy.toLowerCase();
        if (text.equalsIgnoreCase(FairSharePolicy.NAME)) {
          clazz = FairSharePolicy.class;
        } else if (text.equalsIgnoreCase(FifoPolicy.NAME)) {
          clazz = FifoPolicy.class;
        } else if (text.equalsIgnoreCase(DominantResourceFairnessPolicy.NAME)) {
          clazz = DominantResourceFairnessPolicy.class;
        } else {
          try {
            clazz = Class.forName(policy);
          } catch (ClassNotFoundException cnfe) {
            throw new AllocationConfigurationException(policy
                + " SchedulingPolicy class not found!");
          }
        }
        if (!SchedulingPolicy.class.isAssignableFrom(clazz)) {
          throw new AllocationConfigurationException(policy
              + " does not extend SchedulingPolicy");
        }
        return getInstance(clazz);
      }
    

    fair-scheduler.xml的配置内容如下,可以看到schedulingPolicy是drf

    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <allocations>
        <queue name="root">
            <schedulingPolicy>drf</schedulingPolicy>
            <queue name="default">
                <schedulingPolicy>drf</schedulingPolicy>
            </queue>
            <queue name="users" type="parent">
                <schedulingPolicy>drf</schedulingPolicy>
            </queue>
        </queue>
        <queuePlacementPolicy>
            <rule name="specified" create="true"/>
            <rule name="nestedUserQueue">
                <rule name="default" queue="users"/>
            </rule>
            <rule name="default" create="true"/>
        </queuePlacementPolicy>
    </allocations>
    

    至此整个流程已经介绍完毕。我们再来看下drf(DominantResourceFairnessPolicy)算法内部的核心逻辑,cpu和memory

     /**
         * Calculates and orders a resource's share of a pool in terms of two vectors.
         * The shares vector contains, for each resource, the fraction of the pool that
         * it takes up.  The resourceOrder vector contains an ordering of resources
         * by largest share.  So if resource=<10 MB, 5 CPU>, and pool=<100 MB, 10 CPU>,
         * shares will be [.1, .5] and resourceOrder will be [CPU, MEMORY].
         */
        void calculateShares(Resource resource, Resource pool,
            ResourceWeights shares, ResourceType[] resourceOrder, ResourceWeights weights) {
          shares.setWeight(MEMORY, (float)resource.getMemory() /
              (pool.getMemory() * weights.getWeight(MEMORY)));
          shares.setWeight(CPU, (float)resource.getVirtualCores() /
              (pool.getVirtualCores() * weights.getWeight(CPU)));
          // sort order vector by resource share
          if (resourceOrder != null) {
            if (shares.getWeight(MEMORY) > shares.getWeight(CPU)) {
              resourceOrder[0] = MEMORY;
              resourceOrder[1] = CPU;
            } else  {
              resourceOrder[0] = CPU;
              resourceOrder[1] = MEMORY;
            }
          }
        }
    



    上面没有将CDH中的一个概念叫做Dynamic Resource Pools,可以点击了解。对于yarn来说,其实就是调度规则(scheduling rule) 是Fair scheduler,默认调度策略(Scheduling Policy)是基于cpu(vcore)和内存(memory)的DRF。

    相关文章

      网友评论

          本文标题:CDH版本的YARN默认调度Fair scheduler 和DR

          本文链接:https://www.haomeiwen.com/subject/qnokbftx.html