[k8s源码分析][kube-scheduler]schedul

Author: nicktming | Published 2019-10-14 07:43

1. Preface

If you repost this article, please credit the original source and respect the author's work!

Building on [k8s源码分析][kube-scheduler]scheduler之启动run(1) and [k8s源码分析][kube-scheduler]scheduler之启动run(2), this article analyzes how a single pod gets scheduled.

Source code: https://github.com/nicktming/kubernetes
Branch: tming-v1.13 (based on v1.13)

2. Overview

(figure: schedule.png)

Suppose a pod needs to be scheduled and there are three candidate nodes: Machine1, Machine2 and Machine3. The predicate (filtering) stage finds that Machine2 cannot run the pod, so it is eliminated. The priority (scoring) stage then scores the nodes that can run the pod: Machine1 and Machine3 get 8 and 7 points respectively. Finally the scheduler picks the node with the highest score, in this case Machine1.
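
To make this flow concrete, here is a minimal, self-contained Go sketch using toy types and made-up resource numbers (not the real scheduler API) that mirrors the predicate → priority → select pipeline described above:

package main

import "fmt"

type nodeName = string

// hasEnoughCPU builds a toy predicate: can the node spare `need` CPUs?
func hasEnoughCPU(free map[nodeName]int, need int) func(nodeName) bool {
    return func(n nodeName) bool { return free[n] >= need }
}

func main() {
    nodes := []nodeName{"Machine1", "Machine2", "Machine3"}
    freeCPU := map[nodeName]int{"Machine1": 4, "Machine2": 0, "Machine3": 2}
    scores := map[nodeName]int{"Machine1": 8, "Machine3": 7} // toy priority results

    // Predicate stage: drop nodes that cannot run the pod (Machine2 here).
    fits := hasEnoughCPU(freeCPU, 1)
    var feasible []nodeName
    for _, n := range nodes {
        if fits(n) {
            feasible = append(feasible, n)
        }
    }

    // Priority + selection stage: pick the feasible node with the highest score.
    best, bestScore := "", -1
    for _, n := range feasible {
        if s := scores[n]; s > bestScore {
            best, bestScore = n, s
        }
    }
    fmt.Println(best) // Machine1
}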

3. scheduleOne

func (sched *Scheduler) scheduleOne() {
    pod := sched.config.NextPod()
    // pod could be nil when schedulerQueue is closed
    // (the scheduling queue has been shut down, so pod is nil);
    // if the queue currently has no pod, NextPod blocks here until one arrives
    if pod == nil {
        return
    }
    // skip pods that are already being deleted
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }

    klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    // record the start time
    start := time.Now()
    // try to schedule the pod: on success a node name is returned and err is nil;
    // on failure err is non-nil (e.g. a FitError when no node fits)
    suggestedHost, err := sched.schedule(pod)
    if err != nil {
        // normal scheduling failed; fall back to preemption
        if fitError, ok := err.(*core.FitError); ok {
            preemptionStartTime := time.Now()
            // run preemption
            sched.preempt(pod, fitError)
            ...
        } else {
            klog.Errorf("error selecting node for pod: %v", err)
            metrics.PodScheduleErrors.Inc()
        }
        return
    }
    ...
    assumedPod := pod.DeepCopy()
    ...
    // assume the pod (optimistically record it on the suggested node in the scheduler cache)
    err = sched.assume(assumedPod, suggestedHost)
    ...
    // bind asynchronously
    go func() {
        ...
        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: suggestedHost,
            },
        })
        ...
    }()
}

1. NextPod returns the next pod to schedule, i.e. the pod at the head of the podQueue's activeQ.
2. sched.schedule(pod) performs normal scheduling for the pod. On success it returns a node name and we move on to step 3; on failure with a FitError it runs preemption via sched.preempt(pod, fitError) and returns.
3. sched.assume(assumedPod, suggestedHost) calls assume to record the pod as assumed in the scheduler cache.
4. A goroutine is started to bind the pod: it sends a Binding request to the api-server, which sets the pod's spec.nodeName to the chosen node (see the sketch below).
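
To make step 4 concrete, here is a minimal standalone sketch of what the bind boils down to, assuming the v1.13-era client-go API; the kubeconfig path, namespace, pod name and node name are all hypothetical:

package main

import (
    "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/klog"
)

func main() {
    // Hypothetical kubeconfig path; in-cluster config would work just as well.
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        klog.Fatal(err)
    }
    client := kubernetes.NewForConfigOrDie(config)

    // Hypothetical pod/node names. The real scheduler also sets the pod's UID.
    binding := &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "my-pod"},
        Target:     v1.ObjectReference{Kind: "Node", Name: "my-node"},
    }
    // Bind is a client-go expansion method on the pods client; it POSTs the
    // Binding to the pod's "binding" subresource.
    if err := client.CoreV1().Pods(binding.Namespace).Bind(binding); err != nil {
        klog.Fatalf("bind failed: %v", err)
    }
}

Because binding goes through this subresource, the scheduler never writes spec.nodeName directly; the api-server does it when it accepts the Binding.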

(figure: scheduleOne.png)

4. schedule

This section analyzes the sched.schedule(pod) method.

func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
    host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
    if err != nil {
        pod = pod.DeepCopy()
        sched.config.Error(pod, err)
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
        sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
            Type:          v1.PodScheduled,
            Status:        v1.ConditionFalse,
            LastProbeTime: metav1.Now(),
            Reason:        v1.PodReasonUnschedulable,
            Message:       err.Error(),
        })
        return "", err
    }
    return host, err
}

1. host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister) calls genericScheduler's Schedule method to do the actual scheduling.
2. If it fails, two things happen:
2.1: sched.config.Error(pod, err) invokes the default error-handling function. How it is constructed was analyzed in [k8s源码分析][kube-scheduler]scheduler之启动run(1) when sched is created; see pkg/scheduler/factory/factory.go:

// pkg/scheduler/factory/factory.go

func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
    ...
    podBackoff := util.CreateDefaultPodBackoff()
    return &Config{
        ...
        Error:           c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
        ...
    }, nil
...
}

func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error) {
    return func(pod *v1.Pod, err error) {
        ...
        backoff.Gc()
        podSchedulingCycle := podQueue.SchedulingCycle()
        // Retry asynchronously.
        // Note that this is extremely rudimentary and we need a more real error handling path.
        go func() {
            defer runtime.HandleCrash()
            podID := types.NamespacedName{
                Namespace: pod.Namespace,
                Name:      pod.Name,
            }
            origPod := pod
            ...
            // Get the pod again; it may have changed/been scheduled already.
            getBackoff := initialGetBackoff
            for {
                pod, err := c.client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
                if err == nil {
                    if len(pod.Spec.NodeName) == 0 {
                        podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle)
                    } else {
                        if c.volumeBinder != nil {
                            // Volume binder only wants to keep unassigned pods
                            c.volumeBinder.DeletePodBindings(pod)
                        }
                    }
                    break
                }
                ...
            }
        }()
    }
}

As you can see, MakeDefaultErrorFunc ends up calling podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle); that method was already analyzed in [k8s源码分析][kube-scheduler]scheduler之启动run(2).

2.2: It updates the pod's PodCondition via the api-server (see the sketch below).
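
Roughly speaking, the PodConditionUpdater that factory.go wires into the Config boils down to the following sketch (a paraphrase, not a verbatim copy of the source): merge the condition into pod.Status and, only if something actually changed, push it to the api-server with UpdateStatus.

package factory

import (
    "k8s.io/api/core/v1"
    clientset "k8s.io/client-go/kubernetes"
    podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

type podConditionUpdater struct {
    Client clientset.Interface
}

func (p *podConditionUpdater) Update(pod *v1.Pod, condition *v1.PodCondition) error {
    // UpdatePodCondition merges the condition into pod.Status and reports
    // whether anything actually changed; only then do we call the api-server.
    if podutil.UpdatePodCondition(&pod.Status, condition) {
        _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
        return err
    }
    return nil
}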

5. genericScheduler's Schedule method

Now we reach the core of scheduling: how the scheduler actually picks a node using the predicate (filtering) functions and the priority (scoring) functions.

func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
    ...
    nodes, err := nodeLister.List()
    if err != nil {
        return "", err
    }
    if len(nodes) == 0 {
        return "", ErrNoNodesAvailable
    }
    ...
    trace.Step("Computing predicates")
    startPredicateEvalTime := time.Now()
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    if err != nil {
        return "", err
    }
    ...
    trace.Step("Prioritizing")
    startPriorityEvalTime := time.Now()
    ...

    metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    if err != nil {
        return "", err
    }
    ...
    trace.Step("Selecting host")
    return g.selectHost(priorityList)
}

As is widely known, there are three main stages:
1. Predicate stage: findNodesThatFit determines which nodes can run the pod.
2. Priority stage: PrioritizeNodes computes a score for each node that survived the predicate stage.
3. Host selection: selectHost picks the node with the highest score; if several nodes tie for the top score, one of them is chosen (see the sketch below).
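
For step 3, here is a simplified, self-contained sketch of the selection step. The real selectHost operates on a schedulerapi.HostPriorityList and cycles (via lastNodeIndex) through the nodes that tie for the top score; this toy version just gathers the ties and picks one at random:

package main

import (
    "fmt"
    "math/rand"
)

type hostPriority struct {
    Host  string
    Score int
}

// selectHost returns one of the hosts with the highest score.
func selectHost(list []hostPriority) (string, error) {
    if len(list) == 0 {
        return "", fmt.Errorf("empty priority list")
    }
    best, maxScore := []string{list[0].Host}, list[0].Score
    for _, hp := range list[1:] {
        switch {
        case hp.Score > maxScore:
            best, maxScore = []string{hp.Host}, hp.Score
        case hp.Score == maxScore:
            best = append(best, hp.Host)
        }
    }
    // Spread pods across equally good nodes instead of always taking the first one.
    return best[rand.Intn(len(best))], nil
}

func main() {
    host, _ := selectHost([]hostPriority{{"Machine1", 8}, {"Machine3", 7}})
    fmt.Println(host) // Machine1
}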

5.1 Predicate stage

5.1.1 findNodesThatFit

The predicate stage is implemented by findNodesThatFit.

func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
    var filtered []*v1.Node
    failedPredicateMap := FailedPredicateMap{}

    // if there are no predicate functions, every node passes
    if len(g.predicates) == 0 {
        filtered = nodes
    } else {
        // total number of nodes
        allNodes := int32(g.cache.NodeTree().NumNodes)
        // search for at most numNodesToFind feasible nodes
        numNodesToFind := g.numFeasibleNodesToFind(allNodes)

        // Create filtered list with enough space to avoid growing it
        // and allow assigning.
        filtered = make([]*v1.Node, numNodesToFind)
        errs := errors.MessageCountMap{}
        var (
            predicateResultLock sync.Mutex
            filteredLen         int32
            equivClass          *equivalence.Class
        )

        ctx, cancel := context.WithCancel(context.Background())

        // We can use the same metadata producer for all nodes.
        meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)

        if g.equivalenceCache != nil {
            // getEquivalenceClassInfo will return immediately if no equivalence pod found
            equivClass = equivalence.NewClass(pod)
        }

        checkNode := func(i int) {
            var nodeCache *equivalence.NodeCache
            nodeName := g.cache.NodeTree().Next()
            if g.equivalenceCache != nil {
                nodeCache = g.equivalenceCache.LoadNodeCache(nodeName)
            }
            fits, failedPredicates, err := podFitsOnNode(
                pod,
                meta,
                g.cachedNodeInfoMap[nodeName],
                g.predicates,
                nodeCache,
                g.schedulingQueue,
                g.alwaysCheckAllPredicates,
                equivClass,
            )
            // on error, record it and return
            if err != nil {
                predicateResultLock.Lock()
                errs[err.Error()]++
                predicateResultLock.Unlock()
                return
            }
            // the node fits
            if fits {
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
                    // enough nodes found, cancel the remaining work
                    cancel()
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    // add the node to filtered
                    filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                }
            } else {
                // the node does not fit; record it and its failure reasons in failedPredicateMap
                predicateResultLock.Lock()
                failedPredicateMap[nodeName] = failedPredicates
                predicateResultLock.Unlock()
            }
        }

        // Stops searching for more nodes once the configured number of feasible nodes
        // are found.
        // evaluate the nodes with 16 goroutines in parallel
        workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

        filtered = filtered[:filteredLen]
        if len(errs) > 0 {
            return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
        }
    }

    // run the nodes that passed the built-in predicates through each extender in turn
    if len(filtered) > 0 && len(g.extenders) != 0 {
        for _, extender := range g.extenders {
            if !extender.IsInterested(pod) {
                continue
            }
            filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
            // on extender error, return (unless the extender is ignorable)
            if err != nil {
                if extender.IsIgnorable() {
                    klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
                        extender, err)
                    continue
                } else {
                    return []*v1.Node{}, FailedPredicateMap{}, err
                }
            }

            for failedNodeName, failedMsg := range failedMap {
                // add the node to failedPredicateMap if it is not there yet
                if _, found := failedPredicateMap[failedNodeName]; !found {
                    failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
                }
                failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
            }
            filtered = filteredList
            if len(filtered) == 0 {
                break
            }
        }
    }
    // return filtered (the nodes that can run the pod) and
    // failedPredicateMap (the nodes that cannot, with their failure reasons)
    return filtered, failedPredicateMap, nil
}

As you can see, 16 goroutines evaluate the nodes in parallel, producing the nodes that can run the pod (filtered) as well as the nodes that cannot, together with the reasons (failedPredicateMap). The third return value indicates whether an error occurred while running findNodesThatFit; if so, the method returns immediately. The parallel fan-out pattern is sketched below.
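
The fan-out itself comes from client-go's workqueue.ParallelizeUntil. The following minimal sketch (assuming the v1.13-era signature, with dummy node names and a predicate that always passes) shows the pattern findNodesThatFit relies on: 16 workers process node indices concurrently, and cancel() stops handing out new work once enough feasible nodes have been found:

package main

import (
    "context"
    "fmt"
    "sync/atomic"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    nodes := []string{"node1", "node2", "node3", "node4", "node5"}
    const numNodesToFind = 2 // pretend numFeasibleNodesToFind returned 2

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    var filteredLen int32

    checkNode := func(i int) {
        // pretend every node passes the predicates
        if length := atomic.AddInt32(&filteredLen, 1); length > numNodesToFind {
            cancel() // enough feasible nodes; stop handing out new work
            atomic.AddInt32(&filteredLen, -1)
            return
        }
        fmt.Println("feasible:", nodes[i])
    }

    // 16 workers, len(nodes) pieces of work; stops early once ctx is canceled.
    workqueue.ParallelizeUntil(ctx, 16, len(nodes), checkNode)
}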

The most important method here is podFitsOnNode, which decides whether the pod can run on a given node.

5.1.2 addNominatedPods

Before looking at podFitsOnNode, we first need to understand what addNominatedPods does.

func addNominatedPods(pod *v1.Pod, meta algorithm.PredicateMetadata,
    nodeInfo *schedulercache.NodeInfo, queue internalqueue.SchedulingQueue) (bool, algorithm.PredicateMetadata,
    *schedulercache.NodeInfo) {
    if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
        // This may happen only in tests.
        return false, meta, nodeInfo
    }
    nominatedPods := queue.NominatedPodsForNode(nodeInfo.Node().Name)
    // the node has no nominated pods
    if nominatedPods == nil || len(nominatedPods) == 0 {
        return false, meta, nodeInfo
    }
    var metaOut algorithm.PredicateMetadata
    if meta != nil {
        metaOut = meta.ShallowCopy()
    }
    nodeInfoOut := nodeInfo.Clone()
    for _, p := range nominatedPods {
        // only add nominated pods whose priority is greater than or equal to this pod's
        if util.GetPodPriority(p) >= util.GetPodPriority(pod) && p.UID != pod.UID {
            nodeInfoOut.AddPod(p)
            if metaOut != nil {
                metaOut.AddPod(p, nodeInfoOut)
            }
        }
    }
    return true, metaOut, nodeInfoOut
}

Nominated pods are pods that, barring the unexpected, will run on this node in the future. They are preemptor pods: normal scheduling failed for them, so some lower-priority pods are being evicted from this node to make room, and eviction takes a while; only once those pods are gone will the nominated pod actually be scheduled onto the node.

So addNominatedPods simulates adding these nominated pods to the node and returns a new NodeInfo (nodeInfoOut). Note that not all of them are added: only the nominated pods whose priority is greater than or equal to that of the pod being scheduled.

The first return value indicates whether any pods were added; if the node has no nominated pods at all, it is naturally false.
The second return value is the new PredicateMetadata (a copy, so the original is not modified).
The third return value is the new NodeInfo (also a copy, so the original is not modified). A small sketch of the priority comparison follows below.
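
As an illustration of the priority comparison that decides which nominated pods get added, here is a sketch that treats a nil Spec.Priority as 0, which is effectively what util.GetPodPriority does (the pods and priority values below are made up):

package main

import (
    "fmt"

    "k8s.io/api/core/v1"
)

// podPriority treats a nil Spec.Priority as 0, mirroring util.GetPodPriority.
func podPriority(pod *v1.Pod) int32 {
    if pod.Spec.Priority != nil {
        return *pod.Spec.Priority
    }
    return 0
}

func main() {
    high, low := int32(1000), int32(10)
    nominated := &v1.Pod{Spec: v1.PodSpec{Priority: &high}}
    incoming := &v1.Pod{Spec: v1.PodSpec{Priority: &low}}

    // addNominatedPods would clone the NodeInfo and add `nominated` to it,
    // because the incoming pod has to yield to higher-priority nominated pods.
    fmt.Println(podPriority(nominated) >= podPriority(incoming)) // true
}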

5.1.3 podFitsOnNode

func podFitsOnNode(
    pod *v1.Pod,
    meta algorithm.PredicateMetadata,
    info *schedulercache.NodeInfo,
    predicateFuncs map[string]algorithm.FitPredicate,
    nodeCache *equivalence.NodeCache,
    queue internalqueue.SchedulingQueue,
    alwaysCheckAllPredicates bool,
    equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) {
    var (
        eCacheAvailable  bool
        failedPredicates []algorithm.PredicateFailureReason
    )

    podsAdded := false
    // We run predicates twice in some cases. If the node has greater or equal priority
    // nominated pods, we run them when those pods are added to meta and nodeInfo.
    // If all predicates succeed in this pass, we run them again when these
    // nominated pods are not added. This second pass is necessary because some
    // predicates such as inter-pod affinity may not pass without the nominated pods.
    // If there are no nominated pods for the node or if the first run of the
    // predicates fail, we don't run the second pass.
    // We consider only equal or higher priority pods in the first pass, because
    // those are the current "pod" must yield to them and not take a space opened
    // for running them. It is ok if the current "pod" take resources freed for
    // lower priority pods.
    // Requiring that the new pod is schedulable in both circumstances ensures that
    // we are making a conservative decision: predicates like resources and inter-pod
    // anti-affinity are more likely to fail when the nominated pods are treated
    // as running, while predicates like pod affinity are more likely to fail when
    // the nominated pods are treated as not running. We can't just assume the
    // nominated pods are running because they are not running right now and in fact,
    // they may end up getting scheduled to a different node.
    for i := 0; i < 2; i++ {
        metaToUse := meta
        nodeInfoToUse := info
        if i == 0 {
            podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
        } else if !podsAdded || len(failedPredicates) != 0 {
            break
        }
        // Bypass eCache if node has any nominated pods.
        // TODO(bsalamat): consider using eCache and adding proper eCache invalidations
        // when pods are nominated or their nominations change.
        eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
        for predicateID, predicateKey := range predicates.Ordering() {
            var (
                fit     bool
                reasons []algorithm.PredicateFailureReason
                err     error
            )
            //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
            if predicate, exist := predicateFuncs[predicateKey]; exist {
                // if the equivalence class cache is available, run the predicate through nodeCache.RunPredicate
                if eCacheAvailable {
                    fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)
                } else {
                    fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                }
                if err != nil {
                    return false, []algorithm.PredicateFailureReason{}, err
                }

                // the pod does not fit; unless alwaysCheckAllPredicates is set, stop checking the remaining predicates
                if !fit {
                    // eCache is available and valid, and predicates result is unfit, record the fail reasons
                    failedPredicates = append(failedPredicates, reasons...)
                    // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
                    if !alwaysCheckAllPredicates {
                        klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
                            "evaluation is short circuited and there are chances " +
                            "of other predicates failing as well.")
                        break
                    }
                }
            }
        }
    }

    return len(failedPredicates) == 0, failedPredicates, nil
}

The logic here is fairly simple: run the predicate functions against the node and see whether the pod fits.
What needs explaining is why the predicates may run twice:
1. The first pass runs with the node's nominated pods of greater or equal priority added to the node info and metadata.
2. The second pass runs without those nominated pods.
If the first pass fails, the method returns right away and the second pass is skipped.
If the node has no nominated pods, a single pass is enough.
Requiring the pod to fit in both situations is a conservative choice: predicates such as resources and inter-pod anti-affinity are more likely to fail when the nominated pods are treated as running, while predicates such as pod affinity are more likely to fail when they are treated as not running (see the toy sketch below).
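
To see why both passes matter, here is a toy, self-contained sketch in plain Go (no kube-scheduler types): an anti-affinity-style predicate fails only when the nominated pod is counted as running, while an affinity-style predicate fails only when it is not, so requiring both passes to succeed rules out both kinds of false positives.

package main

import "fmt"

// labelsOnNode lists the labels of the pods assumed to be running on the node.
type labelsOnNode []string

func (n labelsOnNode) has(label string) bool {
    for _, l := range n {
        if l == label {
            return true
        }
    }
    return false
}

func main() {
    // Pass 1 view: the nominated pod (labelled "db") is added to the node.
    withNominated := labelsOnNode{"db"}
    // Pass 2 view: the nominated pod is ignored.
    withoutNominated := labelsOnNode{}

    // An anti-affinity-style predicate ("no db pod on my node") fails only in pass 1.
    fmt.Println(!withNominated.has("db"), !withoutNominated.has("db")) // false true
    // An affinity-style predicate ("must share a node with a db pod") fails only in pass 2.
    fmt.Println(withNominated.has("db"), withoutNominated.has("db")) // true false
    // podFitsOnNode requires BOTH passes to succeed, ruling out both failure modes.
}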
