[k8s源码分析][kube-scheduler] scheduler/internal/queue 之优先队列 scheduling_queue (2)

Author: nicktming | Published 2019-10-11 07:28

    1. Introduction

    If you repost this article, please credit the original source. Please respect others' work!

    This post continues from [k8s源码分析][kube-scheduler]scheduler/internal/queue之优先队列scheduling_queue(1). The previous post covered the structure of PriorityQueue but not its methods, so this one analyzes the methods in the context of how they are actually used.
    Source: https://github.com/nicktming/kubernetes/blob/tming-v1.13/pkg/scheduler/internal/queue/scheduling_queue.go
    Branch: tming-v1.13 (based on v1.13)

    2. Methods

    Since scheduling_queue is consumed from outside the package (the various informers use it; I will cover informers in detail in a dedicated post), we start from NewConfigFactory in /pkg/scheduler/factory/factory.go.

    func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
        stopEverything := args.StopCh
        if stopEverything == nil {
            stopEverything = wait.NeverStop
        }
        ...
        c := &configFactory{
            client:                         args.Client,
            podLister:                      schedulerCache,
            podQueue:                       internalqueue.NewSchedulingQueue(stopEverything),
            nodeLister:                     args.NodeInformer.Lister(),
            pVLister:                       args.PvInformer.Lister(),
            pVCLister:                      args.PvcInformer.Lister(),
            serviceLister:                  args.ServiceInformer.Lister(),
            controllerLister:               args.ReplicationControllerInformer.Lister(),
            replicaSetLister:               args.ReplicaSetInformer.Lister(),
            statefulSetLister:              args.StatefulSetInformer.Lister(),
            pdbLister:                      args.PdbInformer.Lister(),
            storageClassLister:             storageClassLister,
            schedulerCache:                 schedulerCache,
            StopEverything:                 stopEverything,
            schedulerName:                  args.SchedulerName,
            hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
            enableEquivalenceClassCache:    args.EnableEquivalenceClassCache,
            disablePreemption:              args.DisablePreemption,
            percentageOfNodesToScore:       args.PercentageOfNodesToScore,
        }
    ...
    }
    

    Here we only need to note that the podQueue field of configFactory is a scheduling_queue created by internalqueue.NewSchedulingQueue(stopEverything): a PriorityQueue when pod priority is enabled, and a FIFO otherwise (I will analyze that structure in the client-go source series).

    So where is this podQueue operated on? In the various informers. Informers are a very important part of k8s, and I will analyze them in the client-go source series. A sketch of NewSchedulingQueue follows below.
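
    For reference, NewSchedulingQueue itself is just a small switch on the pod-priority feature. A rough sketch, paraphrased from the v1.13 pkg/scheduler/internal/queue/scheduling_queue.go (treat it as a sketch, not an exact quote):

    // NewSchedulingQueue initializes a new scheduling queue. If pod priority is
    // enabled, a PriorityQueue is returned; otherwise a FIFO is returned.
    func NewSchedulingQueue(stop <-chan struct{}) SchedulingQueue {
        if util.PodPriorityEnabled() {
            return NewPriorityQueue(stop)
        }
        return NewFIFO()
    }
    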

    2.1 PodInformer (scheduled pod cache)

    // assignedPod selects pods that are assigned (scheduled and running).
    func assignedPod(pod *v1.Pod) bool {
        return len(pod.Spec.NodeName) != 0
    }
    // scheduled pod cache
        args.PodInformer.Informer().AddEventHandler(
            cache.FilteringResourceEventHandler{
                FilterFunc: func(obj interface{}) bool {
                    switch t := obj.(type) {
                    case *v1.Pod:
                        return assignedPod(t)
                    case cache.DeletedFinalStateUnknown:
                        if pod, ok := t.Obj.(*v1.Pod); ok {
                            return assignedPod(pod)
                        }
                        runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
                        return false
                    default:
                        runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
                        return false
                    }
                },
                Handler: cache.ResourceEventHandlerFuncs{
                    AddFunc:    c.addPodToCache,
                    UpdateFunc: c.updatePodInCache,
                    DeleteFunc: c.deletePodFromCache,
                },
            },
        )
    

    The FilterFunc here selects the pods that have already been scheduled; only those pods reach the methods registered in the Handler. A non-empty pod.Spec.NodeName indicates the pod has been scheduled successfully.

    2.1.1 addPodToCache
    func (c *configFactory) addPodToCache(obj interface{}) {
        pod, ok := obj.(*v1.Pod)
        if !ok {
            klog.Errorf("cannot convert to *v1.Pod: %v", obj)
            return
        }
    
        if err := c.schedulerCache.AddPod(pod); err != nil {
            klog.Errorf("scheduler cache AddPod failed: %v", err)
        }
    
        c.podQueue.AssignedPodAdded(pod)
    
        // NOTE: Updating equivalence cache of addPodToCache has been
        // handled optimistically in: pkg/scheduler/scheduler.go#assume()
    }
    

    We only pay attention to the podQueue part for now: whenever a scheduled pod is added to the cluster, the AssignedPodAdded(pod) method is called.

    func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
        p.lock.Lock()
        p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
        p.lock.Unlock()
    }
    // NOTE: this function assumes lock has been acquired in caller
    func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
        for _, pod := range pods {
            if err := p.activeQ.Add(pod); err == nil {
                p.unschedulableQ.delete(pod)
            } else {
                klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
            }
        }
        p.moveRequestCycle = p.schedulingCycle
        p.cond.Broadcast()
    }
    func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {
        var podsToMove []*v1.Pod
        for _, up := range p.unschedulableQ.pods {
            affinity := up.Spec.Affinity
            if affinity != nil && affinity.PodAffinity != nil {
                terms := predicates.GetPodAffinityTerms(affinity.PodAffinity)
                for _, term := range terms {
                    namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(up, &term)
                    selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
                    if err != nil {
                        klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
                    }
                    if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
                        podsToMove = append(podsToMove, up)
                        break
                    }
                }
            }
        }
        return podsToMove
    }
    

    Looking first at getUnschedulablePodsWithMatchingAffinityTerm: it collects the pods in unschedulableQ that have at least one pod-affinity term matching the given pod. movePodsToActiveQueue then moves those pods from unschedulableQ to activeQ.

    In plain terms: whenever a scheduled pod appears, every pod in unschedulableQ that has some affinity configuration matching it (a single matching term is enough) is moved to activeQ. A concrete example follows below.

    The reasoning is simple: those pods previously failed to schedule because no pod in the cluster satisfied their pod affinity (PodAffinity), which is how they ended up in unschedulableQ. Now that a pod that can satisfy them exists in the cluster, they should of course go back into activeQ and be retried.
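
    To make this concrete, here is a minimal, self-contained sketch of such a pair of pods (the names web/db and the labels are hypothetical, not from the source): web would sit in unschedulableQ because its affinity requires a pod labeled app=db, and once db is scheduled, AssignedPodAdded(db) would match web's term and move it back to activeQ.

    package main
    
    import (
        "fmt"
    
        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )
    
    func main() {
        // "web" would sit in unschedulableQ: it requires a pod labeled app=db
        // on the same node, and no such pod exists yet.
        web := &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default"},
            Spec: v1.PodSpec{
                Affinity: &v1.Affinity{
                    PodAffinity: &v1.PodAffinity{
                        RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{{
                            LabelSelector: &metav1.LabelSelector{
                                MatchLabels: map[string]string{"app": "db"},
                            },
                            TopologyKey: "kubernetes.io/hostname",
                        }},
                    },
                },
            },
        }
        // "db" is an assigned pod whose labels satisfy web's affinity term, so
        // AssignedPodAdded(db) would move web from unschedulableQ to activeQ.
        db := &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "db",
                Namespace: "default",
                Labels:    map[string]string{"app": "db"},
            },
            Spec: v1.PodSpec{NodeName: "node-1"},
        }
        fmt.Printf("web needs app=db: %v; db assigned to %q\n",
            web.Spec.Affinity.PodAffinity != nil, db.Spec.NodeName)
    }
    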

    2.1.2 updatePodInCache
    /pkg/scheduler/factory/factory.go
    func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
        oldPod, ok := oldObj.(*v1.Pod)
        if !ok {
            klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
            return
        }
        newPod, ok := newObj.(*v1.Pod)
        ...
        c.podQueue.AssignedPodUpdated(newPod)
    }
    
    /pkg/scheduler/internal/queue/scheduling_queue.go
    func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
        p.lock.Lock()
        p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
        p.lock.Unlock()
    }
    

    The logic is exactly the same as in addPodToCache.

    2.1.3 deletePodFromCache
    // /pkg/scheduler/factory/factory.go
    func (c *configFactory) deletePodFromCache(obj interface{}) {
        var pod *v1.Pod
        switch t := obj.(type) {
        case *v1.Pod:
            pod = t
        case cache.DeletedFinalStateUnknown:
            var ok bool
            pod, ok = t.Obj.(*v1.Pod)
            if !ok {
                klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj)
                return
            }
        default:
            klog.Errorf("cannot convert to *v1.Pod: %v", t)
            return
        }
        ...
        c.podQueue.MoveAllToActiveQueue()
    }
    

    So whenever a scheduled pod is deleted from the cluster, the handler calls podQueue's MoveAllToActiveQueue() method.

    // /pkg/scheduler/internal/queue/scheduling_queue.go
    func (p *PriorityQueue) MoveAllToActiveQueue() {
        p.lock.Lock()
        defer p.lock.Unlock()
        for _, pod := range p.unschedulableQ.pods {
            if err := p.activeQ.Add(pod); err != nil {
                klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
            }
        }
        p.unschedulableQ.clear()
        p.moveRequestCycle = p.schedulingCycle
        p.cond.Broadcast()
    }
    

    MoveAllToActiveQueue moves every pod in unschedulableQ into activeQ.

    // TODO(bsalamat): We should add a back-off mechanism here so that a high priority
    // pod which is unschedulable does not go to the head of the queue frequently. For
    // example in a cluster where a lot of pods being deleted, such a high priority
    // pod can deprive other pods from getting scheduled.
    

    Note: this TODO means that if scheduled pods are deleted from the cluster frequently, pods will also be moved frequently from unschedulableQ to activeQ. If unschedulableQ holds a high-priority pod that can never be scheduled, it will keep jumping to the head of activeQ, and the lower-priority pods in activeQ may then hardly ever get out of it.

    2.1.4 Summary
    1. When a scheduled pod is added or updated, podQueue moves every pod in unschedulableQ that has at least one affinity term matching that pod into activeQ.
    2. When a scheduled pod is deleted, podQueue moves all pods in unschedulableQ into activeQ. (Resources may have been freed, so pods that previously failed for lack of resources can be retried.)

    2.2 PodInformer (unscheduled pod queue)

    As the code shows, this PodInformer handler targets the pods that have not been scheduled yet and that belong to this scheduler.

    // pkg/scheduler/factory/factory.go
    
    // responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
    func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
        return schedulerName == pod.Spec.SchedulerName
    }
    // unscheduled pod queue
        args.PodInformer.Informer().AddEventHandler(
            cache.FilteringResourceEventHandler{
                FilterFunc: func(obj interface{}) bool {
                    switch t := obj.(type) {
                    case *v1.Pod:
                        return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
                    case cache.DeletedFinalStateUnknown:
                        if pod, ok := t.Obj.(*v1.Pod); ok {
                            return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
                        }
                        runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
                        return false
                    default:
                        runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
                        return false
                    }
                },
                Handler: cache.ResourceEventHandlerFuncs{
                    AddFunc:    c.addPodToSchedulingQueue,
                    UpdateFunc: c.updatePodInSchedulingQueue,
                    DeleteFunc: c.deletePodFromSchedulingQueue,
                },
            },
        )
    
    2.2.1 addPodToSchedulingQueue
    // pkg/scheduler/factory/factory.go
    func (c *configFactory) addPodToSchedulingQueue(obj interface{}) {
        if err := c.podQueue.Add(obj.(*v1.Pod)); err != nil {
            runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
        }
    }
    
    // pkg/scheduler/internal/queue/scheduling_queue.go
    func (p *PriorityQueue) Add(pod *v1.Pod) error {
        p.lock.Lock()
        defer p.lock.Unlock()
        err := p.activeQ.Add(pod)
        if err != nil {
            klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
        } else {
            if p.unschedulableQ.get(pod) != nil {
                klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
                p.unschedulableQ.delete(pod)
            }
            p.nominatedPods.add(pod, "")
            p.cond.Broadcast()
        }
        return err
    }
    

    PriorityQueue's Add method:

    1. Add the pod to activeQ (update it if it is already there).
    2. Delete the pod from unschedulableQ if present (a pod may live in only one of activeQ and unschedulableQ).
    3. Record the pod in nominatedPods.

    Note on p.nominatedPods.add(pod, ""):

    1. If the pod won a preemption, it is recorded under its nominated node: the nodeName argument here is empty, so the map falls back to the pod's own pod.Status.NominatedNodeName.
    2. If the pod did not preempt anyone, its NominatedNodeName is empty as well; together with the empty nodeName argument, the method then does nothing.
    3. Plainly put, this p.nominatedPods.add(pod, "") call stores the pod if it preempted and stores nothing if it did not (see the sketch after the summary below).

    Summary: when an unscheduled pod belonging to this scheduler appears in the cluster, podQueue adds it to activeQ, and if the pod is one that won a preemption (i.e. pod.Status.NominatedNodeName is set), it is also recorded in nominatedPods.
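
    The behavior just described comes from nominatedPodMap.add. A rough sketch, paraphrased from the v1.13 source:

    // NominatedNodeName returns the nominated node name of a pod.
    func NominatedNodeName(pod *v1.Pod) string {
        return pod.Status.NominatedNodeName
    }
    
    func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
        // Always delete first, so we never store more than one instance of a pod.
        npm.delete(p)
        nnn := nodeName
        if len(nnn) == 0 {
            // Fall back to the pod's own NominatedNodeName; if that is also
            // empty (no preemption happened), store nothing.
            nnn = NominatedNodeName(p)
            if len(nnn) == 0 {
                return
            }
        }
        npm.nominatedPodToNode[p.UID] = nnn
        for _, np := range npm.nominatedPods[nnn] {
            if np.UID == p.UID {
                return
            }
        }
        npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
    }
    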

    2.2.2 updatePodInSchedulingQueue
    // pkg/scheduler/factory/factory.go
    func (c *configFactory) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
        pod := newObj.(*v1.Pod)
        if c.skipPodUpdate(pod) {
            return
        }
        if err := c.podQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
            runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
        }
    }
    
    // pkg/scheduler/internal/queue/scheduling_queue.go
    
    // 1. If the pod is already in activeQ, update it there and update nominatedPods.
    // 2. If the pod is in unschedulableQ, update nominatedPods, then:
    //    2.1 if the spec changed, it may have become schedulable, so add it to activeQ and delete it from unschedulableQ;
    //    2.2 if nothing changed, just update it in place in unschedulableQ.
    // 3. If the pod is in neither queue, add it to activeQ and call p.nominatedPods.add(newPod, "").
    
    func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
        p.lock.Lock()
        defer p.lock.Unlock()
        // If the pod is already in the active queue, just update it there.
        if _, exists, _ := p.activeQ.Get(newPod); exists {
            p.nominatedPods.update(oldPod, newPod)
            err := p.activeQ.Update(newPod)
            return err
        }
        // If the pod is in the unschedulable queue, updating it may make it schedulable.
        if usPod := p.unschedulableQ.get(newPod); usPod != nil {
            p.nominatedPods.update(oldPod, newPod)
            // The pod changed, so it may have become schedulable: add it to activeQ and delete it from unschedulableQ.
            if isPodUpdated(oldPod, newPod) {
                p.unschedulableQ.delete(usPod)
                err := p.activeQ.Add(newPod)
                if err == nil {
                    p.cond.Broadcast()
                }
                return err
            }
            // Nothing relevant changed: just update the pod in place in unschedulableQ.
            p.unschedulableQ.addOrUpdate(newPod)
            return nil
        }
        // If pod is not in any of the two queue, we put it in the active queue.
        err := p.activeQ.Add(newPod)
        if err == nil {
            p.nominatedPods.add(newPod, "")
            p.cond.Broadcast()
        }
        return err
    }
    func isPodUpdated(oldPod, newPod *v1.Pod) bool {
        strip := func(pod *v1.Pod) *v1.Pod {
            p := pod.DeepCopy()
            p.ResourceVersion = ""
            p.Generation = 0
            p.Status = v1.PodStatus{}
            return p
        }
        return !reflect.DeepEqual(strip(oldPod), strip(newPod))
    }
    

    PriorityQueue's Update handles three cases:
    1. The pod is in activeQ: update activeQ and nominatedPods directly.
    2. The pod is in unschedulableQ: first update nominatedPods. Its update method deletes oldPod and then calls add(newPod, ""), which, as analyzed earlier, only stores the pod if it preempted. Because this is an update, the newPod may have become schedulable, which is exactly what isPodUpdated checks; depending on the result the pod is moved into activeQ or updated in place.
    3. The pod is in neither activeQ nor unschedulableQ: add it directly to activeQ, then call p.nominatedPods.add(newPod, ""), which records the pod in nominatedPods only if it preempted.

    Summary: when an unscheduled pod belonging to this scheduler is updated in the cluster, podQueue updates nominatedPods; if the pod is in activeQ, activeQ is updated; if it is in unschedulableQ and its spec has not changed, unschedulableQ is updated, otherwise the pod is moved into activeQ. A sketch of isPodUpdated's behavior follows below.
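
    A minimal sketch of what isPodUpdated treats as a change (hypothetical pods; assumes the isPodUpdated shown above is in scope): a ResourceVersion- or Status-only change does not count, while any other change does.

    old := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", ResourceVersion: "1"}}
    
    statusOnly := old.DeepCopy()
    statusOnly.ResourceVersion = "2"
    statusOnly.Status.Phase = v1.PodPending
    fmt.Println(isPodUpdated(old, statusOnly)) // false: only ResourceVersion/Status differ
    
    relabeled := old.DeepCopy()
    relabeled.Labels = map[string]string{"team": "infra"}
    fmt.Println(isPodUpdated(old, relabeled)) // true: something outside the stripped fields changed
    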

    2.2.3 deletePodFromSchedulingQueue
    // pkg/scheduler/factory/factory.go
    func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) {
        var pod *v1.Pod
        switch t := obj.(type) {
        case *v1.Pod:
            pod = obj.(*v1.Pod)
        case cache.DeletedFinalStateUnknown:
            var ok bool
            pod, ok = t.Obj.(*v1.Pod)
            if !ok {
                runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
                return
            }
        default:
            runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
            return
        }
        if err := c.podQueue.Delete(pod); err != nil {
            runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
        }
        ...
    }
    
    // pkg/scheduler/internal/queue/scheduling_queue.go
    func (p *PriorityQueue) Delete(pod *v1.Pod) error {
        p.lock.Lock()
        defer p.lock.Unlock()
         // delete from nominatedPods if present; otherwise this is a no-op
        p.nominatedPods.delete(pod)
        err := p.activeQ.Delete(pod)
        if err != nil { // The item was probably not found in the activeQ.
            p.unschedulableQ.delete(pod)
        }
        return nil
    }
    

    Summary: when an unscheduled pod belonging to this scheduler is deleted from the cluster, podQueue removes it from nominatedPods (delete if present, otherwise a no-op) and deletes it from activeQ or, failing that, from unschedulableQ.

    2.3 NodeInformer

    args.NodeInformer.Informer().AddEventHandler(
            cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addNodeToCache,
                UpdateFunc: c.updateNodeInCache,
                DeleteFunc: c.deleteNodeFromCache,
            },
        )
    

    With the principles above understood, the remaining analysis becomes much simpler.

    func (c *configFactory) addNodeToCache(obj interface{}) {
           ...
        c.podQueue.MoveAllToActiveQueue()
        // NOTE: add a new node does not affect existing predicates in equivalence cache
    }
    func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
        ...
        // Only activate unschedulable pods if the node became more schedulable.
        // We skip the node property comparison when there is no unschedulable pods in the queue
        // to save processing cycles. We still trigger a move to active queue to cover the case
        // that a pod being processed by the scheduler is determined unschedulable. We want this
        // pod to be reevaluated when a change in the cluster happens.
        if c.podQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
            c.podQueue.MoveAllToActiveQueue()
        }
    }
    func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
        if nodeSpecUnschedulableChanged(newNode, oldNode) {
            return true
        }
        if nodeAllocatableChanged(newNode, oldNode) {
            return true
        }
        if nodeLabelsChanged(newNode, oldNode) {
            return true
        }
        if nodeTaintsChanged(newNode, oldNode) {
            return true
        }
        if nodeConditionsChanged(newNode, oldNode) {
            return true
        }
        return false
    }
    

    Note: updateNodeInCache calls MoveAllToActiveQueue even when unschedulableQ holds no pods. As the comment in the code explains, this covers a pod that the scheduler is processing right now and is about to mark unschedulable: the move advances moveRequestCycle (see section 2.8 below), so that pod will be placed back into activeQ rather than unschedulableQ and reevaluated after the node change.
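
    The helpers each compare one node property that can affect scheduling. A rough sketch of the simpler ones, paraphrased from the v1.13 factory.go (nodeConditionsChanged similarly compares a map of condition types to statuses):

    func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
        return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
    }
    
    func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
        return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
    }
    
    func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
        return !reflect.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints)
    }
    
    // Only a flip to schedulable (Unschedulable: true -> false) counts; a node
    // becoming unschedulable cannot help the pending pods.
    func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
        return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable
    }
    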

    Summary:
    1. When a new node joins the cluster, all pods in unschedulableQ are moved to activeQ.
    2. When a node in the cluster is updated and the change is detected by nodeSpecUnschedulableChanged, nodeAllocatableChanged, nodeLabelsChanged, nodeTaintsChanged or nodeConditionsChanged, all pods in unschedulableQ are likewise moved to activeQ.
    3. When a node is deleted from the cluster, podQueue does nothing.

    2.4 PvInformer

    args.PvInformer.Informer().AddEventHandler(
            cache.ResourceEventHandlerFuncs{
                // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
                AddFunc:    c.onPvAdd,
                UpdateFunc: c.onPvUpdate,
                DeleteFunc: c.onPvDelete,
            },
        )
    func (c *configFactory) onPvAdd(obj interface{}) {
        ...
        c.podQueue.MoveAllToActiveQueue()
    }
    func (c *configFactory) onPvUpdate(old, new interface{}) {
        ...
        c.podQueue.MoveAllToActiveQueue()
    }
    

    Summary:
    1. When a pv is added to or updated in the cluster, all pods in unschedulableQ are moved to activeQ.
    2. When a pv is deleted, podQueue does nothing.

    2.5 PvcInformer

    args.PvcInformer.Informer().AddEventHandler(
            cache.ResourceEventHandlerFuncs{
                AddFunc:    c.onPvcAdd,
                UpdateFunc: c.onPvcUpdate,
                DeleteFunc: c.onPvcDelete,
            },
        )
    func (c *configFactory) onPvcAdd(obj interface{}) {
        ...
        c.podQueue.MoveAllToActiveQueue()
    }
    
    func (c *configFactory) onPvcUpdate(old, new interface{}) {
        ...
        c.podQueue.MoveAllToActiveQueue()
    }
    

    Summary:
    1. When a pvc is added to or updated in the cluster, all pods in unschedulableQ are moved to activeQ.
    2. When a pvc is deleted, podQueue does nothing.

    2.6 ServiceInformer

    args.ServiceInformer.Informer().AddEventHandler(
            cache.ResourceEventHandlerFuncs{
                AddFunc:    c.onServiceAdd,
                UpdateFunc: c.onServiceUpdate,
                DeleteFunc: c.onServiceDelete,
            },
        )
    func (c *configFactory) onServiceAdd(obj interface{}) {
        ...
        c.podQueue.MoveAllToActiveQueue()
    }
    
    func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
        ...
        c.podQueue.MoveAllToActiveQueue()
    }
    
    func (c *configFactory) onServiceDelete(obj interface{}) {
        ...
        c.podQueue.MoveAllToActiveQueue()
    }
    

    Summary:
    1. When a service is added, updated or deleted in the cluster, all pods in unschedulableQ are moved to activeQ.

    2.7 The run method

    // run starts the goroutine to pump from unschedulableQ to activeQ
    func (p *PriorityQueue) run() {
        go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
    }
    func (p *PriorityQueue) flushUnschedulableQLeftover() {
        p.lock.Lock()
        defer p.lock.Unlock()
    
        var podsToMove []*v1.Pod
        currentTime := p.clock.Now()
        for _, pod := range p.unschedulableQ.pods {
            lastScheduleTime := podTimestamp(pod)
            if !lastScheduleTime.IsZero() && currentTime.Sub(lastScheduleTime.Time) > unschedulableQTimeInterval {
                podsToMove = append(podsToMove, pod)
            }
        }
    
        if len(podsToMove) > 0 {
            p.movePodsToActiveQueue(podsToMove)
        }
    }
    const unschedulableQTimeInterval = 60 * time.Second
    

    The run method starts a background goroutine that fires every 30 seconds and moves to activeQ every pod in unschedulableQ whose last scheduling attempt was more than 60 seconds ago (unschedulableQTimeInterval).
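
    The "last schedule time" here comes from podTimestamp, which reads the pod's PodScheduled condition (set by the scheduler when a scheduling attempt fails). A rough sketch, paraphrased from the same file:

    // podTimestamp returns the time of the pod's most recent scheduling attempt,
    // taken from its PodScheduled condition.
    func podTimestamp(pod *v1.Pod) *metav1.Time {
        _, condition := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
        if condition == nil {
            return nil
        }
        if condition.LastProbeTime.IsZero() {
            return &condition.LastTransitionTime
        }
        return &condition.LastProbeTime
    }
    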

    2.8 The schedulingCycle and moveRequestCycle fields

    These two fields decide where a pod whose scheduling attempt failed ends up: in unschedulableQ or back in activeQ (see AddUnschedulableIfNotPresent below).

    func (p *PriorityQueue) MoveAllToActiveQueue() {
        ...
        p.moveRequestCycle = p.schedulingCycle
        p.cond.Broadcast()
    }
    
    // NOTE: this function assumes lock has been acquired in caller
    func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
        ...
        p.moveRequestCycle = p.schedulingCycle
        p.cond.Broadcast()
    }
    

    Both MoveAllToActiveQueue and movePodsToActiveQueue set these two values equal to each other, while Pop increments schedulingCycle by one, as sketched below.
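
    For reference, Pop in v1.13 looks roughly like this (paraphrased): it blocks until activeQ is non-empty, then pops the head and starts a new scheduling cycle.

    func (p *PriorityQueue) Pop() (*v1.Pod, error) {
        p.lock.Lock()
        defer p.lock.Unlock()
        for p.activeQ.Len() == 0 {
            p.cond.Wait()
        }
        obj, err := p.activeQ.Pop()
        if err != nil {
            return nil, err
        }
        pod := obj.(*v1.Pod)
        // Popping a pod marks the start of a new scheduling cycle.
        p.schedulingCycle++
        return pod, nil
    }
    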

    func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
        p.lock.Lock()
        defer p.lock.Unlock()
        if p.unschedulableQ.get(pod) != nil {
            return fmt.Errorf("pod is already present in unschedulableQ")
        }
        if _, exists, _ := p.activeQ.Get(pod); exists {
            return fmt.Errorf("pod is already present in the activeQ")
        }
        if podSchedulingCycle > p.moveRequestCycle && isPodUnschedulable(pod) {
            p.unschedulableQ.addOrUpdate(pod)
            p.nominatedPods.add(pod, "")
            return nil
        }
        err := p.activeQ.Add(pod)
        if err == nil {
            p.nominatedPods.add(pod, "")
            p.cond.Broadcast()
        }
        return err
    }
    

    1. If the pod is already in unschedulableQ or activeQ, return immediately with an error.

    2. If podSchedulingCycle <= p.moveRequestCycle, add the pod to activeQ no matter whether it is marked unschedulable. The scheduling queue performed a move in or after the cycle in which this pod was scheduled, meaning something in the cluster changed, so the pod deserves a fresh attempt; for example, the pod affinity it previously failed on may now be satisfiable (see the simulation sketch below).

    3. If no move happened since the pod was popped (podSchedulingCycle > p.moveRequestCycle) and the pod is deemed unschedulable, put it into unschedulableQ: nothing has changed, so if it went into activeQ it would simply fail again after being popped.
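
    Here is a minimal, self-contained simulation of the two counters (hypothetical types, not the real scheduler API) showing why a move request during a pod's scheduling cycle redirects a failed pod back to activeQ:

    package main
    
    import "fmt"
    
    // cycleTracker models just the two counters of PriorityQueue.
    type cycleTracker struct {
        schedulingCycle  int64
        moveRequestCycle int64
    }
    
    // pop mimics Pop: it starts a new scheduling cycle and returns its number.
    func (t *cycleTracker) pop() int64 {
        t.schedulingCycle++
        return t.schedulingCycle
    }
    
    // move mimics MoveAllToActiveQueue/movePodsToActiveQueue.
    func (t *cycleTracker) move() {
        t.moveRequestCycle = t.schedulingCycle
    }
    
    // addUnschedulable mirrors the decision in AddUnschedulableIfNotPresent
    // for a pod that was just found unschedulable.
    func (t *cycleTracker) addUnschedulable(podSchedulingCycle int64) string {
        if podSchedulingCycle > t.moveRequestCycle {
            return "unschedulableQ"
        }
        return "activeQ"
    }
    
    func main() {
        t := &cycleTracker{}
    
        c1 := t.pop()                       // pod A popped in cycle 1
        fmt.Println(t.addUnschedulable(c1)) // unschedulableQ: no move since cycle 1
    
        c2 := t.pop()                       // pod B popped in cycle 2
        t.move()                            // a node/pv/pvc/service event triggered a move
        fmt.Println(t.addUnschedulable(c2)) // activeQ: the cluster changed during the cycle
    }
    
    With these two counters, the queue ensures that no cluster event that might make a pod schedulable is missed between Pop and AddUnschedulableIfNotPresent.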
