
Author: eddy88 | Published 2018-03-25 22:50

    Comparing GPU Scheduling Support in Hadoop v3.1 YARN and Kubernetes v1.9


    1.0. namespaces

    In short, namespaces are a set of Linux kernel facilities (available since roughly 2008) for isolating system resources such as users, filesystems, and networking.
    For example, user namespaces map users inside a container to users on the host. Intuitively, root inside the container is not root on the host: it has no permission to modify or delete root-owned system files on the host.
    Docker supports user namespaces, but the feature only takes effect after the following configuration (a small verification sketch follows the steps).

    1. Edit /etc/default/docker and add DOCKER_OPTS="--userns-remap=default"
    2. Restart the docker service
    3. Docker will create a dockremap user on the host
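
    As a quick way to confirm the remap is active, the sketch below (an illustration only; it assumes it is compiled and run inside a container started after the remap) prints /proc/self/uid_map. With user namespaces on, container UID 0 maps to a non-zero host UID taken from /etc/subuid instead of host UID 0.

    package main

    import (
        "fmt"
        "os"
    )

    // Print the UID mapping of the current process's user namespace.
    // In a userns-remapped container the first entry maps container UID 0
    // to a non-root host UID; without remapping it maps to host UID 0.
    func main() {
        data, err := os.ReadFile("/proc/self/uid_map")
        if err != nil {
            fmt.Fprintln(os.Stderr, "read uid_map:", err)
            os.Exit(1)
        }
        fmt.Print(string(data))
    }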

    Hadoop added DockerContainerExecutor (DCE) in v2.7.0, but because that implementation does not support user namespaces it was dropped in v3.0.0. The approach Hadoop uses now is LinuxContainerExecutor plus DockerLinuxContainerRuntime. The official documentation explains:

    “Administrators should be aware that DCE doesn’t currently provide user name-space isolation. This means, in particular, that software running as root in the YARN container will have root privileges in the underlying NodeManager. ”

    References
    Docker 使用 Linux namespace 隔离容器的运行环境
    Isolate containers with a user namespace
    Docker背后的内核知识——Namespace资源隔离

    1.1. cgroups

    In short, cgroups are a Linux kernel mechanism (available since roughly 2006) for limiting, accounting for, and isolating the physical resources a group of processes may use: CPU, memory, devices such as GPUs, network, and so on. On CentOS 7 they are mounted by default under /sys/fs/cgroup, one directory per controller (cpu, cpuacct, memory, devices, and so on).

    Each such directory is called a subsystem and controls one kind of resource independently.
    cgroups are organized as a tree. For example, once Hadoop YARN is configured with cgroups it creates its own hierarchy under the relevant subsystems, and each container ID can be viewed as a task group representing that container's processes, e.g.:

    /sys/fs/cgroup/devices/yarn/container_1521423483464_0004_01_000001/devices.deny

    CPU control in cgroups goes through the cpu,cpuacct subsystem. As a simple example, to cap the CPU usage of a group of processes:

    1. Create a new child cgroup under the cpu,cpuacct subsystem with cgcreate
    2. In that child cgroup, run cgset to set cpu.cfs_quota_us=50000

    Because the scheduling period cpu.cfs_period_us defaults to 100000 (microseconds), processes started in this child cgroup can then occupy at most 50% of one CPU's time. In practice the number of CPU cores also has to be taken into account.

    GPU control in cgroups, by contrast, goes through the devices subsystem: a container is denied access to a GPU by writing a deny rule for that device into its devices.deny file (the path shown above), as sketched below.
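
    A rough sketch of that mechanism (not YARN's actual code; the cgroup path and device numbers are illustrative, and NVIDIA character devices conventionally sit under major number 195, so /dev/nvidia1 is 195:1): it creates a child cgroup under the devices subsystem, writes a deny rule for one GPU, and moves a process into the cgroup. The same write-a-file pattern is what the cpu.cfs_quota_us example above boils down to.

    package main

    import (
        "fmt"
        "os"
        "path/filepath"
    )

    func main() {
        // Hypothetical child cgroup; YARN uses per-container directories such as
        // /sys/fs/cgroup/devices/yarn/container_<id> (see the path above).
        cg := "/sys/fs/cgroup/devices/demo"
        if err := os.MkdirAll(cg, 0755); err != nil {
            panic(err)
        }

        // Deny read/write/mknod access to /dev/nvidia1 (char device 195:1)
        // for every process later placed in this cgroup.
        if err := os.WriteFile(filepath.Join(cg, "devices.deny"), []byte("c 195:1 rwm"), 0644); err != nil {
            panic(err)
        }

        // Move the current process into the cgroup so the rule applies to it.
        pid := fmt.Sprintf("%d", os.Getpid())
        if err := os.WriteFile(filepath.Join(cg, "cgroup.procs"), []byte(pid), 0644); err != nil {
            panic(err)
        }
    }

    This needs root and a cgroup-v1 host to actually run; it is only meant to show which files get written.
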
    References
    Docker背后的内核知识——cgroups资源限制
    Behind Docker - Quick look into cgroups

    2. Hadoop v3.1 GPU

    The figure below sketches a simple Hadoop cluster.


    2.1. ContainerExecutor initialization

    YARN must be configured to use LinuxContainerExecutor as its container executor.
    While initializing, this class builds the ResourceHandlerChain, i.e. the cgroups handlers for every resource YARN currently supports, and it also creates the Docker container runtime according to the configured runtimeType.
    When a container is launched, the ResourceHandlerChain built at initialization time is invoked to complete the cgroups setup for each of the container's resources (no extra setup is needed when the container type is docker). This includes the devices-cgroup setup for GPUs.


    References
    Source code

    2.2. GPU discovery and allocation

    Hadoop 3.1 adds a new resource plugin module that can support GPUs and FPGAs.
    Each YARN NodeManager discovers its GPUs with nvidia-smi -q -x and reports them to the ResourceManager.
    When a NodeManager starts, its log shows entries like the following.

    2018-03-16 11:17:21,867 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDiscoverer: Trying to discover GPU information ...
    2018-03-16 11:17:22,173 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDiscoverer: === Gpus in the system ===
    Driver Version:381.04
    ProductName=Tesla K40m, MinorNumber=0, TotalMemory=11439MiB, Utilization=0.0%
    ProductName=Tesla K40m, MinorNumber=1, TotalMemory=11439MiB, Utilization=0.0%
    ProductName=Tesla K40m, MinorNumber=2, TotalMemory=11439MiB, Utilization=0.0%
    ProductName=Tesla K40m, MinorNumber=3, TotalMemory=11439MiB, Utilization=0.0%
     
    2018-03-16 15:55:01,787 INFO org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM: Rolling master-key for container-tokens, got key with id 1262359944
    2018-03-16 15:55:01,788 INFO org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Registered with ResourceManager as hpc-6-044:57667 with total resource of <memory:8192, vCores:8, yarn.io/gpu: 4>
    

    The YARN 3.1 UI also displays this GPU information.


    nvidia-docker itself also exposes a REST API for discovering GPUs.

    [root@hpc-6-044 hadoop]# curl -s http://localhost:3476/v1.0/gpu/info/json
    {"Version":{"Driver":"381.04","CUDA":"8.0"},"Devices":[{"UUID":"GPU-04a37fdb-8792-0fa7-c681-0ff8fbf77d6d","Path":"/dev/nvidia0","Model":"Tesla K40m","Power":235,"CPUAffinity":0,"PCI":{"BusID":"0000:03:00.0","BAR1":16384,"Bandwidth":15760},"Clocks":{"Cores":875,"Memory":3004},"Topology":[{"BusID":"0000:04:00.0","Link":3}],"Family":"Kepler","Arch":"3.5","Cores":2880,"Memory":{"ECC":true,"Global":11439,"Shared":48,"Constant":64,"L2Cache":1536,"Bandwidth":288384}},{"UUID":"GPU-282e08ed-df8f-d6ad-40c1-7f0afa5931a1","Path":"/dev/nvidia1","Model":"Tesla K40m","Power":235,"CPUAffinity":0,"PCI":{"BusID":"0000:04:00.0","BAR1":16384,"Bandwidth":15760},"Clocks":{"Cores":875,"Memory":3004},"Topology":[{"BusID":"0000:03:00.0","Link":3}],"Family":"Kepler","Arch":"3.5","Cores":2880,"Memory":{"ECC":true,"Global":11439,"Shared":48,"Constant":64,"L2Cache":1536,"Bandwidth":288384}},{"UUID":"GPU-4eb77e26-f97b-5827-9eb8-5b00f5e51cb9","Path":"/dev/nvidia2","Model":"Tesla K40m","Power":235,"CPUAffinity":1,"PCI":{"BusID":"0000:82:00.0","BAR1":16384,"Bandwidth":15760},"Clocks":{"Cores":875,"Memory":3004},"Topology":[{"BusID":"0000:83:00.0","Link":3}],"Family":"Kepler","Arch":"3.5","Cores":2880,"Memory":{"ECC":true,"Global":11439,"Shared":48,"Constant":64,"L2Cache":1536,"Bandwidth":288384}},{"UUID":"GPU-d9aadc5d-68b3-1cd7-e0de-f45367117071","Path":"/dev/nvidia3","Model":"Tesla K40m","Power":235,"CPUAffinity":1,"PCI":{"BusID":"0000:83:00.0","BAR1":16384,"Bandwidth":15760},"Clocks":{"Cores":875,"Memory":3004},"Topology":[{"BusID":"0000:82:00.0","Link":3}],"Family":"Kepler","Arch":"3.5","Cores":2880,"Memory":{"ECC":true,"Global":11439,"Shared":48,"Constant":64,"L2Cache":1536,"Bandwidth":288384}}]}
    
    

    The NodeManager discovers the GPUs on its node through the singleton class GpuDiscoverer. Its initialize() method calls getGpuDeviceInformation(), which returns a GpuDeviceInformation object, a POJO mapped from the nvidia_smi_log XML; getGpusUsableByYarn() then parses that into the predefined GpuDevice objects.
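
    The discovery step itself is easy to reproduce. The sketch below is not YARN's implementation; it shells out to nvidia-smi -q -x and pulls the product name and minor number of each GPU out of the XML (the element names are assumptions based on the output shown above and may differ across driver versions).

    package main

    import (
        "encoding/xml"
        "fmt"
        "os/exec"
    )

    // Minimal view of the nvidia_smi_log XML; only the fields we need.
    type smiLog struct {
        DriverVersion string `xml:"driver_version"`
        GPUs          []struct {
            ProductName string `xml:"product_name"`
            MinorNumber string `xml:"minor_number"`
        } `xml:"gpu"`
    }

    func main() {
        out, err := exec.Command("nvidia-smi", "-q", "-x").Output()
        if err != nil {
            panic(err)
        }
        var report smiLog
        if err := xml.Unmarshal(out, &report); err != nil {
            panic(err)
        }
        fmt.Println("Driver Version:", report.DriverVersion)
        for _, g := range report.GPUs {
            fmt.Printf("ProductName=%s, MinorNumber=%s\n", g.ProductName, g.MinorNumber)
        }
    }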

    When GPUs are allocated, the methods of GpuResourceAllocator are all declared synchronized, and at that point the AssignedGpuDevice class (a subclass of GpuDevice that carries the container ID) is used. The allocation flow is shown in the figure below; conceptually it boils down to the lock-protected bookkeeping sketched next.
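
    Conceptually the allocator is just a lock-protected map from GPU device to the container that holds it; GpuResourceAllocator here and the Kubernetes nvidiaGPUManager shown later both follow this pattern. A minimal sketch with hypothetical types (not the actual YARN code):

    package main

    import (
        "fmt"
        "sync"
    )

    // gpuAllocator tracks which container owns which GPU minor number.
    type gpuAllocator struct {
        mu    sync.Mutex
        owner map[int]string // GPU minor number -> container ID ("" = free)
    }

    // Allocate hands a free GPU to the container, or reports that none is left.
    func (a *gpuAllocator) Allocate(containerID string) (int, error) {
        a.mu.Lock()
        defer a.mu.Unlock()
        for minor, owner := range a.owner {
            if owner == "" {
                a.owner[minor] = containerID
                return minor, nil
            }
        }
        return -1, fmt.Errorf("no free GPU for %s", containerID)
    }

    // Release frees every GPU held by the container.
    func (a *gpuAllocator) Release(containerID string) {
        a.mu.Lock()
        defer a.mu.Unlock()
        for minor, owner := range a.owner {
            if owner == containerID {
                a.owner[minor] = ""
            }
        }
    }

    func main() {
        a := &gpuAllocator{owner: map[int]string{0: "", 1: "", 2: "", 3: ""}}
        minor, _ := a.Allocate("container_0001")
        fmt.Println("assigned GPU", minor, "to container_0001")
        a.Release("container_0001")
    }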


    References
    Source code

    2.3. Configuration

    Hadoop 3.1 has not been released yet, so the source has to be downloaded and built locally with Maven. A few points to note:

    • gcc needs to be upgraded to 5.x
    • The protobuf version must be 2.5.0
    • Hadoop requires container-executor, container-executor.cfg, and their parent directories to be owned by root:hadoop, so the location of container-executor.cfg can be pinned at build time; this avoids the inconvenience of changing the ownership of $HADOOP_HOME/etc/hadoop/ itself.
    # Put container-executor.cfg in /etc/hadoop on its own, separate from the Hadoop install directory
    mvn package -Pdist,native -DskipTests -Dtar -Dcontainer-executor.conf.dir=/etc/hadoop
    

    Configuring Docker + GPU on Hadoop 3.1 mainly involves setting up nvidia-docker, the GPU resource, and cgroups. In my tests I hit a cgroups-related error and have not yet managed to schedule GPU resources.

    References
    Using GPU On YARN
    Launching Applications Using Docker Containers
    Hadoop 3.1 YARN failed with error 'exitCode=255: CGroups: Could not find file to write' when launching a Docker container

    3. Kubernetes v1.9 GPU

    The figure below sketches a simple Kubernetes cluster.


    Compared with Hadoop YARN, the Kubernetes GPU implementation is strikingly compact. The GPU type definition, discovery, and allocation logic all live in kubernetes/pkg/kubelet/gpu/nvidia/nvidia_gpu_manager.go.
    The GPU bookkeeping structure is a two-level map: the outer key is the pod UID, the inner key is the container name, and the value is the set of GPU device paths (/dev/nvidiaN) inside the Docker container.

    type containerToGPU map[string]sets.String
    
    // podGPUs represents a list of pod to GPU mappings.
    type podGPUs struct {
        podGPUMapping map[string]containerToGPU
    }
    
    func newPodGPUs() *podGPUs {
        return &podGPUs{
            podGPUMapping: make(map[string]containerToGPU),
        }
    }
    
    podGPUMapping[podUID][contName].Insert(device)
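
    A toy illustration of how the two levels are keyed (plain nested maps stand in for the sets.String used above; the pod UIDs and device paths are made up):

    package main

    import "fmt"

    func main() {
        // pod UID -> container name -> set of GPU device paths
        podGPUMapping := map[string]map[string]map[string]bool{}

        insert := func(podUID, contName, device string) {
            if podGPUMapping[podUID] == nil {
                podGPUMapping[podUID] = map[string]map[string]bool{}
            }
            if podGPUMapping[podUID][contName] == nil {
                podGPUMapping[podUID][contName] = map[string]bool{}
            }
            podGPUMapping[podUID][contName][device] = true
        }

        // Two containers in the same pod can each hold their own GPUs.
        insert("pod-a", "trainer", "/dev/nvidia0")
        insert("pod-a", "evaluator", "/dev/nvidia1")
        insert("pod-b", "trainer", "/dev/nvidia2")

        for dev := range podGPUMapping["pod-a"]["trainer"] {
            fmt.Println("pod-a/trainer owns", dev)
        }
    }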
    

    The structure is designed this way because Kubernetes allows one pod to run multiple Docker containers. The official documentation explains:

    "The primary reason that Pods can have multiple containers is to support helper applications that assist a primary application. Typical examples of helper applications are data pullers, data pushers, and proxies. Helper and primary applications often need to communicate with each other. Typically this is done through a shared filesystem."

    Much like GpuResourceAllocator in Hadoop YARN, nvidiaGPUManager in Kubernetes performs the GPU allocation and guards all writes with a sync.Mutex.

    // nvidiaGPUManager manages nvidia gpu devices.
    type nvidiaGPUManager struct {
        sync.Mutex
        // all GPUs on the current node
        allGPUs        sets.String
        // two-level map: outer key is the pod UID, inner key is the container name, value is the set of /dev/nvidia* paths inside the container
        allocated      *podGPUs
        defaultDevices []string
        dockerClient     dockertools.DockerInterface
        activePodsLister activePodsLister
    }
    

    The GPU allocation flow is shown in the figure below.


    Although Hadoop YARN and Kubernetes store GPU state in different data structures, reflecting their different platform designs, their allocation flows are similar, and both take thread safety into account when updating GPU state.

    References
    Source code

    4. Hadoop v3.1 YARN scheduling

    YARN ships three schedulers: FIFO, Capacity, and Fair. FIFO suits a single user, while Capacity and Fair are meant for multi-user clusters. The following focuses on the Capacity scheduler.

    The basic idea of the Capacity scheduler is to allocate cluster resources effectively through a hierarchy of queues.


    The corresponding etc/hadoop/capacity-scheduler.xml configuration is shown below.

    <?xml version="1.0"?>
    <configuration>
      <property>
        <name>yarn.scheduler.capacity.root.queues</name>
        <value>prod,dev</value>
      </property>
      <property>
        <name>yarn.scheduler.capacity.root.dev.queues</name>
        <value>eng,science</value>
      </property>
      <property>
        <name>yarn.scheduler.capacity.root.prod.capacity</name>
        <value>40</value>
      </property>
      <property>
        <name>yarn.scheduler.capacity.root.dev.capacity</name>
        <value>60</value>
      </property>
      <property>
        <name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
        <value>75</value>
      </property>
      <property>
        <name>yarn.scheduler.capacity.root.dev.eng.capacity</name>
        <value>50</value>
      </property>
      <property>
        <name>yarn.scheduler.capacity.root.dev.science.capacity</name>
        <value>50</value>
      </property>
    </configuration>
    

    In this hierarchy the dev queue is entitled to 60% of the cluster, and its two child queues eng and science each get half of whatever dev receives. When dev runs short, the Capacity scheduler can borrow spare resources from the prod queue (so-called queue elasticity), while the maximum-capacity setting (75%) caps how far dev may grow beyond its 60% share, preventing prod from being squeezed out entirely. Users must name a leaf queue when submitting an application to YARN, so leaf queue names have to be unique.
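
    As a quick check of the arithmetic, a queue's absolute share is the product of the capacity percentages along its path from the root. The sketch below is illustrative only (not scheduler code) and prints the shares implied by the configuration above.

    package main

    import "fmt"

    func main() {
        // capacity of each queue as a percentage of its parent, from the XML above
        capacity := map[string]float64{
            "root.prod":        40,
            "root.dev":         60,
            "root.dev.eng":     50,
            "root.dev.science": 50,
        }

        // absolute share = product of the percentages along the path from root
        abs := func(path ...string) float64 {
            share := 100.0
            prefix := "root"
            for _, q := range path {
                prefix = prefix + "." + q
                share = share * capacity[prefix] / 100
            }
            return share
        }

        fmt.Printf("prod:        %.0f%%\n", abs("prod"))           // 40%
        fmt.Printf("dev:         %.0f%%\n", abs("dev"))            // 60%
        fmt.Printf("dev.eng:     %.0f%%\n", abs("dev", "eng"))     // 30%
        fmt.Printf("dev.science: %.0f%%\n", abs("dev", "science")) // 30%
        // dev may elastically grow up to its maximum-capacity of 75%,
        // which still guarantees prod a floor of at least 25%.
    }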

    The entry method of the Capacity scheduler is shown below. It first picks a random start point, which splits the candidate nodes into two groups, and then iterates over every node in each group.

    static void schedule(CapacityScheduler cs) throws InterruptedException{
        // First randomize the start point
        int current = 0;
        Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
        int start = random.nextInt(nodes.size());
    
        // Allocate containers of node [start, end)
        for (FiCaSchedulerNode node : nodes) {
          if (current++ >= start) {
            if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
              continue;
            }
            cs.allocateContainersToNode(node.getNodeID(), false);
          }
        }
    
        current = 0;
    
        // Allocate containers of node [0, start)
        for (FiCaSchedulerNode node : nodes) {
          if (current++ > start) {
            break;
          }
          if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging))      {
            continue;
          }
          cs.allocateContainersToNode(node.getNodeID(), false);
        }
      }
    

    allocateContainersToNode in turn calls allocateContainerOnSingleNode:

    private CSAssignment allocateContainerOnSingleNode(
          CandidateNodeSet<FiCaSchedulerNode> candidates, FiCaSchedulerNode node,
          boolean withNodeHeartbeat) {
          ... ...
           LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
      // YARN only accepts application submissions to leaf queues; here the leaf queue takes over handling of the container request
          assignment = queue.assignContainers(getClusterResource(), candidates, new ResourceLimits(labelManager
                  .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
                      getClusterResource())),
              SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
          ... ...
    
    }
    

    The leaf queue named when the application was submitted then runs its assignContainers method, shown below. The two key steps are: handing off to the application's own assignContainers method, and using the resourceCalculator to check whether the resources requested for the container can be satisfied. To make GPU scheduling work, the resourceCalculator must be configured as DominantResourceCalculator (the yarn.scheduler.capacity.resource-calculator property); otherwise the default DefaultResourceCalculator compares only the single memory dimension.

    public CSAssignment assignContainers(Resource clusterResource,
          CandidateNodeSet<FiCaSchedulerNode> candidates,
          ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
      // check the queue's max-capacity limit, the per-user limit, and so on
          ... ...
      // hand off to the application's assignContainers method; the application here is a FiCaSchedulerApp
          assignment = application.assignContainers(clusterResource,
              candidates, currentResourceLimits, schedulingMode, null);
          Resource assigned = assignment.getResource();
      // use the configured resourceCalculator to compare the assigned resources against none; a DominantResourceCalculator compares every resource type, while the default compares only memory
          if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
              Resources.none())) {
            return assignment;
          }
          ... ...
    }
    

    The compare method of DominantResourceCalculator looks at every supported resource type, so no resource can be ignored: one side only wins if it is greater in some dimension without being smaller in any other. For example, <8192 MB, 4 vcores, 2 GPUs> versus <4096 MB, 8 vcores, 1 GPU> compares as 0, because each side dominates a different dimension.

      /**
       * Compare two resources - if the value for every resource type for the lhs
       * is greater than that of the rhs, return 1. If the value for every resource
       * type in the lhs is less than the rhs, return -1. Otherwise, return 0
       */
    private int compare(Resource lhs, Resource rhs) {
        boolean lhsGreater = false;
        boolean rhsGreater = false;
        int ret = 0;
    
        int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
        for (int i = 0; i < maxLength; i++) {
          ResourceInformation lhsResourceInformation = lhs
              .getResourceInformation(i);
          ResourceInformation rhsResourceInformation = rhs
              .getResourceInformation(i);
          int diff = lhsResourceInformation.compareTo(rhsResourceInformation);
          if (diff >= 1) {
            lhsGreater = true;
          } else if (diff <= -1) {
            rhsGreater = true;
          }
        }
        if (lhsGreater && rhsGreater) {
          ret = 0;
        } else if (lhsGreater) {
          ret = 1;
        } else if (rhsGreater) {
          ret = -1;
        }
        return ret;
      }
    

    The resource types currently supported are:

    public class ResourceInformation implements Comparable<ResourceInformation> {
      // Known resource types
      public static final String MEMORY_URI = "memory-mb";
      public static final String VCORES_URI = "vcores";
      public static final String GPU_URI = "yarn.io/gpu";
      public static final String FPGA_URI = "yarn.io/fpga";
      ... ...
    }
    

    References
    YARN – THE CAPACITY SCHEDULER
    Hadoop: Capacity Scheduler
    Source code

    5. Kubernetes v1.9 scheduling

    The scheduler's entry method is shown below. Scheduling runs in two phases, Predicate and Prioritizing, implemented by findNodesThatFit() and PrioritizeNodes() respectively.

    // Schedule tries to schedule the given pod to one of the nodes in the node list.
    // If it succeeds, it will return the name of the node.
    // If it fails, it will return a FitError error with reasons.
    func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
        ... ...
        //Predicate
        filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue, g.alwaysCheckAllPredicates)
        if err != nil {
            return "", err
        }
    
        if len(filteredNodes) == 0 {
            return "", &FitError{
                Pod:              pod,
                NumAllNodes:      len(nodes),
                FailedPredicates: failedPredicateMap,
            }
        }
    
        // if the predicate phase leaves only one node, skip prioritizing entirely
        if len(filteredNodes) == 1 {
            metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
            return filteredNodes[0].Name, nil
        }
        
        //Prioritizing
        priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
        if err != nil {
            return "", err
        }
        return g.selectHost(priorityList)
    }
    

    5.1. Predicate

    The first phase, Predicate, filters out of the candidate node list every node that violates a predicate policy. Each predicate policy maps to a concrete function, and their order matters: roughly, the earlier a predicate appears, the more important it is. So if a predicate fails and alwaysCheckAllPredicates == false, the remaining predicates are skipped and the node is immediately declared unfit for this pod. The set of predicates can also be configured manually. The currently supported predicates are:

    var (
        predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
            GeneralPred, HostNamePred, PodFitsHostPortsPred,
            MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
            PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
            CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred,
            MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
            CheckNodeMemoryPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
    )
    

    findNodesThatFit calls workqueue.Parallelize to start goroutines (at most 16 at a time) that check all candidate nodes in parallel. For each node, podFitsOnNode walks through the predicate functions in order.

    func findNodesThatFit(
        pod *v1.Pod,
        nodeNameToInfo map[string]*schedulercache.NodeInfo,
        nodes []*v1.Node,
        predicateFuncs map[string]algorithm.FitPredicate,
        extenders []algorithm.SchedulerExtender,
        metadataProducer algorithm.PredicateMetadataProducer,
        ecache *EquivalenceCache,
        schedulingQueue SchedulingQueue,
        alwaysCheckAllPredicates bool,
    ) ([]*v1.Node, FailedPredicateMap, error) {
        var filtered []*v1.Node
        failedPredicateMap := FailedPredicateMap{}
        ... ...
    
            checkNode := func(i int) {
                nodeName := nodes[i].Name
                fits, failedPredicates, err := podFitsOnNode(
                    pod,
                    meta,
                    nodeNameToInfo[nodeName],
                    predicateFuncs,
                    ecache,
                    schedulingQueue,
                    alwaysCheckAllPredicates,
                    equivCacheInfo,
                )
                if err != nil {
                    predicateResultLock.Lock()
                    errs[err.Error()]++
                    predicateResultLock.Unlock()
                    return
                }
                if fits {
                    filtered[atomic.AddInt32(&filteredLen, 1)-1] = nodes[i]
                } else {
                    predicateResultLock.Lock()
                    failedPredicateMap[nodeName] = failedPredicates
                    predicateResultLock.Unlock()
                }
            }
            
            workqueue.Parallelize(16, len(nodes), checkNode)
        ... ...
        return filtered, failedPredicateMap, nil
    }
    
    func podFitsOnNode(
        pod *v1.Pod,
        meta algorithm.PredicateMetadata,
        info *schedulercache.NodeInfo,
        predicateFuncs map[string]algorithm.FitPredicate,
        ecache *EquivalenceCache,
        queue SchedulingQueue,
        alwaysCheckAllPredicates bool,
        equivCacheInfo *equivalenceClassInfo,
    ) (bool, []algorithm.PredicateFailureReason, error) {
        var (
            eCacheAvailable  bool
            failedPredicates []algorithm.PredicateFailureReason
        )
        predicateResults := make(map[string]HostPredicate)
        ... ...
            // walk the predicate policies in order
            for _, predicateKey := range predicates.Ordering() {
                var (
                    fit     bool
                    reasons []algorithm.PredicateFailureReason
                    err     error
                )
                if predicate, exist := predicateFuncs[predicateKey]; exist {
                    func() {
                        var invalid bool
                        if eCacheAvailable {
                        // first try to look up this predicate's result in the equivalence cache
                        }
    
                        if !eCacheAvailable || invalid {
                        // the equivalence cache is unavailable or invalid, so call the predicate function itself
                            fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                            if err != nil {
                                return
                            }
                        // update the equivalence cache
                        }
                    }()
                    
                    if !fit {
                        if !alwaysCheckAllPredicates {
                            // the predicate failed and alwaysCheckAllPredicates == false, so stop here
                            break
                        }
        ... ...
        return len(failedPredicates) == 0, failedPredicates, nil
    }
    

    GPU scheduling falls under PodFitsResources, one of the earlier predicates in the ordering. Its check treats GPUs together with CPU, memory, and the other resources, roughly as sketched below.
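
    A much simplified stand-in for that check (hypothetical types; the real predicate works on Pod and NodeInfo objects and covers many more cases): a pod fits only if, for every resource including GPUs, its request plus what is already requested on the node stays within the node's allocatable amount.

    package main

    import "fmt"

    // resources is a simplified bundle of the quantities PodFitsResources looks at.
    type resources struct {
        MilliCPU int64
        MemoryMB int64
        GPU      int64
    }

    // fitsOnNode reports whether a pod requesting req fits on a node with the
    // given allocatable capacity and already-requested usage.
    func fitsOnNode(req, used, allocatable resources) (bool, []string) {
        var reasons []string
        if used.MilliCPU+req.MilliCPU > allocatable.MilliCPU {
            reasons = append(reasons, "insufficient cpu")
        }
        if used.MemoryMB+req.MemoryMB > allocatable.MemoryMB {
            reasons = append(reasons, "insufficient memory")
        }
        if used.GPU+req.GPU > allocatable.GPU {
            reasons = append(reasons, "insufficient gpu")
        }
        return len(reasons) == 0, reasons
    }

    func main() {
        node := resources{MilliCPU: 8000, MemoryMB: 32768, GPU: 4}
        used := resources{MilliCPU: 2000, MemoryMB: 8192, GPU: 3}
        pod := resources{MilliCPU: 1000, MemoryMB: 2048, GPU: 2}

        ok, reasons := fitsOnNode(pod, used, node)
        fmt.Println(ok, reasons) // false [insufficient gpu]
    }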

    5.2. Prioritizing

    If the predicate phase leaves more than one node, the scheduler goes on to rank them and finally picks the node with the highest priority.
    PrioritizeNodes likewise calls workqueue.Parallelize to start goroutines (at most 16 at a time) that score all of the predicate-filtered nodes in parallel.

    // HostPriority represents the priority of scheduling to a particular host, higher priority is better.
    type HostPriority struct {
        // Name of the host
        Host string
        // Score associated with the host
        Score int
    }
    
    // HostPriorityList declares a []HostPriority type.
    type HostPriorityList []HostPriority
    
    func PrioritizeNodes(
        pod *v1.Pod,
        nodeNameToInfo map[string]*schedulercache.NodeInfo,
        meta interface{},
        priorityConfigs []algorithm.PriorityConfig,
        nodes []*v1.Node,
        extenders []algorithm.SchedulerExtender,
    ) (schedulerapi.HostPriorityList, error) {
        ... ...
        // results is a two-dimensional slice: one row per priority function, one column per node
        results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
    
        ... ...
        processNode := func(index int) {
            nodeInfo := nodeNameToInfo[nodes[index].Name]
            var err error
            for i := range priorityConfigs {
                if priorityConfigs[i].Function != nil {
                    continue
                }
                results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
                if err != nil {
                    appendError(err)
                    return
                }
            }
        }
        workqueue.Parallelize(16, len(nodes), processNode)
        ... ...
        // result is a one-dimensional slice obtained as a weighted sum over each column of results; each entry pairs a node name with its priority score
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    
        for i := range nodes {
            result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
            for j := range priorityConfigs {
                result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
            }
        }
        ... ...
        return result, nil
    }
    

    As with predicates, there are several prioritizing functions. The difference is that the prioritizing phase scores every node against every priority function and stores the scores in the two-dimensional slice results; the scores are then combined per node as a weighted sum, giving each node a single priority score stored in the one-dimensional slice result.

    Finally, when several nodes tie for the highest score, the scheduler uses the simple round-robin scheme below to avoid piling work onto one node.

    // selectHost takes a prioritized list of nodes and then picks one
    // in a round-robin manner from the nodes that had the highest score.
    func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
        if len(priorityList) == 0 {
            return "", fmt.Errorf("empty priorityList")
        }
    
        sort.Sort(sort.Reverse(priorityList))
        maxScore := priorityList[0].Score
        firstAfterMaxScore := sort.Search(len(priorityList), func(i int) bool { return priorityList[i].Score < maxScore })
    
        g.lastNodeIndexLock.Lock()
        ix := int(g.lastNodeIndex % uint64(firstAfterMaxScore))
        g.lastNodeIndex++
        g.lastNodeIndexLock.Unlock()
    
        return priorityList[ix].Host, nil
    }
    

    For example, suppose the scheduling queue holds two pending pods, pod1 and pod2, and that both end up with the same result after Predicate and Prioritizing:

    [['Node1',10],['Node2',10],['Node3',10],['Node4',2],['Node5',1],['Node6',1],['Node7',0]]
    

    Applying the algorithm above gives the following placements.

    For pod1, with lastNodeIndex = 7 and firstAfterMaxScore = 3, the chosen index is 7 % 3 = 1, i.e. ['Node2',10]; lastNodeIndex is then incremented to 8.

    For pod2, with lastNodeIndex = 8 and firstAfterMaxScore = 3, the chosen index is 8 % 3 = 2, i.e. ['Node3',10]; lastNodeIndex becomes 9, which means the next pod would land on index 9 % 3 = 0, i.e. ['Node1',10].
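
    The same numbers can be reproduced with a small simulation of selectHost (illustrative only; it reuses the example list above, already sorted by descending score, and the assumed starting lastNodeIndex of 7):

    package main

    import "fmt"

    type hostPriority struct {
        Host  string
        Score int
    }

    func main() {
        priorityList := []hostPriority{
            {"Node1", 10}, {"Node2", 10}, {"Node3", 10},
            {"Node4", 2}, {"Node5", 1}, {"Node6", 1}, {"Node7", 0},
        }

        // Count how many hosts share the top score.
        maxScore := priorityList[0].Score
        firstAfterMaxScore := 0
        for firstAfterMaxScore < len(priorityList) && priorityList[firstAfterMaxScore].Score == maxScore {
            firstAfterMaxScore++
        }

        lastNodeIndex := uint64(7) // assumed starting value of the round-robin counter
        for _, pod := range []string{"pod1", "pod2", "pod3"} {
            ix := int(lastNodeIndex % uint64(firstAfterMaxScore))
            fmt.Printf("%s -> %s\n", pod, priorityList[ix].Host)
            lastNodeIndex++
        }
        // Output: pod1 -> Node2, pod2 -> Node3, pod3 -> Node1
    }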

    Kubernetes and Hadoop YARN each have their strengths when it comes to scheduling. A brief summary of their similarities and differences:

    1. Hadoop YARN divides and manages cluster resources among users through hierarchical queues; Kubernetes achieves a similar effect with namespaces, except that namespaces have no notion of hierarchy.
    2. As the source shows, Hadoop YARN walks over all candidate nodes when scheduling; Kubernetes checks candidate nodes in parallel with goroutines, although each node is still run through the predicates until all pass or one fails.
    3. Both introduce some randomness to keep placement balanced: Hadoop YARN splits the candidate node list at a random point, while Kubernetes round-robins among the nodes tied for the best score.
    4. YARN's scheduling policy is configurable too, but the overall Kubernetes design is more flexible and configurable. In fact, in earlier versions the Kubernetes scheduler lived under the plugin directory, which says a lot about how pluggable it is meant to be.
    5. Purely in terms of code readability, Kubernetes does far better: the code is elegant and the comments are thorough.

    References
    Kubernetes调度详解
    干货 | kube-scheduler原理解析
    Priority in Kubernetes API
    Source code

    6. Postscript

    6.1. Dockerization

    TensorFlowOnSpark has drawn 2200+ stars on GitHub, but the price of not supporting Docker is a cumbersome environment setup: every executor node's runtime must match, from the Python version down to the CUDA driver. spark-submit options such as --py-files and --jars can distribute parts of the runtime automatically, but they never really solve the dependency problem. Judging from the replies of the author, leewyang, Docker support is clearly where the project is heading.


    6.2. Hadoop YARN service API

    Perhaps spurred by the success of Kubernetes, YARN is finally getting a REST interface of its own, through which a Docker image and the required CPU/memory/GPU resources can be specified directly, much like submitting Kubernetes YAML/JSON. Note that under the default Simple Auth the user is dr.who, and the job is actually rejected (this can presumably be fixed through configuration).


    The example below replaces the original YARN submission command with an HTTP POST request.

    yarn jar /home/junzhang22/hadoop-3.1.0-SNAPSHOT/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.1.0-SNAPSHOT.jar \
      -jar /home/junzhang22/hadoop-3.1.0-SNAPSHOT/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.1.0-SNAPSHOT.jar \
      -shell_env YARN_CONTAINER_RUNTIME_TYPE=docker \
      -shell_env YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=tensorflow-gpu-boto-test \
      -shell_script ./run.sh \
      -container_resources memory-mb=3072,vcores=1,yarn.io/gpu=1 \
      -num_containers 1

    POST /app/v1/services/ HTTP/1.1
    Host: 10.1.86.15:8088
    Content-Type: application/json

    Request:

    {
      "name": "hello-world4",
      "version": "1.0.0",
      "description": "hello world example",
      "components": [
        {
          "name": "hello",
          "number_of_containers": 1,
          "artifact": {
            "id": "tensorflow-gpu-boto-test",
            "type": "DOCKER"
          },
          "launch_command": "./run.sh",
          "resource": {
            "cpus": 1,
            "memory": "1024",
            "additional": {
              "yarn.io/gpu": {
                "value": 1,
                "unit": ""
              }
            }
          }
        }
      ]
    }

    Response:

    {
      "uri": "/v1/services/hello-world4",
      "diagnostics": "Application ID: application_1521616970183_0001",
      "state": "ACCEPTED"
    }
    

    References
    YarnServiceAPI
