2020-02-20

Author: 小小少年Boy | Published: 2020-02-20 14:25

    List of RM sub-services

    RMApplicationHistoryWriter
    AsyncDispatcher
    AdminService
    RMActiveServices
    –RMSecretManagerService
    –ContainerAllocationExpirer
    –AMLivelinessMonitor
    –RMNodeLabelsManager
    –RMStateStore
    –RMApplicationHistoryWriter
    –SystemMetricsPublisher
    –NodesListManager
    –ResourceScheduler
    –SchedulerEventDispatcher
    –NMLivelinessMonitor
    –ResourceTrackerService
    –ApplicationMasterService
    –ClientRMService
    –ApplicationMasterLauncher
    –DelegationTokenRenewer
    ————————————————
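    Copyright notice: this list comes from an original article by CSDN blogger 「Jerry Shao」, licensed under CC 4.0 BY-SA. When reposting, include a link to the original and this notice.
    Original article: https://blog.csdn.net/weixin_44630798/article/details/89005651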

    1 ClientRMService initialization

    ClientRMService is created in ResourceManager#RMActiveServices#serviceInit():

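    // ResourceManager.RMActiveServices#serviceInit(Configuration) (excerpt)
    protected void serviceInit(Configuration configuration) throws Exception {
        // ...
        clientRM = createClientRMService();
        addService(clientRM);
        rmContext.setClientRMService(clientRM);
        // ...
    }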
    

    ClientRMService#serviceInit() itself is simple: it only sets the client bind address, i.e. the RM's RPC IP and port.

    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        clientBindAddress = getBindAddress(conf);
        super.serviceInit(conf);
    }
    InetSocketAddress getBindAddress(Configuration conf) {
        return conf.getSocketAddr(
            YarnConfiguration.RM_BIND_HOST,
            YarnConfiguration.RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_PORT);
    }
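
A minimal plain-Java sketch of what the four-argument getSocketAddr call resolves to, assuming the behaviour of Hadoop's Configuration.getSocketAddr(hostProperty, addressProperty, defaultAddressValue, defaultPort): host and port come from yarn.resourcemanager.address (default 0.0.0.0:8032), and a non-empty yarn.resourcemanager.bind-host replaces only the host. BindAddressSketch and the Map-based configuration are hypothetical stand-ins, not Hadoop APIs.

```java
import java.net.InetSocketAddress;
import java.util.Map;

// Hypothetical stand-in for Configuration.getSocketAddr(hostProperty, addressProperty,
// defaultAddressValue, defaultPort); the configuration is modelled as a plain Map.
class BindAddressSketch {
    static InetSocketAddress getSocketAddr(Map<String, String> conf, String hostProperty,
                                           String addressProperty, String defaultAddress,
                                           int defaultPort) {
        // 1. Take "host[:port]" from the address property, falling back to the default.
        String address = conf.getOrDefault(addressProperty, defaultAddress).trim();
        int colon = address.lastIndexOf(':');
        String host = colon >= 0 ? address.substring(0, colon) : address;
        int port = colon >= 0 ? Integer.parseInt(address.substring(colon + 1)) : defaultPort;
        // 2. A non-empty bind host replaces only the host part; the port is kept.
        String bindHost = conf.get(hostProperty);
        if (bindHost != null && !bindHost.isEmpty()) {
            host = bindHost;
        }
        // createUnresolved skips DNS resolution, which is enough for illustration.
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(
            "yarn.resourcemanager.address", "rm1.example.com:8032",
            "yarn.resourcemanager.bind-host", "0.0.0.0");
        InetSocketAddress addr = getSocketAddr(conf, "yarn.resourcemanager.bind-host",
            "yarn.resourcemanager.address", "0.0.0.0:8032", 8032);
        System.out.println(addr.getHostString() + ":" + addr.getPort()); // 0.0.0.0:8032
    }
}
```

The split lets the RPC server listen on one interface (for example 0.0.0.0) while clients keep using the host name configured in yarn.resourcemanager.address.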
    

    2 ResourceScheduler

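    ResourceScheduler is created in ResourceManager#RMActiveServices#serviceInit().

    createScheduler() reads the scheduler class name from the configuration (yarn.resourcemanager.scheduler.class) and instantiates it via reflection; by default this creates a CapacityScheduler.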

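    // ResourceManager.RMActiveServices#serviceInit(Configuration) (excerpt)
    protected void serviceInit(Configuration configuration) throws Exception {
        // ...
        // Initialize the scheduler
        scheduler = createScheduler();
        scheduler.setRMContext(rmContext);
        addIfService(scheduler);
        rmContext.setScheduler(scheduler);
        // ...
    }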
    
    protected ResourceScheduler createScheduler() {
        String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
                                             YarnConfiguration.DEFAULT_RM_SCHEDULER);
        LOG.info("Using Scheduler: " + schedulerClassName);
        try {
            Class<?> schedulerClazz = Class.forName(schedulerClassName);
            if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) {
                return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
                                                                       this.conf);
            } else {
                throw new YarnRuntimeException("Class: " + schedulerClassName
                                               + " not instance of " + ResourceScheduler.class.getCanonicalName());
            }
        } catch (ClassNotFoundException e) {
            throw new YarnRuntimeException("Could not instantiate Scheduler: "
                                           + schedulerClassName, e);
        }
    }
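
The reflection pattern in createScheduler() (load the class by name, check that it implements the expected interface, then instantiate it) reduces to the following sketch. Scheduler, DemoScheduler and NotAScheduler are hypothetical and assumed to live in the default package; the sketch calls getDeclaredConstructor().newInstance() where Hadoop uses ReflectionUtils.newInstance(clazz, conf), which also hands the Configuration to the new object when it is Configurable.

```java
class SchedulerFactorySketch {
    // Load a class by name, verify it implements Scheduler, then instantiate it.
    static Scheduler createScheduler(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            // Same guard as createScheduler(): reject classes that do not implement the interface.
            if (!Scheduler.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException("Class: " + className
                    + " not instance of " + Scheduler.class.getCanonicalName());
            }
            return (Scheduler) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Covers ClassNotFoundException as well as constructor/instantiation failures.
            throw new IllegalStateException("Could not instantiate Scheduler: " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(createScheduler("DemoScheduler").name()); // demo
    }
}

// Hypothetical interface standing in for ResourceScheduler.
interface Scheduler {
    String name();
}

class DemoScheduler implements Scheduler {
    public DemoScheduler() {}
    public String name() { return "demo"; }
}

class NotAScheduler {
    public NotAScheduler() {}
}
```

The isAssignableFrom check fails fast with a clear message instead of a ClassCastException later, which matters because the class name comes from user-editable configuration.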
    

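    Because the scheduler was registered with addIfService(scheduler), it is initialized together with the other child services, which leads into CapacityScheduler#serviceInit().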
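
RMActiveServices is a CompositeService: addIfService() registers an object as a child only if it implements Service, and the parent's init/start/stop calls are then propagated to the children, with stop running in reverse registration order. The sketch below imitates that lifecycle with hypothetical MiniService, MiniComposite and Recorder types; it is not Hadoop's org.apache.hadoop.service API.

```java
import java.util.ArrayList;
import java.util.List;

class CompositeSketch {
    // Hypothetical minimal lifecycle interface.
    interface MiniService {
        void init();
        void start();
        void stop();
    }

    static class MiniComposite implements MiniService {
        private final List<MiniService> services = new ArrayList<>();

        void addService(MiniService s) { services.add(s); }

        // Mirrors addIfService(Object): register only objects that are services.
        boolean addIfService(Object o) {
            if (o instanceof MiniService) {
                addService((MiniService) o);
                return true;
            }
            return false;
        }

        public void init()  { for (MiniService s : services) s.init(); }
        public void start() { for (MiniService s : services) s.start(); }
        // Children are stopped in reverse registration order.
        public void stop() {
            for (int i = services.size() - 1; i >= 0; i--) services.get(i).stop();
        }
    }

    // A child that records its lifecycle calls into a shared log.
    static class Recorder implements MiniService {
        final String name;
        final List<String> log;
        Recorder(String name, List<String> log) { this.name = name; this.log = log; }
        public void init()  { log.add(name + ".init"); }
        public void start() { log.add(name + ".start"); }
        public void stop()  { log.add(name + ".stop"); }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MiniComposite rm = new MiniComposite();
        rm.addIfService(new Recorder("dispatcher", log));
        rm.addIfService(new Recorder("scheduler", log));
        rm.addIfService("not a service"); // ignored, returns false
        rm.init();
        rm.start();
        rm.stop();
        // [dispatcher.init, scheduler.init, dispatcher.start, scheduler.start, scheduler.stop, dispatcher.stop]
        System.out.println(log);
    }
}
```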

    3 CapacityScheduler.java


    // stores the queues: queue name -> CSQueue
    private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
    

    In CapacityScheduler#serviceInit(), look first at super.serviceInit(conf) and then at initScheduler(configuration):

    public void serviceInit(Configuration conf) throws Exception {
        Configuration configuration = new Configuration(conf);
        super.serviceInit(conf);
        initScheduler(configuration);
    }
    
    //AbstractYarnScheduler.java
    @Override
    public void serviceInit(Configuration conf) throws Exception {
        // yarn.nm.liveness-monitor.expiry-interval-ms: NM liveness expiry interval, default 600 s
        nmExpireInterval =
            conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
                YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
        // yarn.resourcemanager.work-preserving-recovery.scheduling-wait-ms, default 10 s
        configuredMaximumAllocationWaitTime =
            conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
                YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
        // schedule a Timer that, after nmExpireInterval, cleans up the cache of
        // pending container-release requests
        createReleaseCache();
        super.serviceInit(conf);
    }
    
    private synchronized void initScheduler(Configuration configuration) throws
        IOException {
        this.conf = loadCapacitySchedulerConfiguration(configuration);
        // check that min <= max for memory and for vcores
        validateConf(this.conf);
        // minimum allocation (minimumMemory, minimumCores) wrapped in a Resource
        this.minimumAllocation = this.conf.getMinimumAllocation();
        // initialize the maximum allocation (configuredMaxAllocation); it is used by
        // ClusterNodeTracker.setConfiguredMaxAllocation(Resource), which takes a
        // ReentrantReadWriteLock and stores a clone of the Resource in configuredMaxAllocation
        initMaximumResourceCapability(this.conf.getMaximumAllocation());
        // create the resource calculator (DefaultResourceCalculator by default)
        this.calculator = this.conf.getResourceCalculator();
        // yarn.scheduler.include-port-in-node-name, default false
        this.usePortForNodeName = this.conf.getUsePortForNodeName();
        // applications managed by the scheduler
        this.applications = new ConcurrentHashMap<ApplicationId,SchedulerApplication<FiCaSchedulerApp>>();
        // node label manager
        this.labelManager = rmContext.getNodeLabelManager();
        authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
        // initialize the queues
        initializeQueues(this.conf);
        // asynchronous scheduling: yarn.scheduler.capacity.schedule-asynchronously.enable, default false
        scheduleAsynchronously = this.conf.getScheduleAynschronously();
        // async scheduling interval: yarn.scheduler.capacity.schedule-asynchronously.scheduling-interval-ms, default 5 ms
        asyncScheduleInterval =
            this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,DEFAULT_ASYNC_SCHEDULER_INTERVAL);
        // if asynchronous scheduling is enabled, create the async scheduling thread
        if (scheduleAsynchronously) {
            asyncSchedulerThread = new AsyncScheduleThread(this);
        }
    
        LOG.info("Initialized CapacityScheduler with " +
                 "calculator=" + getResourceCalculator().getClass() + ", " +
                 "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
                 "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
                 "asynchronousScheduling=" + scheduleAsynchronously + ", " +
                 "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
    }
    

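    Now let's look in detail at queue initialization, initializeQueues(this.conf), which does two things:

    (1) parses the configuration and builds the queue tree;
    (2) builds the mappings from users/groups to queues.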

    @Lock(CapacityScheduler.class)
    private void initializeQueues(CapacitySchedulerConfiguration conf)
        throws IOException {
        // build the queue tree, rooted at the root queue
        root = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,queues, queues, noop);
        labelManager.reinitializeQueueLabels(getQueueToLabels());
        LOG.info("Initialized root queue " + root);
        initializeQueueMappings();
        setQueueAcls(authorizer, queues);
    }
    

    (1) Parsing the configuration and building the queue tree

    parseQueue() parses the queues:

    @Lock(CapacityScheduler.class)
    static CSQueue parseQueue(
        CapacitySchedulerContext csContext,
        CapacitySchedulerConfiguration conf,
        CSQueue parent, String queueName, Map<String, CSQueue> queues,
        Map<String, CSQueue> oldQueues,
        QueueHook hook) throws IOException {
        CSQueue queue;
        // on the first (root) call, fullQueueName = "root"
        String fullQueueName = (parent == null) ? queueName
            : (parent.getQueuePath() + "." + queueName);
        // child queue names, read from yarn.scheduler.capacity.<queue-path>.queues (e.g. yarn.scheduler.capacity.root.queues)
        String[] childQueueNames = conf.getQueues(fullQueueName);
        // whether the queue is reservable: yarn.scheduler.capacity.<queue-path>.reservable, default false
        boolean isReservableQueue = conf.isReservable(fullQueueName);
        // no child queues: create a leaf queue
        if (childQueueNames == null || childQueueNames.length == 0) {
            if (null == parent) {
                throw new IllegalStateException(
                    "Queue configuration missing child queue names for " + queueName);
            }
            // Check if the queue will be dynamically managed by the Reservation
            // system
            // a reservable queue is created as a PlanQueue, which the ReservationSystem manages
            if (isReservableQueue) {
                queue =
                    new PlanQueue(csContext, queueName, parent,
                                  oldQueues.get(queueName));
            } else {
                queue = new LeafQueue(csContext, queueName, parent,oldQueues.get(queueName));
    
                // Used only for unit tests
                queue = hook.hook(queue);
            }
        } else { // has child queues: create a ParentQueue
            if (isReservableQueue) {
                throw new IllegalStateException(
                    "Only Leaf Queues can be reservable for " + queueName);
            }
            // build the parent queue (the root queue on the first call)
            ParentQueue parentQueue =
                new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
    
            // Used only for unit tests
            queue = hook.hook(parentQueue);
            // build the child queues by calling parseQueue recursively
            List<CSQueue> childQueues = new ArrayList<CSQueue>();
            for (String childQueueName : childQueueNames) {
                CSQueue childQueue =
                    parseQueue(csContext, conf, queue, childQueueName,
                               queues, oldQueues, hook);
                childQueues.add(childQueue);
            }
            // attach the children to the parent (assigns the childQueues field, a TreeSet sorted by the queue comparator, i.e. by used capacity)
            parentQueue.setChildQueues(childQueues);
        }
    
        if (queue instanceof LeafQueue && queues.containsKey(queueName)
            && queues.get(queueName) instanceof LeafQueue) {
            throw new IOException("Two leaf queues were named " + queueName
                                  + ". Leaf queue names must be distinct");
        }
        queues.put(queueName, queue);
    
        LOG.info("Initialized queue: " + queue);
        return queue;
    }
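
The recursion in parseQueue() reduces to a sketch over a flat key/value map: a queue whose <queue-path>.queues key lists children becomes a parent and recurses into them, otherwise it is a leaf, and a second leaf with the same short name is rejected. QueueTreeSketch and QueueNode are hypothetical simplifications that leave out PlanQueue, capacities, old-queue reuse and the test hook, and they throw IllegalArgumentException where the original throws IOException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class QueueTreeSketch {
    static final String PREFIX = "yarn.scheduler.capacity.";

    // Hypothetical, simplified queue node: a leaf is a node without children.
    static class QueueNode {
        final String name;
        final String path;
        final List<QueueNode> children = new ArrayList<>();
        QueueNode(String name, String path) { this.name = name; this.path = path; }
        boolean isLeaf() { return children.isEmpty(); }
    }

    static QueueNode parseQueue(Map<String, String> conf, QueueNode parent, String queueName,
                                Map<String, QueueNode> queues) {
        String fullName = (parent == null) ? queueName : parent.path + "." + queueName;
        String childList = conf.get(PREFIX + fullName + ".queues");
        QueueNode queue = new QueueNode(queueName, fullName);
        if (childList == null || childList.trim().isEmpty()) {
            if (parent == null) {
                throw new IllegalStateException(
                    "Queue configuration missing child queue names for " + queueName);
            }
            // Queues are keyed by short name, so two leaves may not share one.
            QueueNode existing = queues.get(queueName);
            if (existing != null && existing.isLeaf()) {
                throw new IllegalArgumentException("Two leaf queues were named " + queueName);
            }
        } else {
            // Parent queue: build each child recursively, passing this queue as its parent.
            for (String child : childList.split(",")) {
                queue.children.add(parseQueue(conf, queue, child.trim(), queues));
            }
        }
        queues.put(queueName, queue);
        return queue;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(PREFIX + "root.queues", "a,b");
        conf.put(PREFIX + "root.a.queues", "a1,a2");
        Map<String, QueueNode> queues = new HashMap<>();
        parseQueue(conf, null, "root", queues);
        for (QueueNode q : queues.values()) {
            System.out.println(q.path + (q.isLeaf() ? " (leaf)" : " (parent)"));
        }
    }
}
```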
    

    ParentQueue construction

    public ParentQueue(CapacitySchedulerContext cs, 
                       String queueName, CSQueue parent, CSQueue old) throws IOException {
        super(cs, queueName, parent, old);
        // queue comparator
        this.queueComparator = cs.getQueueComparator();
        // whether this is the root queue
        this.rootQueue = (parent == null);
        // configured capacity, yarn.scheduler.capacity.<queue-path>.capacity (e.g. yarn.scheduler.capacity.root.rec.capacity); always 100 for the root queue
        float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath());
    
        if (rootQueue &&
            (rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
            throw new IllegalArgumentException("Illegal " +
                                               "capacity of " + rawCapacity + " for queue " + queueName +
                                               ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
        }
    
        float capacity = (float) rawCapacity / 100;
        float parentAbsoluteCapacity = 
            (rootQueue) ? 1.0f : parent.getAbsoluteCapacity();
        float absoluteCapacity = parentAbsoluteCapacity * capacity; 
    
        float  maximumCapacity =
            (float) cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
        float absoluteMaxCapacity = 
            CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
    
        QueueState state = cs.getConfiguration().getState(getQueuePath());
    
        Map<AccessType, AccessControlList> acls = 
            cs.getConfiguration().getAcls(getQueuePath());
    
        // apply the computed queue settings
        setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
                          maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
                          defaultLabelExpression, capacitiyByNodeLabels, maxCapacityByNodeLabels, 
                          cs.getConfiguration().getReservationContinueLook());
        // child queues are kept sorted by the queue comparator
        this.childQueues = new TreeSet<CSQueue>(queueComparator);
    
        LOG.debug("Initialized parent-queue " + queueName +
                  " name=" + queueName + 
                  ", fullname=" + getQueuePath()); 
    }
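
The capacity arithmetic above multiplies down the tree: absoluteCapacity = parent absoluteCapacity × capacity / 100, with the root fixed at 1.0. CSQueueUtils.computeAbsoluteMaximumCapacity is assumed here to do the same with the parent's absolute maximum capacity (treating a missing parent as 1.0). The hypothetical helper below applies both formulas along one path below root.

```java
class CapacitySketch {
    // Returns {absoluteCapacity, absoluteMaxCapacity} for the queue at the end of a path
    // below root. capacities[i] and maxCapacities[i] are the configured percentages of the
    // i-th queue on that path (root itself is assumed to be 100% / 100%).
    static float[] absolute(float[] capacities, float[] maxCapacities) {
        float absCap = 1.0f;     // root's absolute capacity
        float absMaxCap = 1.0f;  // root's absolute maximum capacity
        for (int i = 0; i < capacities.length; i++) {
            absCap = absCap * (capacities[i] / 100);          // parentAbsoluteCapacity * capacity
            absMaxCap = absMaxCap * (maxCapacities[i] / 100); // parentAbsMax * maximumCapacity
        }
        return new float[] {absCap, absMaxCap};
    }

    public static void main(String[] args) {
        // root.a = 60% (max 80%), root.a.b = 50% (max 100%)
        float[] r = absolute(new float[] {60, 50}, new float[] {80, 100});
        System.out.println("absoluteCapacity=" + r[0] + ", absoluteMaxCapacity=" + r[1]);
    }
}
```

For root.a = 60% (max 80%) and root.a.b = 50% (max 100%), root.a.b is guaranteed 0.6 × 0.5 = 30% of the cluster and can grow to at most 0.8 × 1.0 = 80%.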
    

    LeafQueue construction

    public LeafQueue(CapacitySchedulerContext cs, 
                     String queueName, CSQueue parent, CSQueue old) throws IOException {
        super(cs, queueName, parent, old);
        this.scheduler = cs;
    
        this.activeUsersManager = new ActiveUsersManager(metrics);
        this.minimumAllocationFactor = 
            Resources.ratio(resourceCalculator, 
                            Resources.subtract(maximumAllocation, minimumAllocation), 
                            maximumAllocation);
    
        float capacity = getCapacityFromConf();
        float absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
    
        float maximumCapacity = 
            (float)cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
        float absoluteMaxCapacity = 
            CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
    
        // Initially set to absoluteMax, will be updated to more accurate
        // max avail value during assignContainers
        absoluteMaxAvailCapacity = absoluteMaxCapacity;
    
        int userLimit = cs.getConfiguration().getUserLimit(getQueuePath());
        float userLimitFactor = 
            cs.getConfiguration().getUserLimitFactor(getQueuePath());
    
        int maxApplications =
            cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath());
        if (maxApplications < 0) {
            int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
            maxApplications = (int)(maxSystemApps * absoluteCapacity);
        }
        maxApplicationsPerUser = 
            (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
    
        float maxAMResourcePerQueuePercent = cs.getConfiguration()
            .getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
        int maxActiveApplications = 
            CSQueueUtils.computeMaxActiveApplications(
            resourceCalculator,
            cs.getClusterResource(), this.minimumAllocation,
            maxAMResourcePerQueuePercent, absoluteMaxCapacity);
        this.maxActiveAppsUsingAbsCap = 
            CSQueueUtils.computeMaxActiveApplications(
            resourceCalculator,
            cs.getClusterResource(), this.minimumAllocation,
            maxAMResourcePerQueuePercent, absoluteCapacity);
        int maxActiveApplicationsPerUser =
            CSQueueUtils.computeMaxActiveApplicationsPerUser(
            maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
    
        QueueState state = cs.getConfiguration().getState(getQueuePath());
    
        Map<AccessType, AccessControlList> acls = 
            cs.getConfiguration().getAcls(getQueuePath());
        // apply the computed queue settings
        setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
                          maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
                          maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
                          maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
                          .getConfiguration().getNodeLocalityDelay(), accessibleLabels,
                          defaultLabelExpression, this.capacitiyByNodeLabels,
                          this.maxCapacityByNodeLabels,
                          cs.getConfiguration().getReservationContinueLook());
    
        if(LOG.isDebugEnabled()) {
            LOG.debug("LeafQueue:" + " name=" + queueName
                      + ", fullname=" + getQueuePath());
        }
        // application comparator: orders applications by application ID
        Comparator<FiCaSchedulerApp> applicationComparator = cs.getApplicationComparator();
        // pending applications
        this.pendingApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
        // active applications
        this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
    }
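
The application limits computed in the LeafQueue constructor reduce to two formulas. The sketch reproduces them with hypothetical method names; the inputs correspond to the queue's maximum-applications setting (negative when unset), the system-wide yarn.scheduler.capacity.maximum-applications, the queue's absolute capacity, minimum-user-limit-percent and user-limit-factor.

```java
class LeafQueueLimitsSketch {
    // If the per-queue maximum-applications is unset (< 0), derive it from the system limit.
    static int maxApplications(int perQueueSetting, int maxSystemApps, float absoluteCapacity) {
        if (perQueueSetting >= 0) {
            return perQueueSetting;
        }
        return (int) (maxSystemApps * absoluteCapacity);
    }

    // maxApplicationsPerUser = maxApplications * (userLimit / 100) * userLimitFactor
    static int maxApplicationsPerUser(int maxApplications, int userLimit, float userLimitFactor) {
        return (int) (maxApplications * (userLimit / 100.0f) * userLimitFactor);
    }

    public static void main(String[] args) {
        int maxApps = maxApplications(-1, 10000, 0.5f);          // 10000 * 0.5 = 5000
        int perUser = maxApplicationsPerUser(maxApps, 25, 2.0f); // 5000 * 0.25 * 2 = 2500
        System.out.println(maxApps + " " + perUser);             // 5000 2500
    }
}
```

With the system limit at 10000 and an absolute capacity of 0.5, the queue admits 5000 applications; a user limit of 25% with user-limit-factor 2 then allows each user 2500.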
    

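    A note on ParentQueue and LeafQueue: together they form a tree of queues. Applications can only request resources from a LeafQueue, and each LeafQueue keeps the applications currently in it in two lists, pending and active. Both lists are sorted by application ID, so within a queue the CapacityScheduler schedules applications in FIFO order.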
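
The FIFO behaviour follows directly from the TreeSet ordering. A minimal sketch, assuming application IDs compare by cluster timestamp first and sequence number second (as Hadoop's ApplicationId.compareTo does); AppId and its toString format are hypothetical simplifications.

```java
import java.util.Comparator;
import java.util.Locale;
import java.util.TreeSet;

class FifoOrderSketch {
    // Hypothetical stand-in for ApplicationId: RM start timestamp plus a sequence number.
    static final class AppId {
        final long clusterTimestamp;
        final int id;
        AppId(long clusterTimestamp, int id) { this.clusterTimestamp = clusterTimestamp; this.id = id; }
        @Override public String toString() {
            return String.format(Locale.ROOT, "application_%d_%04d", clusterTimestamp, id);
        }
    }

    // Cluster timestamp first, then sequence number: this is submission order.
    static final Comparator<AppId> BY_APP_ID =
        Comparator.comparingLong((AppId a) -> a.clusterTimestamp).thenComparingInt(a -> a.id);

    public static void main(String[] args) {
        TreeSet<AppId> pending = new TreeSet<>(BY_APP_ID);
        pending.add(new AppId(1582000000000L, 3));
        pending.add(new AppId(1582000000000L, 1));
        pending.add(new AppId(1582000000000L, 2));
        // pollFirst() always yields the earliest-submitted application first.
        while (!pending.isEmpty()) {
            System.out.println(pending.pollFirst());
        }
    }
}
```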

    (2) Building the user/group-to-queue mappings

    initializeQueueMappings() maps users and groups to queues:

    private void initializeQueueMappings() throws IOException {
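        // yarn.scheduler.capacity.queue-mappings-override.enable: if a mapping exists, should it override
        // the queue the user specified? Administrators can use this to place jobs in a different queue than the user chose.
        overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
        LOG.info("Initialized queue mappings, override: "
                 + overrideWithQueueMappings);
        // Get new user/group mappings
        // user/group-to-queue mappings: yarn.scheduler.capacity.queue-mappings
        List<QueueMapping> newMappings = conf.getQueueMappings();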
        //check if mappings refer to valid queues
        for (QueueMapping mapping : newMappings) {
            if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
                !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
                CSQueue queue = queues.get(mapping.queue);
                if (queue == null || !(queue instanceof LeafQueue)) {
                    throw new IOException(
                        "mapping contains invalid or non-leaf queue " + mapping.queue);
                }
            }
        }
        //apply the new mappings since they are valid
        mappings = newMappings;
        // initialize groups if mappings are present
        if (mappings.size() > 0) {
            groups = new Groups(conf);
        }
    }
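
initializeQueueMappings() only validates and stores the rules; they are consulted later, when an application is submitted. Rules in yarn.scheduler.capacity.queue-mappings have the form [u|g]:name:queue, comma-separated, where %user and %primary_group are placeholders. The resolver below is a hypothetical simplification that assumes first-match-wins semantics and takes the user's groups (primary group first) as an explicit map instead of Hadoop's Groups service.

```java
import java.util.List;
import java.util.Map;

class QueueMappingSketch {
    // Resolves the target queue for a user; userGroups maps user -> groups (primary group first).
    static String resolve(String mappings, String user, Map<String, List<String>> userGroups) {
        List<String> groups = userGroups.getOrDefault(user, List.of());
        for (String rule : mappings.split(",")) {
            String[] parts = rule.trim().split(":");
            if (parts.length != 3) {
                throw new IllegalArgumentException("Illegal queue mapping " + rule);
            }
            String type = parts[0], source = parts[1], queue = parts[2];
            if (type.equals("u")) {
                if (source.equals("%user")) {
                    // u:%user:%user -> queue named after the user; u:%user:%primary_group -> primary group
                    if (queue.equals("%user")) return user;
                    if (queue.equals("%primary_group")) return groups.isEmpty() ? null : groups.get(0);
                    return queue;
                }
                if (source.equals(user)) return queue;
            } else if (type.equals("g")) {
                if (groups.contains(source)) return queue;
            } else {
                throw new IllegalArgumentException("Illegal queue mapping type " + type);
            }
        }
        return null; // no rule matched
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups =
            Map.of("alice", List.of("dev", "ops"), "bob", List.of("ops"));
        String rules = "u:alice:analytics,g:ops:ops_queue,u:%user:%user";
        System.out.println(resolve(rules, "alice", groups)); // analytics
        System.out.println(resolve(rules, "bob", groups));   // ops_queue
        System.out.println(resolve(rules, "carol", groups)); // carol
    }
}
```

A null result means no rule matched; whether a match overrides an explicitly requested queue is controlled by yarn.scheduler.capacity.queue-mappings-override.enable.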
    

    Important CapacityScheduler classes

    CSQueue (interface)

    CSQueue represents a node in the CapacityScheduler's hierarchical queue tree; it holds that queue's information.

    ClusterNodeTracker

    A helper that tracks the state of all SchedulerNodes in the cluster and provides convenience methods for filtering and sorting nodes.

    ResourceCalculator

    Compares resources and performs arithmetic on them.

    ParentQueue

    PlanQueue

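    Represents a dynamic queue managed by the ReservationSystem. From the user's point of view it is equivalent to a LeafQueue that respects reservations, but functionally it is a subclass of ParentQueue.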

    4 AMLivelinessMonitor

    The AM liveness monitors are created in ResourceManager#RMActiveServices#serviceInit() via createAMLivelinessMonitor():

    AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
    addService(amLivelinessMonitor);
    rmContext.setAMLivelinessMonitor(amLivelinessMonitor);
    
    AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
    addService(amFinishingMonitor);
    rmContext.setAMFinishingMonitor(amFinishingMonitor);
    

    Two AMLivelinessMonitor instances are created: amLivelinessMonitor monitors running AMs, and amFinishingMonitor monitors AMs that are finishing.

    createAMLivelinessMonitor():

    protected AMLivelinessMonitor createAMLivelinessMonitor() {
        return new AMLivelinessMonitor(this.rmDispatcher);
    }
    
    
    
    
    

    // AMLivelinessMonitor constructor
    public AMLivelinessMonitor(Dispatcher d) {
        super("AMLivelinessMonitor", new SystemClock());
        this.dispatcher = d.getEventHandler();
    }
    
    public void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
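        // AM expiry interval: yarn.am.liveness-monitor.expiry-interval-ms, default 10 minutes (600000 ms)
        int expireIntvl = conf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
                                      YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
        // an AM that has not heartbeated within expireIntvl (10 minutes by default) is expired
        setExpireInterval(expireIntvl);
        // check every expireIntvl/3 (200 s, about 3.3 minutes, by default)
        setMonitorInterval(expireIntvl/3);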
    }
    
    
    

    AbstractLivelinessMonitor.java

    @Override
    protected void serviceStart() throws Exception {
        assert !stopped : "starting when already stopped";
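        // reset the stored timestamp of every monitored AM to the current time
        resetTimer();
        // create the thread that checks for expired AMs
        checkerThread = new Thread(new PingChecker());
        checkerThread.setName("Ping Checker");
        // start the checker thread; it keeps checking whether any AM has expired
        checkerThread.start();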
        super.serviceStart();
    }
    

    Here is what the checker thread does:

    private class PingChecker implements Runnable {
        @Override
        public void run() {
            while (!stopped && !Thread.currentThread().isInterrupted()) {
                synchronized (AbstractLivelinessMonitor.this) {
                    // running is a map of <monitored AM, last report time>
                    Iterator<Map.Entry<O, Long>> iterator = running.entrySet().iterator();
    
                    //avoid calculating current time everytime in loop
                    long currentTime = clock.getTime();
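                    // compare the current time with each AM's last heartbeat; an AM that has not heartbeated within expireInterval is considered expired and the expiry flow is started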
                    while (iterator.hasNext()) {
                        Map.Entry<O, Long> entry = iterator.next();
                        if (currentTime > entry.getValue() + expireInterval) {
                            iterator.remove();
                            // start the expiry flow
                            expire(entry.getKey());
                            LOG.info("Expired:" + entry.getKey().toString() + 
                                     " Timed out after " + expireInterval/1000 + " secs");
                        }
                    }
                }
                try {
                    Thread.sleep(monitorInterval);
                } catch (InterruptedException e) {
                    LOG.info(getName() + " thread interrupted");
                    break;
                }
            }
        }
    }
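
The expiry check is easiest to see, and to test, when time is passed in explicitly instead of read from a clock. LivenessSketch is a hypothetical, single-threaded reduction of AbstractLivelinessMonitor: register() and receivedPing() record a timestamp, and checkExpired(now) performs one pass of the PingChecker loop, removing and returning every entry whose last ping is older than the expiry interval.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class LivenessSketch<O> {
    private final long expireInterval;
    private final Map<O, Long> running = new HashMap<>();

    LivenessSketch(long expireIntervalMs) { this.expireInterval = expireIntervalMs; }

    // Start tracking an object with the given timestamp.
    synchronized void register(O ob, long now) { running.put(ob, now); }

    // Refresh the last-seen time; untracked objects are ignored.
    synchronized void receivedPing(O ob, long now) {
        if (running.containsKey(ob)) running.put(ob, now);
    }

    // One pass of the PingChecker loop: expire everything not heard from within expireInterval.
    synchronized List<O> checkExpired(long now) {
        List<O> expired = new ArrayList<>();
        Iterator<Map.Entry<O, Long>> it = running.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<O, Long> e = it.next();
            if (now > e.getValue() + expireInterval) {
                O key = e.getKey();
                it.remove();       // same as iterator.remove() in PingChecker
                expired.add(key);  // the real monitor calls expire(key) here
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        LivenessSketch<String> monitor = new LivenessSketch<>(600_000L); // 10 minutes
        monitor.register("appattempt_1", 0);
        monitor.register("appattempt_2", 0);
        monitor.receivedPing("appattempt_2", 500_000);
        System.out.println(monitor.checkExpired(700_000)); // [appattempt_1]
    }
}
```

Because the real checker only wakes up every monitorInterval, an AM that stops heartbeating is expired roughly between expireInterval and expireInterval + monitorInterval after its last heartbeat, i.e. between 600 s and 800 s with the defaults.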
    

    The expiry flow, AMLivelinessMonitor.expire():

    @Override
    protected void expire(ApplicationAttemptId id) {
        dispatcher.handle(new RMAppAttemptEvent(id, RMAppAttemptEventType.EXPIRE));
    }
    

    That is, it sends an RMAppAttemptEventType.EXPIRE event, through the RM dispatcher, to the corresponding RMAppAttemptImpl.
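
The EXPIRE event does not call RMAppAttemptImpl directly: the dispatcher routes each event to the handler registered for its event-type enum class, and the RM registers a handler for RMAppAttemptEventType that forwards to the matching attempt. The sketch below is a hypothetical, synchronous version of that routing; Hadoop's AsyncDispatcher additionally queues events and delivers them on its own thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class DispatcherSketch {
    // Hypothetical event types, loosely modelled on RMAppAttemptEventType / RMAppEventType.
    enum AttemptEventType { EXPIRE, LAUNCHED }
    enum AppEventType { KILL }

    static final class Event {
        final Enum<?> type;
        final String target;
        Event(Enum<?> type, String target) { this.type = type; this.target = target; }
    }

    // One handler per event-type enum class.
    private final Map<Class<?>, Consumer<Event>> handlers = new HashMap<>();

    void register(Class<? extends Enum<?>> typeClass, Consumer<Event> handler) {
        handlers.put(typeClass, handler);
    }

    // Route by the enum's declaring class, so every constant of one enum shares a handler.
    void dispatch(Event event) {
        Consumer<Event> handler = handlers.get(event.type.getDeclaringClass());
        if (handler == null) {
            throw new IllegalStateException("No handler for " + event.type.getDeclaringClass());
        }
        handler.accept(event);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        DispatcherSketch d = new DispatcherSketch();
        d.register(AttemptEventType.class, e -> log.add("attempt " + e.target + " <- " + e.type));
        d.register(AppEventType.class, e -> log.add("app " + e.target + " <- " + e.type));
        d.dispatch(new Event(AttemptEventType.EXPIRE, "appattempt_0001_000001"));
        System.out.println(log); // [attempt appattempt_0001_000001 <- EXPIRE]
    }
}
```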

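    Summary

    CapacityScheduler records its queues in Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>(), keyed by queue name, and uses CSQueue to hold each queue's details.

    When parsing the queues, child queues are parsed recursively and each queue's parent is set, so ParentQueue and LeafQueue nodes form a tree. Applications can only request resources from a LeafQueue; each LeafQueue keeps its applications in a pending list and an active list, both sorted by application ID, so scheduling within a queue is FIFO.

    Question 1: queue = hook.hook(parentQueue); (the code comment marks the hook as used only for unit tests)

    Question 2: addService()

    Reference: https://blog.csdn.net/weixin_44630798/article/details/89005651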


    Article link: https://www.haomeiwen.com/subject/tijxqhtx.html