1. Preface
Please credit the original source when reposting, and respect the author's work!
Building on [k8s源码分析][kube-scheduler]scheduler之启动run(1) and [k8s源码分析][kube-scheduler]scheduler之启动run(2), this article analyzes how a single pod gets scheduled.
Source code: https://github.com/nicktming/kubernetes
Branch: tming-v1.13 (based on v1.13)
2. Introduction
(Figure: schedule.png)
Suppose a given pod needs to be scheduled and there are three nodes: Machine1, Machine2 and Machine3. In the predicate (filtering) phase, Machine2 is found to be unable to run the pod and is eliminated. In the priority (scoring) phase, Machine1 and Machine3, both of which can run the pod, receive scores of 8 and 7 respectively. The scheduler finally selects the node with the highest score, which here is Machine1.
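To make the flow concrete, here is a minimal, self-contained Go sketch of the filter-then-score-then-select pipeline using the numbers above (the node names and the fits/score functions are illustrative stand-ins, not the real scheduler API):

package main

import "fmt"

// toy predicate: Machine2 cannot run the pod
func fits(node string) bool { return node != "Machine2" }

// toy priority: scores taken from the example above
func score(node string) int {
    return map[string]int{"Machine1": 8, "Machine3": 7}[node]
}

func main() {
    nodes := []string{"Machine1", "Machine2", "Machine3"}

    // predicate phase: keep only the nodes that can run the pod
    var filtered []string
    for _, n := range nodes {
        if fits(n) {
            filtered = append(filtered, n)
        }
    }

    // priority phase + host selection: pick the highest-scoring node
    best, bestScore := "", -1
    for _, n := range filtered {
        if s := score(n); s > bestScore {
            best, bestScore = n, s
        }
    }
    fmt.Printf("filtered=%v, selected=%s (score %d)\n", filtered, best, bestScore)
    // Output: filtered=[Machine1 Machine3], selected=Machine1 (score 8)
}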
3. scheduleOne
func (sched *Scheduler) scheduleOne() {
    pod := sched.config.NextPod()
    // pod could be nil when schedulerQueue is closed,
    // i.e. the schedulerQueue has been closed, so pod is nil.
    // If the schedulerQueue currently has no pod, NextPod blocks here.
    if pod == nil {
        return
    }
    // If the pod is being deleted, skip scheduling it.
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }

    klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    // Record the start time.
    start := time.Now()
    // Schedule the pod:
    // on success it returns a node name and a nil error,
    // on failure it returns an error.
    suggestedHost, err := sched.schedule(pod)
    if err != nil {
        // Regular scheduling failed; try preemption.
        if fitError, ok := err.(*core.FitError); ok {
            preemptionStartTime := time.Now()
            // preempt
            sched.preempt(pod, fitError)
            ...
        } else {
            klog.Errorf("error selecting node for pod: %v", err)
            metrics.PodScheduleErrors.Inc()
        }
        return
    }
    ...
    assumedPod := pod.DeepCopy()
    ...
    // Assume the pod onto the suggested host.
    err = sched.assume(assumedPod, suggestedHost)
    ...
    // Bind asynchronously.
    go func() {
        ...
        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: suggestedHost,
            },
        })
        ...
    }()
}
1. NextPod returns the next pod to schedule, i.e. the pod at the head of activeQ in podQueue.
2. sched.schedule(pod) performs regular scheduling for the pod. On success it returns a node name and we continue with step 3. If it fails with a FitError, preemption is attempted via sched.preempt(pod, fitError) and the method returns.
3. sched.assume(assumedPod, suggestedHost) calls the assume method to assume the pod onto the chosen node.
4. A goroutine is started to bind the pod, i.e. a request is sent to the api-server that sets the pod's pod.Spec.NodeName to the name of the selected node (see the sketch below).
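As an illustration of what that binding request looks like from a client's point of view, here is a hedged sketch using client-go (bindPodToNode is a hypothetical helper; the Bind signature shown matches the v1.13-era client-go, while newer versions also take a context and CreateOptions):

import (
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// bindPodToNode asks the api-server to bind the pod to nodeName,
// which is what ultimately sets pod.Spec.NodeName.
func bindPodToNode(client kubernetes.Interface, pod *v1.Pod, nodeName string) error {
    binding := &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
        Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
    }
    return client.CoreV1().Pods(pod.Namespace).Bind(binding)
}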
(Figure: scheduleOne.png)
4. The schedule method
This section analyzes the sched.schedule(pod) method.
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
    host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
    if err != nil {
        pod = pod.DeepCopy()
        sched.config.Error(pod, err)
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
        sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
            Type:          v1.PodScheduled,
            Status:        v1.ConditionFalse,
            LastProbeTime: metav1.Now(),
            Reason:        v1.PodReasonUnschedulable,
            Message:       err.Error(),
        })
        return "", err
    }
    return host, err
}
1. host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister) calls genericScheduler's Schedule method to perform the actual scheduling.
2. If scheduling fails, two things happen:
2.1: sched.config.Error(pod, err) invokes the default error handler. How it is constructed was analyzed in [k8s源码分析][kube-scheduler]scheduler之启动run(1) when sched is created, in pkg/scheduler/factory/factory.go:
// pkg/scheduler/factory/factory.go
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
    ...
    podBackoff := util.CreateDefaultPodBackoff()
    return &Config{
        ...
        Error: c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
        ...
    }, nil
    ...
}

func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error) {
    return func(pod *v1.Pod, err error) {
        ...
        backoff.Gc()
        podSchedulingCycle := podQueue.SchedulingCycle()
        // Retry asynchronously.
        // Note that this is extremely rudimentary and we need a more real error handling path.
        go func() {
            defer runtime.HandleCrash()
            podID := types.NamespacedName{
                Namespace: pod.Namespace,
                Name:      pod.Name,
            }
            origPod := pod
            ...
            // Get the pod again; it may have changed/been scheduled already.
            getBackoff := initialGetBackoff
            for {
                pod, err := c.client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
                if err == nil {
                    if len(pod.Spec.NodeName) == 0 {
                        podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle)
                    } else {
                        if c.volumeBinder != nil {
                            // Volume binder only wants to keep unassigned pods
                            c.volumeBinder.DeletePodBindings(pod)
                        }
                    }
                    break
                }
                ...
            }
        }()
    }
}
As you can see, MakeDefaultErrorFunc ends up calling podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle), which was already analyzed in [k8s源码分析][kube-scheduler]scheduler之启动run(2).
2.2: The pod's PodCondition is updated through the api-server, as sketched below.
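For reference, here is a minimal sketch of what the PodCondition update amounts to, assuming the v1.13-era client-go UpdateStatus signature (markPodUnschedulable is a hypothetical helper; the real PodConditionUpdater also updates an existing PodScheduled condition in place instead of blindly appending):

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes"
)

// markPodUnschedulable records a PodScheduled=False condition on the pod's status.
func markPodUnschedulable(client kubernetes.Interface, pod *v1.Pod, msg string) error {
    copied := pod.DeepCopy()
    copied.Status.Conditions = append(copied.Status.Conditions, v1.PodCondition{
        Type:    v1.PodScheduled,
        Status:  v1.ConditionFalse,
        Reason:  v1.PodReasonUnschedulable,
        Message: msg,
    })
    _, err := client.CoreV1().Pods(copied.Namespace).UpdateStatus(copied)
    return err
}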
5. genericScheduler's Schedule method
Now we reach the core of scheduling: how the scheduler actually schedules a pod using the predicate (filtering) and priority (scoring) functions.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
    ...
    nodes, err := nodeLister.List()
    if err != nil {
        return "", err
    }
    if len(nodes) == 0 {
        return "", ErrNoNodesAvailable
    }
    ...
    trace.Step("Computing predicates")
    startPredicateEvalTime := time.Now()
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    if err != nil {
        return "", err
    }
    ...
    trace.Step("Prioritizing")
    startPriorityEvalTime := time.Now()
    ...
    metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    if err != nil {
        return "", err
    }
    ...
    trace.Step("Selecting host")
    return g.selectHost(priorityList)
}
As is widely known, there are three main phases:
1. Predicate phase: call findNodesThatFit to determine which nodes can run the pod.
2. Priority phase: call PrioritizeNodes to score the nodes that passed the predicate phase.
3. Host selection: call selectHost to pick the highest-scoring node; if several nodes share the highest score, one of them is chosen (see the sketch below).
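To illustrate the host-selection step, here is a minimal, self-contained sketch of picking among the top-scoring nodes (hostScore and pickHost are illustrative stand-ins; the real selectHost round-robins among ties using a lastNodeIndex counter rather than random choice):

import "math/rand"

type hostScore struct {
    Host  string
    Score int
}

// pickHost returns one of the hosts that share the maximum score.
func pickHost(list []hostScore) string {
    if len(list) == 0 {
        return ""
    }
    maxIdx := []int{0}
    for i := 1; i < len(list); i++ {
        switch {
        case list[i].Score > list[maxIdx[0]].Score:
            maxIdx = []int{i} // new maximum: reset the candidate set
        case list[i].Score == list[maxIdx[0]].Score:
            maxIdx = append(maxIdx, i) // tie: remember this index too
        }
    }
    return list[maxIdx[rand.Intn(len(maxIdx))]].Host
}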
5.1 Predicate phase
5.1.1 findNodesThatFit
The predicate phase is driven by findNodesThatFit.
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
    var filtered []*v1.Node
    failedPredicateMap := FailedPredicateMap{}

    // If there are no predicates, every node passes.
    if len(g.predicates) == 0 {
        filtered = nodes
    } else {
        // Total number of nodes.
        allNodes := int32(g.cache.NodeTree().NumNodes)
        // Find at most numNodesToFind feasible nodes.
        numNodesToFind := g.numFeasibleNodesToFind(allNodes)

        // Create filtered list with enough space to avoid growing it
        // and allow assigning.
        filtered = make([]*v1.Node, numNodesToFind)
        errs := errors.MessageCountMap{}
        var (
            predicateResultLock sync.Mutex
            filteredLen         int32
            equivClass          *equivalence.Class
        )

        ctx, cancel := context.WithCancel(context.Background())

        // We can use the same metadata producer for all nodes.
        meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)

        if g.equivalenceCache != nil {
            // getEquivalenceClassInfo will return immediately if no equivalence pod found
            equivClass = equivalence.NewClass(pod)
        }

        checkNode := func(i int) {
            var nodeCache *equivalence.NodeCache
            nodeName := g.cache.NodeTree().Next()
            if g.equivalenceCache != nil {
                nodeCache = g.equivalenceCache.LoadNodeCache(nodeName)
            }
            fits, failedPredicates, err := podFitsOnNode(
                pod,
                meta,
                g.cachedNodeInfoMap[nodeName],
                g.predicates,
                nodeCache,
                g.schedulingQueue,
                g.alwaysCheckAllPredicates,
                equivClass,
            )
            // On error, record it and return.
            if err != nil {
                predicateResultLock.Lock()
                errs[err.Error()]++
                predicateResultLock.Unlock()
                return
            }
            // The node fits the pod.
            if fits {
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
                    // Enough nodes found; cancel the remaining work.
                    cancel()
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    // Record the node in filtered.
                    filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                }
            } else {
                // The node does not fit; record it and the failure reasons in failedPredicateMap.
                predicateResultLock.Lock()
                failedPredicateMap[nodeName] = failedPredicates
                predicateResultLock.Unlock()
            }
        }

        // Stops searching for more nodes once the configured number of feasible nodes
        // are found.
        // Evaluate the nodes with 16 goroutines in parallel.
        workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

        filtered = filtered[:filteredLen]
        if len(errs) > 0 {
            return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
        }
    }

    // Run the nodes that passed the predicates through each extender in turn.
    if len(filtered) > 0 && len(g.extenders) != 0 {
        for _, extender := range g.extenders {
            if !extender.IsInterested(pod) {
                continue
            }
            filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
            // On error, return unless the extender is ignorable.
            if err != nil {
                if extender.IsIgnorable() {
                    klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
                        extender, err)
                    continue
                } else {
                    return []*v1.Node{}, FailedPredicateMap{}, err
                }
            }
            for failedNodeName, failedMsg := range failedMap {
                // If the node is not yet in failedPredicateMap, add an empty entry.
                if _, found := failedPredicateMap[failedNodeName]; !found {
                    failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
                }
                failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
            }
            filtered = filteredList
            if len(filtered) == 0 {
                break
            }
        }
    }
    // Return the nodes that fit (filtered) and the nodes that do not, with reasons (failedPredicateMap).
    return filtered, failedPredicateMap, nil
}
As you can see, 16 goroutines evaluate nodes in parallel, producing the set of nodes (filtered) that can run the pod as well as the nodes (failedPredicateMap) that cannot run the pod together with the reasons why. The third return value indicates whether an error occurred while running findNodesThatFit; if so, the method returns immediately. A simplified sketch of this parallel-filtering pattern follows.
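The following minimal, self-contained sketch mimics the concurrency pattern used here (bounded result count, an atomic counter, and context cancellation once enough nodes are found). It uses plain goroutines instead of workqueue.ParallelizeUntil and toy node names, so it illustrates the pattern rather than the real implementation:

package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    nodes := []string{"n1", "n2", "n3", "n4", "n5", "n6"}
    fits := func(name string) bool { return name != "n3" } // toy predicate

    const numNodesToFind = 3
    filtered := make([]string, numNodesToFind)
    var filteredLen int32

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    var wg sync.WaitGroup
    for i := range nodes {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            select {
            case <-ctx.Done(): // enough nodes already found
                return
            default:
            }
            if fits(nodes[i]) {
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
                    cancel() // stop the remaining workers
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    filtered[length-1] = nodes[i]
                }
            }
        }(i)
    }
    wg.Wait()
    fmt.Println(filtered[:filteredLen])
}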
The most important function here is podFitsOnNode, which decides whether the pod can run on a given node.
5.1.2 addNominatedPods
Before looking at podFitsOnNode, we first need to understand what addNominatedPods does.
func addNominatedPods(pod *v1.Pod, meta algorithm.PredicateMetadata,
    nodeInfo *schedulercache.NodeInfo, queue internalqueue.SchedulingQueue) (bool, algorithm.PredicateMetadata,
    *schedulercache.NodeInfo) {
    if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
        // This may happen only in tests.
        return false, meta, nodeInfo
    }
    nominatedPods := queue.NominatedPodsForNode(nodeInfo.Node().Name)
    // If the node has no nominated pods, there is nothing to add.
    if nominatedPods == nil || len(nominatedPods) == 0 {
        return false, meta, nodeInfo
    }
    var metaOut algorithm.PredicateMetadata
    if meta != nil {
        metaOut = meta.ShallowCopy()
    }
    nodeInfoOut := nodeInfo.Clone()
    for _, p := range nominatedPods {
        // Only add nominated pods whose priority is greater than or equal to that of this pod.
        if util.GetPodPriority(p) >= util.GetPodPriority(pod) && p.UID != pod.UID {
            nodeInfoOut.AddPod(p)
            if metaOut != nil {
                metaOut.AddPod(p, nodeInfoOut)
            }
        }
    }
    return true, metaOut, nodeInfoOut
}
Nominated pods are pods that, barring surprises, will eventually run on this node. They are preemptor pods: regular scheduling failed for them, so some lower-priority pods are being evicted from the node, but eviction takes time, and only after those pods are gone will the nominated pod actually be scheduled onto the node.
So addNominatedPods simulates adding these NominatedPods to the node and returns a new nodeInfo (nodeInfoOut). Not all of them are added, though: only the NominatedPods whose priority is greater than or equal to that of the pod being scheduled.
The first return value indicates whether any pods were added; if the node has no NominatedPods, it is of course false.
The second return value is the new PredicateMetadata. (It is a copy; the original is not modified.)
The third return value is the new nodeInfo. (Also a copy; the original is not modified.)
A small sketch of the priority comparison used above follows.
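For reference, here is a minimal sketch of what that priority comparison relies on, assuming that util.GetPodPriority treats a pod without an explicit priority as priority 0 (podPriority and shouldAddNominated are hypothetical stand-ins for illustration only):

import v1 "k8s.io/api/core/v1"

// podPriority returns the pod's priority, defaulting to 0 when none is set.
func podPriority(p *v1.Pod) int32 {
    if p.Spec.Priority != nil {
        return *p.Spec.Priority
    }
    return 0
}

// shouldAddNominated mirrors the condition in the loop above: a nominated pod n
// is simulated onto the node only if its priority is at least that of pod.
func shouldAddNominated(n, pod *v1.Pod) bool {
    return podPriority(n) >= podPriority(pod) && n.UID != pod.UID
}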
5.1.3 podFitsOnNode
func podFitsOnNode(
    pod *v1.Pod,
    meta algorithm.PredicateMetadata,
    info *schedulercache.NodeInfo,
    predicateFuncs map[string]algorithm.FitPredicate,
    nodeCache *equivalence.NodeCache,
    queue internalqueue.SchedulingQueue,
    alwaysCheckAllPredicates bool,
    equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) {
    var (
        eCacheAvailable  bool
        failedPredicates []algorithm.PredicateFailureReason
    )

    podsAdded := false
    // We run predicates twice in some cases. If the node has greater or equal priority
    // nominated pods, we run them when those pods are added to meta and nodeInfo.
    // If all predicates succeed in this pass, we run them again when these
    // nominated pods are not added. This second pass is necessary because some
    // predicates such as inter-pod affinity may not pass without the nominated pods.
    // If there are no nominated pods for the node or if the first run of the
    // predicates fail, we don't run the second pass.
    // We consider only equal or higher priority pods in the first pass, because
    // those are the current "pod" must yield to them and not take a space opened
    // for running them. It is ok if the current "pod" take resources freed for
    // lower priority pods.
    // Requiring that the new pod is schedulable in both circumstances ensures that
    // we are making a conservative decision: predicates like resources and inter-pod
    // anti-affinity are more likely to fail when the nominated pods are treated
    // as running, while predicates like pod affinity are more likely to fail when
    // the nominated pods are treated as not running. We can't just assume the
    // nominated pods are running because they are not running right now and in fact,
    // they may end up getting scheduled to a different node.
    for i := 0; i < 2; i++ {
        metaToUse := meta
        nodeInfoToUse := info
        if i == 0 {
            podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
        } else if !podsAdded || len(failedPredicates) != 0 {
            break
        }
        // Bypass eCache if node has any nominated pods.
        // TODO(bsalamat): consider using eCache and adding proper eCache invalidations
        // when pods are nominated or their nominations change.
        eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
        for predicateID, predicateKey := range predicates.Ordering() {
            var (
                fit     bool
                reasons []algorithm.PredicateFailureReason
                err     error
            )
            //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
            if predicate, exist := predicateFuncs[predicateKey]; exist {
                // If the equivalence class cache is available, run the predicate via RunPredicate.
                if eCacheAvailable {
                    fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)
                } else {
                    fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                }
                if err != nil {
                    return false, []algorithm.PredicateFailureReason{}, err
                }
                // If the node does not fit and we do not need to run every predicate, stop early.
                if !fit {
                    // eCache is available and valid, and predicates result is unfit, record the fail reasons
                    failedPredicates = append(failedPredicates, reasons...)
                    // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
                    if !alwaysCheckAllPredicates {
                        klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
                            "evaluation is short circuited and there are chances " +
                            "of other predicates failing as well.")
                        break
                    }
                }
            }
        }
    }

    return len(failedPredicates) == 0, failedPredicates, nil
}
The logic here is fairly simple: run the predicates and see whether the pod fits. What needs to be understood is why they may be run twice:
1. The first pass runs with the node's nominated pods of greater or equal priority added.
2. The second pass runs without the nominated pods.
If the first pass fails, the function returns immediately.
If the node has no nominated pods, a single pass is enough. The sketch below restates this two-pass rule.
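As a compact restatement, here is a minimal, self-contained sketch of the two-pass rule (fitsOnNode and runPredicates are illustrative stand-ins, not the real scheduler signatures):

// fitsOnNode reports whether the pod fits the node under the two-pass rule:
// pass 0 evaluates the predicates with the >=-priority nominated pods added,
// pass 1 evaluates them without the nominated pods, and both must succeed.
func fitsOnNode(nominatedPresent bool, runPredicates func(withNominated bool) bool) bool {
    for pass := 0; pass < 2; pass++ {
        if pass == 1 && !nominatedPresent {
            // no nominated pods on the node: one pass is enough
            break
        }
        withNominated := pass == 0 && nominatedPresent
        if !runPredicates(withNominated) {
            // a failing pass means the pod does not fit; no second pass is run
            return false
        }
    }
    return true
}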