美文网首页
RAS对Memory、CPU的操作

RAS对Memory、CPU的操作

作者: 泽林呗 | 来源:发表于2017-12-30 09:19 被阅读0次

    RAS中对Memory、CPU操作的方法

    private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) {
            if (schedulerAssignmentMap == null) {
                LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", td.getName());
                return false;
            } else {
                double requestedMemOnHeap = td.getTotalRequestedMemOnHeap().doubleValue();
                double requestedMemOffHeap = td.getTotalRequestedMemOffHeap().doubleValue();
                double requestedCpu = td.getTotalRequestedCpu().doubleValue();
                double assignedMemOnHeap = 0.0D;
                double assignedMemOffHeap = 0.0D;
                double assignedCpu = 0.0D;
                Map<WorkerSlot, Double[]> workerResources = new HashMap();
                Set<String> nodesUsed = new HashSet();
                Iterator var17 = schedulerAssignmentMap.entrySet().iterator();
    
                while(var17.hasNext()) {
                    Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry = (Entry)var17.next();
                    WorkerSlot targetSlot = (WorkerSlot)workerToTasksEntry.getKey();
                    Collection<ExecutorDetails> execsNeedScheduling = (Collection)workerToTasksEntry.getValue();
                    RAS_Node targetNode = this.schedulingState.nodes.getNodeById(targetSlot.getNodeId());
                    targetSlot = this.allocateResourceToSlot(td, execsNeedScheduling, targetSlot);
                    targetNode.assign(targetSlot, td, execsNeedScheduling);
                    LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: {} To Node: {} on Slot: {}", new Object[]{td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort()});
                    Iterator var22 = execsNeedScheduling.iterator();
    
                    while(var22.hasNext()) {
                        ExecutorDetails exec = (ExecutorDetails)var22.next();
                        targetNode.consumeResourcesforTask(exec, td);
                    }
    
                    if (!nodesUsed.contains(targetNode.getId())) {
                        nodesUsed.add(targetNode.getId());
                    }
    
                    assignedMemOnHeap += targetSlot.getAllocatedMemOnHeap();
                    assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap();
                    assignedCpu += targetSlot.getAllocatedCpu();
                    Double[] worker_resources = new Double[]{requestedMemOnHeap, requestedMemOffHeap, requestedCpu, targetSlot.getAllocatedMemOnHeap(), targetSlot.getAllocatedMemOffHeap(), targetSlot.getAllocatedCpu()};
                    workerResources.put(targetSlot, worker_resources);
                }
    
                Double[] resources = new Double[]{requestedMemOnHeap, requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
                LOG.debug("setTopologyResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} assigned on-heap mem, off-heap mem, cpu: {} {} {}", new Object[]{td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu});
                this.schedulingState.cluster.setTopologyResources(td.getId(), resources);
                this.schedulingState.cluster.setWorkerResources(td.getId(), workerResources);
                return true;
            }
        }
    

    以下是解析

    通过td.getTotalRequestedMemOnHeap()、td.getTotalRequestedMemOffHeap()、td.getTotalRequestedCpu()这三个方法获取每个Topology中设置的Memory和CPU

    然后设置三个属性assignedMemOnHeap 、assignedMemOffHeap、assignedCpu来定义已使用的Mem、CPU

    Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry = (Entry)var17.next();
                    WorkerSlot targetSlot = (WorkerSlot)workerToTasksEntry.getKey();
                    Collection<ExecutorDetails> execsNeedScheduling = (Collection)workerToTasksEntry.getValue();
    
    通过上面的代码获取目标WorkerSlot以及需要调度(schedule)的executor
    Iterator var22 = execsNeedScheduling.iterator();
    
                    while(var22.hasNext()) {
                        ExecutorDetails exec = (ExecutorDetails)var22.next();
                        targetNode.consumeResourcesforTask(exec, td);
                    }
    
    通过上面的方法消费目标节点的Mem和CPU
    可以看到consumeResourcesforTask是RAS_Node类里的方法
    /**
     * Charges this node for a single executor by deducting the executor's CPU
     * and memory requirements, as reported by the topology, from the node's
     * available amounts.
     *
     * @param exec the executor being placed on this node
     * @param topo the topology that owns {@code exec} and supplies its requirements
     */
    public void consumeResourcesforTask(ExecutorDetails exec, TopologyDetails topo) {
        final Double memoryNeeded = topo.getTotalMemReqTask(exec);
        final Double cpuNeeded = topo.getTotalCpuReqTask(exec);

        // CPU is deducted before memory; if the memory deduction throws,
        // the CPU deduction has already been applied.
        this.consumeCPU(cpuNeeded);
        this.consumeMemory(memoryNeeded);
    }
    
    /**
     * Deducts {@code amount} from this node's available CPU.
     *
     * @param amount the CPU amount to deduct
     * @return the available CPU remaining after the deduction
     * @throws IllegalStateException if {@code amount} is greater than the CPU
     *         currently available; the available amount is left unchanged
     */
    public Double consumeCPU(Double amount) {
        final double requested = amount.doubleValue();
        final double available = this._availCPU.doubleValue();

        // Reject over-consumption up front so the bookkeeping never goes negative.
        if (requested > available) {
            LOG.error("Attempting to consume more CPU than available! Needed: {}, we only have: {}", amount, this._availCPU);
            throw new IllegalStateException("Attempting to consume more CPU than available");
        }

        this._availCPU = available - requested;
        return this._availCPU;
    }
    
    /**
     * Deducts {@code amount} from this node's available memory.
     *
     * @param amount the memory amount to deduct
     * @return the available memory remaining after the deduction
     * @throws IllegalStateException if {@code amount} is greater than the memory
     *         currently available; the available amount is left unchanged
     */
    public Double consumeMemory(Double amount) {
        final double requested = amount.doubleValue();
        final double available = this._availMemory.doubleValue();

        // Reject over-consumption up front so the bookkeeping never goes negative.
        if (requested > available) {
            LOG.error("Attempting to consume more memory than available! Needed: {}, we only have: {}", amount, this._availMemory);
            throw new IllegalStateException("Attempting to consume more memory than available");
        }

        this._availMemory = available - requested;
        return this._availMemory;
    }
    
    最后,再通过一个Double[]数组存储resource
    Double[] resources = new Double[]{requestedMemOnHeap, requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
    
    并保存起来
     this.schedulingState.cluster.setTopologyResources(td.getId(), resources);
                this.schedulingState.cluster.setWorkerResources(td.getId(), workerResources);
    

    相关文章

      网友评论

          本文标题:RAS对Memory、CPU的操作

          本文链接:https://www.haomeiwen.com/subject/zorggxtx.html