5.4 binding
承接上文 [k8s源码分析][kube-scheduler]scheduler之调度之优选(priority)与抢占(preempt) 继续分析.
调度算法执行完成后, 调度器就需要将Pod对象的pod.spec.nodeName设置为获得的节点名称.
5.4.1 正常调度
// pkg/scheduler/scheduler.go
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
//
// NOTE: this listing is abridged. Each `...` line marks code omitted from the
// excerpt; `suggestedHost`, `start` and the outer `err` used below are
// defined in those omitted parts.
func (sched *Scheduler) scheduleOne() {
// Take the pod handed out by sched.config.NextPod().
pod := sched.config.NextPod()
...
// Work on a deep copy so that the assume step below (which sets
// Spec.NodeName on its argument) modifies the copy rather than `pod`.
assumedPod := pod.DeepCopy()
...
// assume modifies `assumedPod` by setting NodeName=suggestedHost
// Assume the pod on suggestedHost.
err = sched.assume(assumedPod, suggestedHost)
if err != nil {
klog.Errorf("error assuming pod: %v", err)
metrics.PodScheduleErrors.Inc()
return
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
// The bind operation runs asynchronously in its own goroutine.
go func() {
...
// Build a Binding that targets the node named suggestedHost and hand it to
// sched.bind.
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{
Kind: "Node",
Name: suggestedHost,
},
})
// The end-to-end latency is observed whether or not the bind succeeded.
metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {
klog.Errorf("error binding pod: %v", err)
metrics.PodScheduleErrors.Inc()
} else {
metrics.PodScheduleSuccesses.Inc()
}
}()
}
1. 调用
sched.assume(assumedPod, suggestedHost)
方法assume
该pod
.
2. 启动异步goroutine
调用sched.bind
进行绑定.
5.4.1.1 assume
// pkg/scheduler/scheduler.go
// assume records the optimistic decision that `assumed` will run on `host`.
//
// It sets assumed.Spec.NodeName to host (mutating the pod passed in), adds the
// pod to the scheduler cache via AssumePod, removes it from the scheduling
// queue's nominated pods when a scheduling queue is configured, and
// invalidates the affected predicates in the equivalence cache when one is
// configured.
//
// If AssumePod fails, the error is reported through sched.config.Error, a
// "FailedScheduling" warning event is recorded, the PodScheduled condition is
// updated to False with reason "SchedulerError", and the error is returned.
// Otherwise assume returns nil.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// Optimistically assume that the binding will succeed and send it to apiserver
// in the background.
// If the binding fails, scheduler will release resources allocated to assumed pod
// immediately.
// In other words: the binding, which runs in a background goroutine and sends
// a request to the apiserver, is assumed to succeed. If it does not, that is
// fine, because bind removes the pod from the scheduler cache (ForgetPod).
assumed.Spec.NodeName = host
// NOTE: Updates must be written to scheduler cache before invalidating
// equivalence cache, because we could snapshot equivalence cache after the
// invalidation and then snapshot the cache itself. If the cache is
// snapshotted before updates are written, we would update equivalence
// cache with stale information which is based on snapshot of old cache.
if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil {
klog.Errorf("scheduler cache AssumePod failed: %v", err)
// This is most probably result of a BUG in retrying logic.
// We report an error here so that pod scheduling can be retried.
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
// Report the error; the Error handler is expected to try to put the pod
// back into the unschedulable queue (its implementation is not shown here).
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePod failed: %v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
LastProbeTime: metav1.Now(),
Reason: "SchedulerError",
Message: err.Error(),
})
return err
}
// if "assumed" is a nominated pod, we should remove it from internal cache
// That is, drop it from the scheduling queue's nominated pods if present.
if sched.config.SchedulingQueue != nil {
sched.config.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
}
// Optimistically assume that the binding will succeed, so we need to invalidate affected
// predicates in equivalence cache.
// If the binding fails, these invalidated item will not break anything.
if sched.config.Ecache != nil {
sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host)
}
return nil
}
1. 往SchedulerCache中加入assumed pod; 在加入的过程中, 该pod会被加入到对应的node中.
2. 如果该pod存在于scheduling_queue的NominatedPod中, 则将其从中删除.
5.4.1.2 bind
这里启动的是一个goroutine, 目的是通过向api-server发送Binding请求来设置pod.spec.nodeName字段.
// bind submits binding b for the assumed pod through the binder returned by
// sched.config.GetBinder and records the outcome.
//
// FinishBinding is always called on the scheduler cache, whether or not the
// bind request succeeded.
//
// On failure the assumed pod is removed from the scheduler cache (ForgetPod),
// the error is reported through sched.config.Error, a "FailedScheduling"
// warning event is recorded, the PodScheduled condition is updated to False
// with reason "BindingRejected", and the bind error is returned.
//
// On success the binding latency metrics are observed, a "Scheduled" event is
// recorded, and nil is returned.
func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
	bindingStart := time.Now()
	// If binding succeeded then PodScheduled condition will be updated in apiserver so that
	// it's atomic with setting host.
	err := sched.config.GetBinder(assumed).Bind(b)
	// FinishBinding runs regardless of the bind result.
	// Author's note (relies on code outside this excerpt): on success the pod
	// informer is expected to deliver the now-scheduled pod and add it to the
	// scheduler cache; otherwise the assumed pod is expected to expire from
	// the cache after a timeout.
	if finErr := sched.config.SchedulerCache.FinishBinding(assumed); finErr != nil {
		klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
	}
	// If the bind request to the apiserver failed:
	//  1. forget the assumed pod that assume added to the scheduler cache;
	//  2. report the error, which is expected to re-queue the pod.
	if err != nil {
		// Include the cause so the log line is actionable on its own.
		klog.V(1).Infof("Failed to bind pod: %v/%v: %v", assumed.Namespace, assumed.Name, err)
		// Use a distinct name so the bind error is not shadowed.
		if forgetErr := sched.config.SchedulerCache.ForgetPod(assumed); forgetErr != nil {
			klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
		}
		sched.config.Error(assumed, err)
		sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "Binding rejected: %v", err)
		sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
			Type:          v1.PodScheduled,
			Status:        v1.ConditionFalse,
			LastProbeTime: metav1.Now(),
			Reason:        "BindingRejected",
			// Carry the cause in the condition, matching the "SchedulerError"
			// condition written by assume.
			Message: err.Error(),
		})
		return err
	}

	metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
	metrics.SchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart))
	sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, b.Target.Name)
	return nil
}
1. 如果向api-server绑定失败的话
- 把在assume方法中加入的assumed pod删除(ForgetPod)
- 报告错误的同时会尝试加入到unschedulableQ中
2. 无论bind成功还是失败 都会进行FinishBinding
- 如果成功 podInformer(scheduled pod cache)会得到一个scheduled pod, 所以会触发AddFunc调用schedulerCache.Add方法.
- 如果失败 后面会因为超时而被SchedulerCache删除
5.5 抢占到节点后
上文中已经分析了抢占阶段的操作, 但是抢占到后做了什么?
// preempt runs the preemption step for preemptor after scheduling failed with
// scheduleErr.
//
// It returns the name of the node chosen for the preemptor. The name is ""
// when preemption is disabled, when no node was selected, or when an error
// occurred.
//
// A non-nil error is returned when any of these fails: fetching the updated
// preemptor, the preemption algorithm, setting the nominated node name, or
// deleting a victim. Failures to remove the nominated node name from the pods
// in nominatedPodsToClear are only logged.
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
	// Skip preemption when pod priority is not enabled or when preemption is
	// disabled in the scheduler configuration.
	if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
		klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
			" No preemption is performed.")
		return "", nil
	}
	// Re-fetch the preemptor so that the steps below use its updated object.
	preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
	if err != nil {
		klog.Errorf("Error getting the updated preemptor pod object: %v", err)
		return "", err
	}

	// node:                 the node the preemptor is going to run on
	// victims:              the pods on that node that have to be evicted
	// nominatedPodsToClear: pods whose nominated node name has to be cleared
	node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
	metrics.PreemptionVictims.Set(float64(len(victims)))
	if err != nil {
		// Log the cause as well as the pod, so the failure can be diagnosed
		// from this line alone.
		klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
		return "", err
	}

	var nodeName string
	if node != nil {
		nodeName = node.Name
		// Update the scheduling queue with the nominated pod information. Without
		// this, there would be a race condition between the next scheduling cycle
		// and the time the scheduler receives a Pod Update for the nominated pod.
		// This adds the pod to the scheduling queue's nominated pods.
		sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)

		// Make a call to update nominated node name of the pod on the API server.
		// This sends a request to the apiserver to set the pod's
		// pod.Status.NominatedNodeName to nodeName.
		err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
		if err != nil {
			klog.Errorf("Error in preemption process. Cannot update pod %v/%v annotations: %v", preemptor.Namespace, preemptor.Name, err)
			// The update failed, so undo the queue-side nomination made above.
			sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
			return "", err
		}

		for _, victim := range victims {
			// Ask the apiserver to delete each victim; stop at the first failure.
			if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
				klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
				return "", err
			}
			sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
		}
	}
	// Clearing nominated pods should happen outside of "if node != nil". Node could
	// be nil when a pod with nominated node name is eligible to preempt again,
	// but preemption logic does not find any node for it. In that case Preempt()
	// function of generic_scheduler.go returns the pod itself for removal of the annotation.
	//
	// Put differently: a pod that already has a nominated node name (it has
	// preempted before) may be allowed to preempt again and fail this time; its
	// pod.Status.NominatedNodeName then has to be removed even though node is nil.
	for _, p := range nominatedPodsToClear {
		rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
		if rErr != nil {
			klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
			// We do not return as this error is not critical.
		}
	}
	// err is nil here: every failure above has already returned.
	return nodeName, err
}
这里需要注意的是, 删除nominatedPodsToClear的操作是在node != nil的外面执行的.
当node != nil的时候执行删除nominatedPodsToClear很正常.
当node == nil的时候要删除nominatedPodsToClear主要是基于下面的情况:
当一个已经设置过nominated node name的pod(也就是之前已经发生过抢占)可以再次进行抢占, 但是这次没有抢占成功时, 需要删除该pod的pod.Status.NominatedNodeName.
网友评论