美文网首页k8s那点事儿
[k8s源码分析][kube-scheduler]schedul

[k8s源码分析][kube-scheduler]schedul

作者: nicktming | 来源:发表于2019-10-14 21:50 被阅读0次

5.4 binding

承接上文 [k8s源码分析][kube-scheduler]scheduler之调度之优选(priority)与抢占(preempt) 继续分析.
调度算法执行完成后,调度器就需要将Pod对象的 pod.spec.nodeName设置为获得的节点名称.

5.4.1 正常调度
// pkg/scheduler/scheduler.go

// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
//
// NOTE: this is an abridged excerpt; the `...` lines stand for omitted code,
// which is where suggestedHost, err and start (all used below) must be
// defined. The visible part takes the next pod from sched.config.NextPod,
// deep-copies it, assumes the copy onto suggestedHost, and then binds it in a
// separate goroutine.
func (sched *Scheduler) scheduleOne() {
    pod := sched.config.NextPod()
    ...
    assumedPod := pod.DeepCopy()

    ...
    // assume modifies `assumedPod` by setting NodeName=suggestedHost
    // Assume the pod; if that fails, log it, count a schedule error and stop
    // without attempting to bind.
    err = sched.assume(assumedPod, suggestedHost)
    if err != nil {
        klog.Errorf("error assuming pod: %v", err)
        metrics.PodScheduleErrors.Inc()
        return
    }
    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    // The bind runs in its own goroutine, so scheduleOne returns without
    // waiting for it.
    go func() {
        ...
        // The Binding carries the pod's namespace/name/UID and targets the
        // Node named suggestedHost.
        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: suggestedHost,
            },
        })
        // The end-to-end latency is observed whether or not the bind succeeded.
        metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
        if err != nil {
            klog.Errorf("error binding pod: %v", err)
            metrics.PodScheduleErrors.Inc()
        } else {
            metrics.PodScheduleSuccesses.Inc()
        }
    }()
}

1. 调用sched.assume(assumedPod, suggestedHost)方法assume该pod.
2. 启动异步goroutine调用sched.bind进行绑定.

5.4.1.1 assume
// pkg/scheduler/scheduler.go

// assume sets assumed.Spec.NodeName to host and records the pod in the
// scheduler cache through AssumePod, before any binding has been confirmed.
//
// If AssumePod fails, the error is reported through sched.config.Error, a
// "FailedScheduling" warning event is recorded, the pod's PodScheduled
// condition is updated to False with reason "SchedulerError", and the error is
// returned. Otherwise the pod is removed from the scheduling queue's nominated
// pods (if a queue is configured), the affected equivalence-cache predicates
// are invalidated (if an equivalence cache is configured), and nil is returned.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    // Optimistically assume that the binding will succeed and send it to apiserver
    // in the background.
    // If the binding fails, scheduler will release resources allocated to assumed pod
    // immediately.

    // In other words: the binding (a request to the API server) runs in a
    // background goroutine and is optimistically expected to succeed. If it
    // does not, that is fine too: bind calls ForgetPod on the scheduler cache
    // for this pod right away.
    assumed.Spec.NodeName = host
    // NOTE: Updates must be written to scheduler cache before invalidating
    // equivalence cache, because we could snapshot equivalence cache after the
    // invalidation and then snapshot the cache itself. If the cache is
    // snapshotted before updates are written, we would update equivalence
    // cache with stale information which is based on snapshot of old cache.
    if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil {
        klog.Errorf("scheduler cache AssumePod failed: %v", err)

        // This is most probably result of a BUG in retrying logic.
        // We report an error here so that pod scheduling can be retried.
        // This relies on the fact that Error will check if the pod has been bound
        // to a node and if so will not add it back to the unscheduled pods queue
        // (otherwise this would cause an infinite loop).
        // Report the error; per the note above, the Error handler is what
        // tries to put the pod back into the unscheduled pods queue.
        sched.config.Error(assumed, err)
        sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePod failed: %v", err)
        sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
            Type:          v1.PodScheduled,
            Status:        v1.ConditionFalse,
            LastProbeTime: metav1.Now(),
            Reason:        "SchedulerError",
            Message:       err.Error(),
        })
        return err
    }
    // if "assumed" is a nominated pod, we should remove it from internal cache
    // That is, a nominated pod is deleted from the scheduling queue's
    // nominated-pod bookkeeping now that it has been assumed onto a node.
    if sched.config.SchedulingQueue != nil {
        sched.config.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
    }

    // Optimistically assume that the binding will succeed, so we need to invalidate affected
    // predicates in equivalence cache.
    // If the binding fails, these invalidated item will not break anything.
    if sched.config.Ecache != nil {
        sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host)
    }
    return nil
}

1. 向SchedulerCache中加入assumed pod; 在加入的过程中, 该pod也会被加入到对应的node中.
2. 如果该pod是nominated pod, 则将其从scheduling queue的nominatedPods中删除.

5.4.1.2 bind

这里启动的是一个goroutine, 目的是更新pod.spec.nodeName字段.

// bind calls Bind(b) on the binder returned by sched.config.GetBinder for the
// assumed pod, then always calls FinishBinding on the scheduler cache.
//
// On a bind error it calls ForgetPod for the assumed pod, reports the error
// through sched.config.Error, records a "FailedScheduling" warning event,
// updates the PodScheduled condition to False with reason "BindingRejected",
// and returns the bind error. On success it observes the binding latency
// metrics, records a "Scheduled" event and returns nil.
func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
    bindingStart := time.Now()
    // If binding succeeded then PodScheduled condition will be updated in apiserver so that
    // it's atomic with setting host.
    err := sched.config.GetBinder(assumed).Bind(b)
    // FinishBinding is called whether the bind succeeded or failed.
    // NOTE(review): the two outcomes below are the article author's reading of
    // the cache, which is not visible here; confirm against the cache code.
    // On success, the pod informer (scheduled pod cache) receives a scheduled
    // pod, which triggers AddFunc and adds the pod to the scheduler cache.
    // On failure, the pod would later be removed from the scheduler cache on timeout.
    if finErr := sched.config.SchedulerCache.FinishBinding(assumed); finErr != nil {
        klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
    }
    // If binding to the API server failed:
    // 1. remove the assumed pod that assume added to the cache (ForgetPod);
    // 2. report the error, which also attempts to put the pod back into the
    //    queue of unschedulable pods.
    if err != nil {
        klog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
        // The err declared here shadows the bind error only inside this if
        // statement; the calls below still see the original bind error.
        if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil {
            klog.Errorf("scheduler cache ForgetPod failed: %v", err)
        }
        sched.config.Error(assumed, err)
        sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "Binding rejected: %v", err)
        sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
            Type:          v1.PodScheduled,
            Status:        v1.ConditionFalse,
            LastProbeTime: metav1.Now(),
            Reason:        "BindingRejected",
        })
        return err
    }

    // Latency is measured from bindingStart and only recorded for successful binds.
    metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
    metrics.SchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart))
    sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, b.Target.Name)
    return nil
}

1. 如果向api-server绑定失败的话

  1. 把在assume方法中加入的assumed pod删除(ForgetPod)
  2. 报告错误的同时会尝试加入到unschedulerQ

2. 无论bind成功还是失败 都会进行FinishBinding

  1. 如果成功podInformer(scheduled pod cache)会得到一个scheduled pod, 所以会触发AddFunc 调用schedulerCache.Add方法.
  2. 如果失败 后面会因为超时会被SchedulerCache删除

5.5 抢占到节点后

上文中已经分析了抢占阶段的操作, 但是抢占到后做了什么?

// preempt asks the scheduling algorithm to find a node on which preemptor
// could run after evicting lower-priority pods, and then acts on the result.
//
// scheduleErr is passed through to Algorithm.Preempt. When a node is returned,
// the preemptor is recorded as nominated for that node (first in the
// scheduling queue, then via SetNominatedNodeName) and each victim is deleted
// through PodPreemptor.DeletePod. Independently of whether a node was found,
// RemoveNominatedNodeName is called for every pod in nominatedPodsToClear.
//
// It returns the chosen node's name, or "" when preemption is disabled, no
// node was found, or an error occurred; a non-nil error is returned when
// fetching the updated pod, running Preempt, setting the nominated node name,
// or deleting a victim fails.
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
    // Do nothing if the pod priority feature is off or preemption is disabled
    // for this scheduler.
    if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
        klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
            " No preemption is performed.")
        return "", nil
    }
    // Replace the preemptor argument with the pod returned by GetUpdatedPod.
    preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
    if err != nil {
        klog.Errorf("Error getting the updated preemptor pod object: %v", err)
        return "", err
    }

    // node: the node the preemptor is to run on.
    // victims: the pods on that node that are to be deleted.
    // nominatedPodsToClear: nominated pods whose nominated node name is
    // removed in the loop at the end of this function.
    node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
    // The victims gauge is set before the error check, so it is updated even
    // when Preempt failed.
    metrics.PreemptionVictims.Set(float64(len(victims)))
    if err != nil {
        klog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
        return "", err
    }
    var nodeName = ""
    if node != nil {
        nodeName = node.Name
        // Update the scheduling queue with the nominated pod information. Without
        // this, there would be a race condition between the next scheduling cycle
        // and the time the scheduler receives a Pod Update for the nominated pod.

        // Add the pod to the nominated pods kept by the scheduling queue.
        sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)

        // Make a call to update nominated node name of the pod on the API server.
        // This sets pod.Status.NominatedNodeName to nodeName by sending an
        // update request for the pod to the API server.
        err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
        if err != nil {
            klog.Errorf("Error in preemption process. Cannot update pod %v/%v annotations: %v", preemptor.Namespace, preemptor.Name, err)
            // If the update failed, undo the queue-side nomination made above.
            sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
            return "", err
        }

        for _, victim := range victims {
            // Ask the API server to delete each victim; the first failure
            // aborts the loop and is returned to the caller.
            if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
                klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
                return "", err
            }
            sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
        }
    }
    // Clearing nominated pods should happen outside of "if node != nil". Node could
    // be nil when a pod with nominated node name is eligible to preempt again,
    // but preemption logic does not find any node for it. In that case Preempt()
    // function of generic_scheduler.go returns the pod itself for removal of the annotation.
    // Remove the nomination of the pods that need clearing.
    // Article author's reading of why this sits outside "node != nil": a pod
    // that already has a nominated node name (it preempted before) may be
    // eligible to preempt again but fail to do so this time, and in that case
    // its pod.Status.NominatedNodeName has to be removed.
    for _, p := range nominatedPodsToClear {
        rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
        if rErr != nil {
            klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
            // We do not return as this error is not critical.
        }
    }
    // Every path that reaches this line has err == nil: each earlier non-nil
    // err returned immediately.
    return nodeName, err
}

这里需要注意的是删除nominatedPodsToClear的时候是在node != nil外面操作的.
node != nil的时候执行删除nominatedPodsToClear很正常.

node == nil的时候要删除nominatedPodsToClear主要是基于下面的情况:
当一个已经设置过nominated node name的pod(也就是以前发生过抢占)可以再次进行抢占时, 但是该pod没有抢占成功, 在这种情况下需要删除该pod的pod.Status.NominatedNodeName.

相关文章

网友评论

    本文标题:[k8s源码分析][kube-scheduler]schedul

    本文链接:https://www.haomeiwen.com/subject/beldmctx.html