1. 前言
转载请说明原文出处, 尊重他人劳动成果!
本文将在[k8s源码分析][kube-scheduler]scheduler/internal/queue之优先队列scheduling_queue(1)的基础上继续分析. 上文主要分析了 PriorityQueue 的结构, 并没有分析其方法, 所以本文将结合实际的用法对其方法进行分析.
源码位置: https://github.com/nicktming/kubernetes/blob/tming-v1.13/pkg/scheduler/internal/queue/scheduling_queue.go
分支: tming-v1.13 (基于v1.13版本)
2. 方法
由于 scheduling_queue 是提供给外部使用的, 会在各种 informers 的事件回调中被用到, 具体的以后会写专门的博客分析. 这里我们就从 /pkg/scheduler/factory/factory.go 中的 NewConfigFactory 入手.
```go
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
	stopEverything := args.StopCh
	if stopEverything == nil {
		stopEverything = wait.NeverStop
	}
	...
	c := &configFactory{
		client:                         args.Client,
		podLister:                      schedulerCache,
		podQueue:                       internalqueue.NewSchedulingQueue(stopEverything),
		nodeLister:                     args.NodeInformer.Lister(),
		pVLister:                       args.PvInformer.Lister(),
		pVCLister:                      args.PvcInformer.Lister(),
		serviceLister:                  args.ServiceInformer.Lister(),
		controllerLister:               args.ReplicationControllerInformer.Lister(),
		replicaSetLister:               args.ReplicaSetInformer.Lister(),
		statefulSetLister:              args.StatefulSetInformer.Lister(),
		pdbLister:                      args.PdbInformer.Lister(),
		storageClassLister:             storageClassLister,
		schedulerCache:                 schedulerCache,
		StopEverything:                 stopEverything,
		schedulerName:                  args.SchedulerName,
		hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
		enableEquivalenceClassCache:    args.EnableEquivalenceClassCache,
		disablePreemption:              args.DisablePreemption,
		percentageOfNodesToScore:       args.PercentageOfNodesToScore,
	}
	...
}
```
这里我们只需要注意到 configFactory 中的 podQueue 属性是由 internalqueue.NewSchedulingQueue(stopEverything) 生成的一个 scheduling_queue: 如果开启了优先级特性就是 PriorityQueue, 没有开启就是 FIFO (此结构我会在 client-go 源码分析系列中分析).
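NewSchedulingQueue 的实现大致如下 (示意, 具体以源码为准): 根据是否开启了 pod 优先级特性决定返回 PriorityQueue 还是 FIFO.

```go
// pkg/scheduler/internal/queue/scheduling_queue.go (示意)
// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
// enabled a priority queue is returned. If it is disabled, a FIFO is returned.
func NewSchedulingQueue(stop <-chan struct{}) SchedulingQueue {
	if util.PodPriorityEnabled() {
		return NewPriorityQueue(stop)
	}
	return NewFIFO()
}
```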
那操作此 podQueue 属性的地方在哪里呢? 在各种 Informers 中. informer 是 k8s 中非常重要的一部分, 我会在 client-go 源码分析系列中分析.
2.1 PodInformer (scheduled pod cache)
```go
// assignedPod selects pods that are assigned (scheduled and running).
func assignedPod(pod *v1.Pod) bool {
	return len(pod.Spec.NodeName) != 0
}

// scheduled pod cache
args.PodInformer.Informer().AddEventHandler(
	cache.FilteringResourceEventHandler{
		FilterFunc: func(obj interface{}) bool {
			switch t := obj.(type) {
			case *v1.Pod:
				return assignedPod(t)
			case cache.DeletedFinalStateUnknown:
				if pod, ok := t.Obj.(*v1.Pod); ok {
					return assignedPod(pod)
				}
				runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
				return false
			default:
				runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
				return false
			}
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc:    c.addPodToCache,
			UpdateFunc: c.updatePodInCache,
			DeleteFunc: c.deletePodFromCache,
		},
	},
)
```
这里有个 FilterFunc, 用来过滤出那些已经调度成功的 pod, 只有已经调度成功的 pod 才会进入到 Handler 中的方法. pod.Spec.NodeName 中有节点名称就表明该 pod 已经调度成功了.
2.1.1 addPodToCache
```go
func (c *configFactory) addPodToCache(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		klog.Errorf("cannot convert to *v1.Pod: %v", obj)
		return
	}
	if err := c.schedulerCache.AddPod(pod); err != nil {
		klog.Errorf("scheduler cache AddPod failed: %v", err)
	}
	c.podQueue.AssignedPodAdded(pod)
	// NOTE: Updating equivalence cache of addPodToCache has been
	// handled optimistically in: pkg/scheduler/scheduler.go#assume()
}
```
我们目前只关注 podQueue 部分. 可以看到, 每当有已调度成功的 pod 加入到集群时, 就会调用 AssignedPodAdded(pod) 方法.
```go
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
	p.lock.Lock()
	p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
	p.lock.Unlock()
}

// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
	for _, pod := range pods {
		if err := p.activeQ.Add(pod); err == nil {
			p.unschedulableQ.delete(pod)
		} else {
			klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
		}
	}
	p.moveRequestCycle = p.schedulingCycle
	p.cond.Broadcast()
}

func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {
	var podsToMove []*v1.Pod
	for _, up := range p.unschedulableQ.pods {
		affinity := up.Spec.Affinity
		if affinity != nil && affinity.PodAffinity != nil {
			terms := predicates.GetPodAffinityTerms(affinity.PodAffinity)
			for _, term := range terms {
				namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(up, &term)
				selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
				if err != nil {
					klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
				}
				if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
					podsToMove = append(podsToMove, up)
					break
				}
			}
		}
	}
	return podsToMove
}
```
首先看看 getUnschedulablePodsWithMatchingAffinityTerm: 该方法会从 unschedulableQ 中找出与该 pod 的任意一个亲和性 term 匹配的 pods. 然后 movePodsToActiveQueue 再把这些 pods 从 unschedulableQ 移到 activeQ.
其实通俗一点理解就是: 每当有一个已经调度成功的 pod, 就把 unschedulableQ 中与该 pod 存在亲和性配置匹配 (只要有一个 term 满足即可) 的 pods 移到 activeQ.
理由也很简单: 这些 pod 先前调度的时候, 集群中没有任何能满足其 pod 亲和性 (PodAffinity) 的 pod, 所以无法调度成功, 进而进入了 unschedulableQ; 现在集群中有了可以满足亲和性的 pod, 那这些之前无法调度的 pod 当然要重新进入 activeQ 再次尝试调度了.
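下面用一个假设的例子来说明 (pod 名称、标签均为举例, 并非源码内容): pending 因为集群中还没有满足其 PodAffinity 的 pod 而进入 unschedulableQ; 当带有匹配标签的 scheduled 调度成功并触发 AssignedPodAdded 时, pending 就会被移回 activeQ.

```go
package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// pending: 要求与带有 app=db 标签的 pod 调度到同一个节点,
	// 集群中暂时没有这样的 pod, 因此调度失败后会进入 unschedulableQ
	pending := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default"},
		Spec: v1.PodSpec{
			Affinity: &v1.Affinity{
				PodAffinity: &v1.PodAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{{
						LabelSelector: &metav1.LabelSelector{
							MatchLabels: map[string]string{"app": "db"},
						},
						TopologyKey: "kubernetes.io/hostname",
					}},
				},
			},
		},
	}

	// scheduled: 带有 app=db 标签且已经调度到 node-1. 它的加入会触发 AssignedPodAdded,
	// getUnschedulablePodsWithMatchingAffinityTerm 会发现它匹配 pending 的亲和性 term,
	// 于是 pending 被从 unschedulableQ 移回 activeQ 重新尝试调度
	scheduled := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "db-0",
			Namespace: "default",
			Labels:    map[string]string{"app": "db"},
		},
		Spec: v1.PodSpec{NodeName: "node-1"},
	}

	_ = pending
	_ = scheduled
}
```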
2.1.2 updatePodInCache
```go
// /pkg/scheduler/factory/factory.go
func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
	oldPod, ok := oldObj.(*v1.Pod)
	if !ok {
		klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
		return
	}
	newPod, ok := newObj.(*v1.Pod)
	...
	c.podQueue.AssignedPodUpdated(newPod)
}

// /pkg/scheduler/internal/queue/scheduling_queue.go
func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
	p.lock.Lock()
	p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
	p.lock.Unlock()
}
```
道理跟 addPodToCache 一模一样.
2.1.3 deletePodFromCache
```go
// /pkg/scheduler/factory/factory.go
func (c *configFactory) deletePodFromCache(obj interface{}) {
	var pod *v1.Pod
	switch t := obj.(type) {
	case *v1.Pod:
		pod = t
	case cache.DeletedFinalStateUnknown:
		var ok bool
		pod, ok = t.Obj.(*v1.Pod)
		if !ok {
			klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj)
			return
		}
	default:
		klog.Errorf("cannot convert to *v1.Pod: %v", t)
		return
	}
	...
	c.podQueue.MoveAllToActiveQueue()
}
```
所以每当集群中有已经调度成功的 pod 被删除时, 对应的 podQueue 都会调用 MoveAllToActiveQueue() 方法.
```go
// /pkg/scheduler/internal/queue/scheduling_queue.go
func (p *PriorityQueue) MoveAllToActiveQueue() {
	p.lock.Lock()
	defer p.lock.Unlock()
	for _, pod := range p.unschedulableQ.pods {
		if err := p.activeQ.Add(pod); err != nil {
			klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
		}
	}
	p.unschedulableQ.clear()
	p.moveRequestCycle = p.schedulingCycle
	p.cond.Broadcast()
}
```
MoveAllToActiveQueue 将 unschedulableQ 中的所有 pods 全部移到 activeQ 中.
```go
// TODO(bsalamat): We should add a back-off mechanism here so that a high priority
// pod which is unschedulable does not go to the head of the queue frequently. For
// example in a cluster where a lot of pods being deleted, such a high priority
// pod can deprive other pods from getting scheduled.
```
注意: 这段 TODO 的意思是, 如果集群中频繁地删除已经调度成功的 pods, 就会导致 pods 频繁地从 unschedulableQ 移到 activeQ; 如果此时 unschedulableQ 中有个高优先级的 pod 而且它始终无法被调度, 那它就会频繁地回到 activeQ 的队首, activeQ 中低优先级的 pods 就可能迟迟得不到调度机会.
2.1.4 总结
- 当有已经调度成功的 pod 添加或者更新时, 对应的 podQueue 会把 unschedulableQ 中与该 pod 有任意一个亲和性 term 匹配的 pods 移到 activeQ 中.
- 当有已经调度成功的 pod 被删除时, 对应的 podQueue 会把 unschedulableQ 中所有的 pods 移到 activeQ 中. (因为可能释放了资源, 之前因资源不够而无法调度的 pods 可以重新尝试了)
2.2 PodInformer (unscheduled pod queue)
可以看到该 PodInformer 针对的是那些没有被调度并且属于该调度器的 pods.
```go
// pkg/scheduler/factory/factory.go

// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
	return schedulerName == pod.Spec.SchedulerName
}

// unscheduled pod queue
args.PodInformer.Informer().AddEventHandler(
	cache.FilteringResourceEventHandler{
		FilterFunc: func(obj interface{}) bool {
			switch t := obj.(type) {
			case *v1.Pod:
				return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
			case cache.DeletedFinalStateUnknown:
				if pod, ok := t.Obj.(*v1.Pod); ok {
					return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
				}
				runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
				return false
			default:
				runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
				return false
			}
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc:    c.addPodToSchedulingQueue,
			UpdateFunc: c.updatePodInSchedulingQueue,
			DeleteFunc: c.deletePodFromSchedulingQueue,
		},
	},
)
```
2.2.1 addPodToSchedulingQueue
```go
// pkg/scheduler/factory/factory.go
func (c *configFactory) addPodToSchedulingQueue(obj interface{}) {
	if err := c.podQueue.Add(obj.(*v1.Pod)); err != nil {
		runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
	}
}

// pkg/scheduler/internal/queue/scheduling_queue.go
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	err := p.activeQ.Add(pod)
	if err != nil {
		klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
	} else {
		if p.unschedulableQ.get(pod) != nil {
			klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
			p.unschedulableQ.delete(pod)
		}
		p.nominatedPods.add(pod, "")
		p.cond.Broadcast()
	}
	return err
}
```
关于 PriorityQueue 的 Add 方法:
- 添加到 activeQ 中, 如果已存在就更新.
- 如果 unschedulableQ 中有该 pod 就将其删除 (因为一个 pod 只能存在于 activeQ 或 unschedulableQ 其中之一).
- 存到 nominatedPods 中.

注意 p.nominatedPods.add(pod, "") 这个调用 (下面给出一个简化示意):
- 如果该 pod 是抢占成功的 pod (pod.Status.NominatedNodeName 有值), 因为这里传入的 nodeName 为空, 该 pod 就会被记录到它的 NominatedNodeName 对应的节点下.
- 如果该 pod 不是抢占成功的 pod, 那它的 NominatedNodeName 也为空, 加上这里传入的 nodeName 也为空, 所以此方法什么也不做.
- 说白了, p.nominatedPods.add(pod, "") 这个调用就是: 有抢占就存, 没有抢占就不存.
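nominatedPods.add 的关键分支大致如下 (简化示意, 省略了去重等细节, 完整实现见上一篇分析):

```go
// 简化示意: nodeName 为空时取 pod.Status.NominatedNodeName, 两者都为空则什么也不做
func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
	// 先删除旧记录, 保证同一个 pod 只存一份
	npm.delete(p)
	nnn := nodeName
	if len(nnn) == 0 {
		nnn = NominatedNodeName(p) // 即 p.Status.NominatedNodeName
		if len(nnn) == 0 {
			// 不是抢占成功的 pod: 不存
			return
		}
	}
	// 是抢占成功的 pod: 记录它被提名到的节点
	npm.nominatedPodToNode[p.UID] = nnn
	npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
}
```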
总结: 所以当有一个没有被调度并且属于该调度器的 pod 出现在集群中时, 对应的 podQueue 会将其加入到 activeQ 中; 如果该 pod 是抢占成功的 pod (即 pod.Status.NominatedNodeName 有值), 还需要将其加入到 nominatedPods 中.
2.2.2 updatePodInSchedulingQueue
```go
// pkg/scheduler/factory/factory.go
func (c *configFactory) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
	pod := newObj.(*v1.Pod)
	if c.skipPodUpdate(pod) {
		return
	}
	if err := c.podQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
		runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
	}
}

// pkg/scheduler/internal/queue/scheduling_queue.go
// 1. 如果在 activeQ 中存在, 直接更新 activeQ 和 nominatedPods
// 2. 如果在 unschedulableQ 中存在, 先更新 nominatedPods
//    2.1 如果 spec 发生了改变, 有可能变成 schedulable, 因此加入到 activeQ 中并从 unschedulableQ 中删除
//    2.2 如果没有任何改变, 直接更新 unschedulableQ
// 3. 在 activeQ 和 unschedulableQ 中都不存在, 添加到 activeQ 并调用 p.nominatedPods.add(newPod, "")
func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	// If the pod is already in the active queue, just update it there.
	if _, exists, _ := p.activeQ.Get(newPod); exists {
		p.nominatedPods.update(oldPod, newPod)
		err := p.activeQ.Update(newPod)
		return err
	}
	// If the pod is in the unschedulable queue, updating it may make it schedulable.
	if usPod := p.unschedulableQ.get(newPod); usPod != nil {
		p.nominatedPods.update(oldPod, newPod)
		// 如果 spec 发生了改变, 就有可能变成 schedulable, 因此加入到 activeQ 中并从 unschedulableQ 中删除
		if isPodUpdated(oldPod, newPod) {
			p.unschedulableQ.delete(usPod)
			err := p.activeQ.Add(newPod)
			if err == nil {
				p.cond.Broadcast()
			}
			return err
		}
		// 如果没有改变, 直接在 unschedulableQ 中更新
		p.unschedulableQ.addOrUpdate(newPod)
		return nil
	}
	// If pod is not in any of the two queue, we put it in the active queue.
	err := p.activeQ.Add(newPod)
	if err == nil {
		p.nominatedPods.add(newPod, "")
		p.cond.Broadcast()
	}
	return err
}

func isPodUpdated(oldPod, newPod *v1.Pod) bool {
	strip := func(pod *v1.Pod) *v1.Pod {
		p := pod.DeepCopy()
		p.ResourceVersion = ""
		p.Generation = 0
		p.Status = v1.PodStatus{}
		return p
	}
	return !reflect.DeepEqual(strip(oldPod), strip(newPod))
}
```
PriorityQueue 中的 Update 分三种情况处理:
1. 在 activeQ 中存在: 直接更新 activeQ 和 nominatedPods.
2. 在 unschedulableQ 中存在: 先更新 nominatedPods, 它的 update 方法是先删除 oldPod, 然后调用 add(newPod, "") (这个之前已经分析过, 用于判断是否出现了抢占, 下面也给出一个示意). 因为是更新操作, 所以还要判断这个 newPod 是不是有可能变成可调度的, 这就是 isPodUpdated 方法的作用, 然后根据其结果进行相应的操作.
3. 在 activeQ 和 unschedulableQ 中都不存在: 直接加入到 activeQ, 然后调用 p.nominatedPods.add(newPod, ""), 如果该 pod 有抢占就加入到 nominatedPods 中.
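第 2 点提到的 nominatedPods.update 大致就是"先删后加"的组合 (示意, 以实际源码为准):

```go
// 示意: 无论 NominatedNodeName 是否变化都先删除旧记录, 再以空 nodeName 调用 add,
// 是否真正存入取决于 newPod.Status.NominatedNodeName
func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
	npm.delete(oldPod)
	npm.add(newPod, "")
}
```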
总结: 所以当有一个没有被调度并且属于该调度器的 pod 在集群中被更新时, 对应的 podQueue 会更新 nominatedPods; 如果该 pod 在 activeQ 中就更新 activeQ, 如果在 unschedulableQ 中并且 spec 没有变化就更新 unschedulableQ, 反之就将其加入到 activeQ 中.
2.2.3 deletePodFromSchedulingQueue
```go
// pkg/scheduler/factory/factory.go
func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) {
	var pod *v1.Pod
	switch t := obj.(type) {
	case *v1.Pod:
		pod = obj.(*v1.Pod)
	case cache.DeletedFinalStateUnknown:
		var ok bool
		pod, ok = t.Obj.(*v1.Pod)
		if !ok {
			runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
			return
		}
	default:
		runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
		return
	}
	if err := c.podQueue.Delete(pod); err != nil {
		runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
	}
	...
}

// pkg/scheduler/internal/queue/scheduling_queue.go
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	// nominatedPods 中有就删, 没有就什么都不做
	p.nominatedPods.delete(pod)
	err := p.activeQ.Delete(pod)
	if err != nil { // The item was probably not found in the activeQ.
		p.unschedulableQ.delete(pod)
	}
	return nil
}
```
总结: 所以当有一个没有被调度并且属于该调度器的 pod 在集群中被删除时, 对应的 podQueue 会将其从 nominatedPods 中删除 (有就删, 没有就什么都不做), 并且从 activeQ 或者 unschedulableQ 中删除.
2.3 NodeInformer
```go
args.NodeInformer.Informer().AddEventHandler(
	cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addNodeToCache,
		UpdateFunc: c.updateNodeInCache,
		DeleteFunc: c.deleteNodeFromCache,
	},
)
```
明白了上面的原理之后, 后面的分析就会简单很多
```go
func (c *configFactory) addNodeToCache(obj interface{}) {
	...
	c.podQueue.MoveAllToActiveQueue()
	// NOTE: add a new node does not affect existing predicates in equivalence cache
}

func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
	...
	// Only activate unschedulable pods if the node became more schedulable.
	// We skip the node property comparison when there is no unschedulable pods in the queue
	// to save processing cycles. We still trigger a move to active queue to cover the case
	// that a pod being processed by the scheduler is determined unschedulable. We want this
	// pod to be reevaluated when a change in the cluster happens.
	if c.podQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
		c.podQueue.MoveAllToActiveQueue()
	}
}

func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
	if nodeSpecUnschedulableChanged(newNode, oldNode) {
		return true
	}
	if nodeAllocatableChanged(newNode, oldNode) {
		return true
	}
	if nodeLabelsChanged(newNode, oldNode) {
		return true
	}
	if nodeTaintsChanged(newNode, oldNode) {
		return true
	}
	if nodeConditionsChanged(newNode, oldNode) {
		return true
	}
	return false
}
```
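这些 nodeXXXChanged 辅助函数基本都是对新旧 Node 的对应字段做比较, 大致形如下面的示意 (以实际源码为准):

```go
// 示意: 比较新旧 Node 的 Allocatable 和 Labels 是否发生变化
func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
	return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
}

func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
	return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
}
```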
注意: updateNodeInCache 中当 unschedulableQ 里没有任何 pods 时也会调用 MoveAllToActiveQueue. 结合源码中的注释可以这样理解: 队列为空时跳过节点属性的比较是为了节省开销, 但仍然触发一次 move, 是为了覆盖"某个 pod 此刻正被调度器处理、即将被判定为 unschedulable"的情况, 这样当集群发生变化时, 该 pod 也能被重新评估.
总结:
1. 当集群中加入一个新节点时, 会把 unschedulableQ 中所有的 pods 移到 activeQ 中.
2. 当集群中更新一个节点时, 如果节点在 nodeSpecUnschedulableChanged, nodeAllocatableChanged, nodeLabelsChanged, nodeTaintsChanged, nodeConditionsChanged 任一方面发生了改变, 也会把 unschedulableQ 中所有的 pods 移到 activeQ 中.
3. 当集群中删除一个节点时, podQueue 不做任何操作.
2.4 PvInformer
```go
args.PvInformer.Informer().AddEventHandler(
	cache.ResourceEventHandlerFuncs{
		// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
		AddFunc:    c.onPvAdd,
		UpdateFunc: c.onPvUpdate,
		DeleteFunc: c.onPvDelete,
	},
)

func (c *configFactory) onPvAdd(obj interface{}) {
	...
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onPvUpdate(old, new interface{}) {
	...
	c.podQueue.MoveAllToActiveQueue()
}
```
总结:
1. 当集群中加入或者更新一个 pv 时, 会把 unschedulableQ 中所有的 pods 移到 activeQ 中.
2. 当集群中删除一个 pv 时, podQueue 不做任何操作.
2.5 PvcInformer
```go
args.PvcInformer.Informer().AddEventHandler(
	cache.ResourceEventHandlerFuncs{
		AddFunc:    c.onPvcAdd,
		UpdateFunc: c.onPvcUpdate,
		DeleteFunc: c.onPvcDelete,
	},
)

func (c *configFactory) onPvcAdd(obj interface{}) {
	...
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onPvcUpdate(old, new interface{}) {
	...
	c.podQueue.MoveAllToActiveQueue()
}
```
总结:
1. 当集群中加入或者更新一个 pvc 时, 会把 unschedulableQ 中所有的 pods 移到 activeQ 中.
2. 当集群中删除一个 pvc 时, podQueue 不做任何操作.
2.6 ServiceInformer
```go
args.ServiceInformer.Informer().AddEventHandler(
	cache.ResourceEventHandlerFuncs{
		AddFunc:    c.onServiceAdd,
		UpdateFunc: c.onServiceUpdate,
		DeleteFunc: c.onServiceDelete,
	},
)

func (c *configFactory) onServiceAdd(obj interface{}) {
	...
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
	...
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onServiceDelete(obj interface{}) {
	...
	c.podQueue.MoveAllToActiveQueue()
}
```
总结:
1. 当集群中加入、更新或者删除一个 service 时, 都会把 unschedulableQ 中所有的 pods 移到 activeQ 中.
2.7 run方法
```go
// run starts the goroutine to pump from unschedulableQ to activeQ
func (p *PriorityQueue) run() {
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

func (p *PriorityQueue) flushUnschedulableQLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()
	var podsToMove []*v1.Pod
	currentTime := p.clock.Now()
	for _, pod := range p.unschedulableQ.pods {
		lastScheduleTime := podTimestamp(pod)
		if !lastScheduleTime.IsZero() && currentTime.Sub(lastScheduleTime.Time) > unschedulableQTimeInterval {
			podsToMove = append(podsToMove, pod)
		}
	}
	if len(podsToMove) > 0 {
		p.movePodsToActiveQueue(podsToMove)
	}
}

const unschedulableQTimeInterval = 60 * time.Second
```
run 方法会启动一个后台 goroutine, 每隔 30 秒检查一次, 把 unschedulableQ 中那些距离上一次调度尝试已经超过 60 秒 (unschedulableQTimeInterval) 的 pods 移到 activeQ 中.
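这里的"上一次调度时间"由 podTimestamp 决定, 其实现大致如下 (示意, 以实际源码为准): 优先取 PodScheduled condition 的时间, 没有该 condition 时退回到 pod 的创建时间.

```go
// 示意: 返回 pod 上一次调度尝试的时间, 没有记录时返回创建时间
func podTimestamp(pod *v1.Pod) *metav1.Time {
	_, condition := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
	if condition == nil {
		return &pod.CreationTimestamp
	}
	if condition.LastProbeTime.IsZero() {
		return &condition.LastTransitionTime
	}
	return &condition.LastProbeTime
}
```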
2.8 schedulingCycle 和 moveRequestCycle属性
这两个属性是用来控制调度失败的 pod 通过 AddUnschedulableIfNotPresent 重新入队时, 应该进入 unschedulableQ 还是 activeQ 的.
```go
func (p *PriorityQueue) MoveAllToActiveQueue() {
	...
	p.moveRequestCycle = p.schedulingCycle
	p.cond.Broadcast()
}

// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
	...
	p.moveRequestCycle = p.schedulingCycle
	p.cond.Broadcast()
}
```
MoveAllToActiveQueue 和 movePodsToActiveQueue 会把 moveRequestCycle 设置成当前的 schedulingCycle, 也就是把这两个值设置为相等; 而在 Pop 的时候 schedulingCycle 会加 1 (下面给出 Pop 的简化示意).
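Pop 的逻辑大致如下 (简化示意, 省略了部分细节, 以实际源码为准): activeQ 为空时阻塞等待, 每成功弹出一个 pod, schedulingCycle 就加 1.

```go
// 简化示意: 阻塞地从 activeQ 取出优先级最高的 pod, 并进入新的调度周期
func (p *PriorityQueue) Pop() (*v1.Pod, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.activeQ.Len() == 0 {
		// 队列关闭时直接返回, 否则等待新的 pod 入队后被 Broadcast 唤醒
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		p.cond.Wait()
	}
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pod := obj.(*v1.Pod)
	// 每弹出一个 pod 就开始一个新的调度周期
	p.schedulingCycle++
	return pod, nil
}
```

而 AddUnschedulableIfNotPresent 则利用这两个周期值来决定调度失败的 pod 回到哪个队列: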
```go
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.unschedulableQ.get(pod) != nil {
		return fmt.Errorf("pod is already present in unschedulableQ")
	}
	if _, exists, _ := p.activeQ.Get(pod); exists {
		return fmt.Errorf("pod is already present in the activeQ")
	}
	if podSchedulingCycle > p.moveRequestCycle && isPodUnschedulable(pod) {
		p.unschedulableQ.addOrUpdate(pod)
		p.nominatedPods.add(pod, "")
		return nil
	}
	err := p.activeQ.Add(pod)
	if err == nil {
		p.nominatedPods.add(pod, "")
		p.cond.Broadcast()
	}
	return err
}
```
1. 当该 pod 已经在 unschedulableQ 或者 activeQ 中时, 直接返回错误.
2. 当 podSchedulingCycle <= p.moveRequestCycle 时, 无论该 pod 是否被判定为 unschedulable, 都加入到 activeQ 中. 因为 scheduling queue 在该 pod 的那个调度周期之后 (或当前周期) 进行过 move 操作, 说明集群状态已经发生了变化, 所以需要重新尝试一下, 比如这个 pod 之前可能是因为亲和性等原因没有被调度成功.
3. 当 podSchedulingCycle > p.moveRequestCycle, 即该 pod 的那次调度之后没有发生过 move 操作时, 说明集群目前没有任何会触发 move 的变化; 如果该 pod 被认为是无法调度的, 那当然还是要加入到 unschedulableQ 中, 因为即使加入到 activeQ, 出队列后还是会调度失败.