[k8s源码分析][kube-scheduler]schedul

Author: nicktming | Published 2019-10-14 21:50

    5.4 binding

    Continuing from the previous article [k8s源码分析][kube-scheduler]scheduler之调度之优选(priority)与抢占(preempt).
    Once the scheduling algorithm has produced a node, the scheduler needs to set the pod's pod.spec.nodeName to that node's name.

    5.4.1 Normal scheduling
    // pkg/scheduler/scheduler.go
    
    // scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
    func (sched *Scheduler) scheduleOne() {
        pod := sched.config.NextPod()
        ...
        assumedPod := pod.DeepCopy()
    
        ...
        // assume modifies `assumedPod` by setting NodeName=suggestedHost
        // assume the pod
        err = sched.assume(assumedPod, suggestedHost)
        if err != nil {
            klog.Errorf("error assuming pod: %v", err)
            metrics.PodScheduleErrors.Inc()
            return
        }
        // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
        // perform the bind asynchronously
        go func() {
            ...
            err := sched.bind(assumedPod, &v1.Binding{
                ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
                Target: v1.ObjectReference{
                    Kind: "Node",
                    Name: suggestedHost,
                },
            })
            metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
            if err != nil {
                klog.Errorf("error binding pod: %v", err)
                metrics.PodScheduleErrors.Inc()
            } else {
                metrics.PodScheduleSuccesses.Inc()
            }
        }()
    }
    

    1. Call sched.assume(assumedPod, suggestedHost) to assume the pod.
    2. Start a goroutine that calls sched.bind to perform the binding asynchronously (a simplified sketch of this assume-then-bind pattern follows).
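
    To make the "optimistically assume, then bind asynchronously" pattern concrete, here is a minimal, self-contained sketch. The pod, cache and bindToAPIServer names below are simplified stand-ins invented for illustration, not the scheduler's real types; the point is only that assume reserves the node in the local cache up front, so the api-server round trip can run in a goroutine and a failed bind simply forgets the reservation again.

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    // pod and cache are simplified stand-ins for v1.Pod and the scheduler cache.
    type pod struct {
        name     string
        nodeName string
    }

    type cache struct {
        mu      sync.Mutex
        assumed map[string]string // pod name -> node it is assumed to run on
    }

    func (c *cache) AssumePod(p *pod, node string) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.assumed[p.name] = node // the node's resources are now reserved for p
    }

    func (c *cache) ForgetPod(p *pod) {
        c.mu.Lock()
        defer c.mu.Unlock()
        delete(c.assumed, p.name) // release the optimistic reservation
    }

    // bindToAPIServer is a placeholder for the real POST to the pods/binding subresource.
    func bindToAPIServer(p *pod, node string) error {
        if node == "" {
            return errors.New("no node suggested")
        }
        p.nodeName = node
        return nil
    }

    func scheduleOne(c *cache, p *pod, suggestedHost string, wg *sync.WaitGroup) {
        // 1. assume: reserve the node in the local cache so the next pod in the
        //    scheduling loop already sees these resources as taken.
        c.AssumePod(p, suggestedHost)

        // 2. bind asynchronously: the scheduling loop does not wait for the
        //    api-server round trip, which is safe because of the assumption above.
        wg.Add(1)
        go func() {
            defer wg.Done()
            if err := bindToAPIServer(p, suggestedHost); err != nil {
                // On failure, undo the optimistic reservation.
                c.ForgetPod(p)
                fmt.Printf("bind failed for %s: %v\n", p.name, err)
                return
            }
            fmt.Printf("bound %s to %s\n", p.name, suggestedHost)
        }()
    }

    func main() {
        c := &cache{assumed: map[string]string{}}
        var wg sync.WaitGroup
        scheduleOne(c, &pod{name: "web-0"}, "node-1", &wg)
        scheduleOne(c, &pod{name: "web-1"}, "", &wg) // fails to bind and is forgotten
        wg.Wait()
    }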

    5.4.1.1 assume
    // pkg/scheduler/scheduler.go
    
    func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
        // Optimistically assume that the binding will succeed and send it to apiserver
        // in the background.
        // If the binding fails, scheduler will release resources allocated to assumed pod
        // immediately.
    
        // Optimistically assume that the binding running in the background goroutine (the request to the api-server) will succeed.
        // Even if it does not, that is fine: bind will immediately remove this pod from the scheduler cache.
        assumed.Spec.NodeName = host
        // NOTE: Updates must be written to scheduler cache before invalidating
        // equivalence cache, because we could snapshot equivalence cache after the
        // invalidation and then snapshot the cache itself. If the cache is
        // snapshotted before updates are written, we would update equivalence
        // cache with stale information which is based on snapshot of old cache.
        if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil {
            klog.Errorf("scheduler cache AssumePod failed: %v", err)
    
            // This is most probably result of a BUG in retrying logic.
            // We report an error here so that pod scheduling can be retried.
            // This relies on the fact that Error will check if the pod has been bound
            // to a node and if so will not add it back to the unscheduled pods queue
            // (otherwise this would cause an infinite loop).
        // Report an error; the Error handler will try to put the pod back into the unschedulable queue.
            sched.config.Error(assumed, err)
            sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePod failed: %v", err)
            sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
                Type:          v1.PodScheduled,
                Status:        v1.ConditionFalse,
                LastProbeTime: metav1.Now(),
                Reason:        "SchedulerError",
                Message:       err.Error(),
            })
            return err
        }
        // if "assumed" is a nominated pod, we should remove it from internal cache
        // If this is a nominated pod, remove it from the scheduling queue's nominated pods.
        if sched.config.SchedulingQueue != nil {
            sched.config.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
        }
    
        // Optimistically assume that the binding will succeed, so we need to invalidate affected
        // predicates in equivalence cache.
        // If the binding fails, these invalidated item will not break anything.
        if sched.config.Ecache != nil {
            sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host)
        }
        return nil
    }
    

    1. The pod is added to SchedulerCache as an assumed pod; as part of this, the pod is accounted against its node in the cache (a simplified sketch of that bookkeeping follows).
    2. If the pod exists among the scheduling queue's nominated pods, it is removed via DeleteNominatedPodIfExists.
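
    A minimal sketch of what "the pod is added to the node" means inside the cache (the nodeInfo/requested-CPU bookkeeping here is an invented simplification, not the real schedulercache types): once AssumePod has run, later scheduling cycles already see the chosen node's requested resources increased by this pod, even though the bind has not completed yet.

    package main

    import "fmt"

    // nodeInfo is a simplified stand-in for the scheduler cache's per-node record.
    type nodeInfo struct {
        pods           []string
        requestedMilli int64 // sum of CPU requests (millicores) of pods on this node
    }

    type simpleCache struct {
        nodes   map[string]*nodeInfo
        assumed map[string]bool // pod name -> assumed but not yet confirmed by the informer
    }

    // AssumePod charges the pod against its node immediately, before the bind
    // request has even been sent, so the next pod in the queue is scheduled
    // against up-to-date resource usage.
    func (c *simpleCache) AssumePod(podName, nodeName string, cpuMilli int64) {
        n, ok := c.nodes[nodeName]
        if !ok {
            n = &nodeInfo{}
            c.nodes[nodeName] = n
        }
        n.pods = append(n.pods, podName)
        n.requestedMilli += cpuMilli
        c.assumed[podName] = true
    }

    func main() {
        c := &simpleCache{nodes: map[string]*nodeInfo{}, assumed: map[string]bool{}}
        c.AssumePod("web-0", "node-1", 500)
        c.AssumePod("web-1", "node-1", 250)
        fmt.Printf("node-1 requested CPU: %dm, pods: %v\n",
            c.nodes["node-1"].requestedMilli, c.nodes["node-1"].pods)
    }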

    5.4.1.2 bind

    The bind here runs in its own goroutine; its purpose is to set the pod's pod.spec.nodeName field by sending a Binding to the api-server.

    func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
        bindingStart := time.Now()
        // If binding succeeded then PodScheduled condition will be updated in apiserver so that
        // it's atomic with setting host.
        err := sched.config.GetBinder(assumed).Bind(b)
        // FinishBinding is called whether the bind succeeded or failed.
        // On success, the pod informer (the scheduled-pod cache) observes a scheduled pod, which triggers its AddFunc and calls schedulerCache.Add.
        // On failure, the assumed pod will later expire and be removed from SchedulerCache.
        if finErr := sched.config.SchedulerCache.FinishBinding(assumed); finErr != nil {
            klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
        }
        // If the bind request to the api-server fails:
        // 1. remove the assumed pod that was added in assume (ForgetPod)
        // 2. report the error, which also tries to put the pod back into the unschedulable queue
        if err != nil {
            klog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
            if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil {
                klog.Errorf("scheduler cache ForgetPod failed: %v", err)
            }
            sched.config.Error(assumed, err)
            sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "Binding rejected: %v", err)
            sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
                Type:          v1.PodScheduled,
                Status:        v1.ConditionFalse,
                LastProbeTime: metav1.Now(),
                Reason:        "BindingRejected",
            })
            return err
        }
    
        metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
        metrics.SchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart))
        sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, b.Target.Name)
        return nil
    }
    

    1. If the bind request to the api-server fails (a sketch of the default binder that sends this request follows this list):

    1. the assumed pod that was added in assume is removed again (ForgetPod)
    2. the error is reported, which also tries to put the pod back into the unschedulable queue

    2. FinishBinding is called whether the bind succeeded or failed:

    1. on success, the pod informer (the scheduled-pod cache) observes a scheduled pod, which triggers its AddFunc and calls schedulerCache.Add.
    2. on failure, the assumed pod later expires and is removed from SchedulerCache.
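
    For context, the default binder returned by sched.config.GetBinder(assumed) essentially forwards the Binding object to the api-server's pods/binding subresource through client-go; the server then sets pod.spec.nodeName. The sketch below is an approximation for the context-less client-go of this era (around k8s 1.13), not a verbatim copy of the factory code; newer client-go versions additionally take a context.Context and options in Bind.

    package binderexample

    import (
        "k8s.io/api/core/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/klog"
    )

    // binder sketches the scheduler's default binder: it hands the Binding object
    // to the api-server, which sets pod.spec.nodeName atomically on the server side.
    type binder struct {
        Client kubernetes.Interface
    }

    func (b *binder) Bind(binding *v1.Binding) error {
        klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
        // This issues a POST to /api/v1/namespaces/<ns>/pods/<name>/binding.
        return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
    }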

    5.5 After preemption has picked a node

    The previous article analyzed how preemption itself works, but what does the scheduler do once preemption has picked a node?

    func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
        // If pod priority is not enabled, or preemption is disabled for this scheduler
        if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
            klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
                " No preemption is performed.")
            return "", nil
        }
        // Get the up-to-date preemptor pod
        preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
        if err != nil {
            klog.Errorf("Error getting the updated preemptor pod object: %v", err)
            return "", err
        }
    
        // node: the node the preemptor is to run on
        // victims: the pods on that node that will be killed
        // nominatedPodsToClear: pods whose pod.Status.NominatedNodeName needs to be cleared
        node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
        metrics.PreemptionVictims.Set(float64(len(victims)))
        if err != nil {
            klog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
            return "", err
        }
        var nodeName = ""
        if node != nil {
            nodeName = node.Name
            // Update the scheduling queue with the nominated pod information. Without
            // this, there would be a race condition between the next scheduling cycle
            // and the time the scheduler receives a Pod Update for the nominated pod.
    
        // Add this pod to nominatedPods in the scheduling queue for this node
            sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
    
            // Make a call to update nominated node name of the pod on the API server.
        // Set pod.Status.NominatedNodeName to nodeName,
        // i.e. send a request to the api-server to update this field on the pod.
            err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
            if err != nil {
                klog.Errorf("Error in preemption process. Cannot update pod %v/%v annotations: %v", preemptor.Namespace, preemptor.Name, err)
            // If the update fails, remove the pod from the scheduling queue's nominatedPodMap
                sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
                return "", err
            }
    
            for _, victim := range victims {
            // Ask the api-server to delete this victim pod
                if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
                    klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
                    return "", err
                }
                sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
            }
        }
        // Clearing nominated pods should happen outside of "if node != nil". Node could
        // be nil when a pod with nominated node name is eligible to preempt again,
        // but preemption logic does not find any node for it. In that case Preempt()
        // function of generic_scheduler.go returns the pod itself for removal of the annotation.
        // Clear the nominated pods that need clearing.
        // Why does this happen outside of `node != nil`? Because a pod that already has a nominated node name (i.e. it has preempted before)
        // may be allowed to preempt again and yet fail to find a node; in that case its pod.Status.NominatedNodeName must be cleared.
        for _, p := range nominatedPodsToClear {
            rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
            if rErr != nil {
                klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
                // We do not return as this error is not critical.
            }
        }
        return nodeName, err
    }
    

    Note that nominatedPodsToClear is cleared outside of the node != nil block.
    Clearing nominatedPodsToClear when node != nil is the normal case.

    Clearing nominatedPodsToClear when node == nil covers the following situation:
    a pod that already has a nominated node name (i.e. it has preempted before) is eligible to preempt again, but this time no node is found for it; in that case its pod.Status.NominatedNodeName must be removed (a small sketch of that operation follows).
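
    A small sketch of what setting and clearing the nomination amounts to (an approximation of the podPreemptor helper for this era's context-less client-go, not a verbatim copy): both operations are just status updates of pod.Status.NominatedNodeName through the api-server, and "clearing" means setting the field back to the empty string.

    package preemptexample

    import (
        "k8s.io/api/core/v1"
        "k8s.io/client-go/kubernetes"
    )

    // podPreemptor sketches the helper the scheduler uses to manipulate
    // pod.Status.NominatedNodeName via the api-server.
    type podPreemptor struct {
        Client kubernetes.Interface
    }

    // SetNominatedNodeName records on the pod's status which node it is expected
    // to land on once the victims have terminated.
    func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
        podCopy := pod.DeepCopy()
        podCopy.Status.NominatedNodeName = nominatedNodeName
        _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
        return err
    }

    // RemoveNominatedNodeName clears the field again, e.g. when a previously
    // nominated pod was allowed to preempt once more but no node could be found.
    func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
        if len(pod.Status.NominatedNodeName) == 0 {
            return nil
        }
        return p.SetNominatedNodeName(pod, "")
    }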
