深入分析kube-scheduler
从源代码角度分析kube-scheduler,重点分析Pod调度过程。
start
k8s.io\kubernetes\cmd\kube-scheduler\scheduler.go
// main is the kube-scheduler entry point: it builds the scheduler's cobra
// command and executes it.
func main() {
	app.NewSchedulerCommand().Execute()
}
k8s.io\kubernetes\cmd\kube-scheduler\app\server.go
// NewSchedulerCommand builds the cobra command for kube-scheduler.
// The command's Run callback hands off to the package-level Run function,
// passing it the result of c.Complete() and a stop channel.
// NOTE(review): abridged excerpt — the definitions of c and stopCh and the
// return statement are not shown here.
func NewSchedulerCommand() *cobra.Command {
cmd := &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
Run(c.Complete(), stopCh)
},
}
}
老套路,利用spf13/cobra库解析命令行参数,启动kube-scheduler服务,重点关注cmd.Run函数。
// Run starts the scheduler from a completed configuration: it builds the
// scheduler config with NewSchedulerConfig, constructs the scheduler with
// scheduler.NewFromConfig, and calls its Run method.
// NOTE(review): abridged excerpt — error handling, any use of stopCh and the
// return statement are not shown here.
func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
schedulerConfig, err := NewSchedulerConfig(c)
sched := scheduler.NewFromConfig(schedulerConfig)
sched.Run()
}
// NewSchedulerConfig derives a *scheduler.Config from the completed server
// configuration. It builds a configurator via factory.NewConfigFactory and
// asks it to create the config from a policy.
// NOTE(review): abridged excerpt — the ConfigFactoryArgs fields, the origin of
// policy, error handling and the return statement are elided.
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Config, error) {
	// The call needs its closing parenthesis after the (elided) struct literal.
	configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{...})
	sc, err := configurator.CreateFromConfig(*policy)
}
生成调度器配置(scheduler.Config)。ConfigFactoryArgs结构体包括多个Informer,需要重点关注,因为事件处理函数都需要利用Informer注册。
k8s.io\kubernetes\pkg\scheduler\factory\factory.go
// NewConfigFactory registers the various event handlers on the Informers and
// returns a scheduler.Configurator.
// NOTE(review): the body is elided in this excerpt.
func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
}
scheduling queue
每个scheduler都有一个queue,保存等待调度的pods;queue有两种实现:FIFO(默认) & PriorityQueue(优先级队列)
k8s.io\kubernetes\pkg\scheduler\core\scheduling_queue.go
// SchedulingQueue is the interface of the scheduler's pod queue. It exposes
// operations to add, update, delete and pop pods, plus notification hooks and
// queries over waiting pods.
// NOTE(review): the exact semantics of each method are not visible in this
// excerpt; the per-method notes are hedged accordingly.
type SchedulingQueue interface {
// Add inserts pod into the queue.
Add(pod *v1.Pod) error
// AddIfNotPresent presumably adds pod only when it is not already queued — confirm.
AddIfNotPresent(pod *v1.Pod) error
// AddUnschedulableIfNotPresent presumably records a pod that could not be scheduled — confirm.
AddUnschedulableIfNotPresent(pod *v1.Pod) error
// Pop returns a pod from the queue, or an error.
Pop() (*v1.Pod, error)
// Update takes the old and new versions of a pod.
Update(oldPod, newPod *v1.Pod) error
// Delete removes pod from the queue.
Delete(pod *v1.Pod) error
MoveAllToActiveQueue()
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
// WaitingPodsForNode returns pods associated with nodeName — TODO confirm what "waiting" means here.
WaitingPodsForNode(nodeName string) []*v1.Pod
WaitingPods() []*v1.Pod
}
k8s.io\client-go\tools\cache\fifo.go
// FIFO is the queue type excerpted from client-go's tools/cache/fifo.go.
type FIFO struct {
// lock is a read/write mutex; presumably it guards items and queue — confirm in the full source.
lock sync.RWMutex
// items maps a string key to the stored object.
items map[string]interface{}
// queue is a slice of string keys; per the accompanying text it records the order of the stored items.
queue []string
}
FIFO实现其实依赖了map和slice,利用map保存数据,slice保存数据索引顺序
unscheduled pod
注册事件处理函数
k8s.io\kubernetes\pkg\scheduler\factory\factory.go
// unscheduled pod queue
// Registers a filtered event handler on the Pod informer. A *v1.Pod passes the
// filter only when unassignedNonTerminatedPod and responsibleForPod (checked
// against args.SchedulerName) both return true; add/update/delete events for
// such pods are dispatched to the configFactory scheduling-queue handlers.
// NOTE(review): the FilterFunc shown has no return for objects that are not
// *v1.Pod — this looks like an abridged excerpt.
args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return unassignedNonTerminatedPod(t) && responsibleForPod(t, args.SchedulerName)
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodToSchedulingQueue,
UpdateFunc: c.updatePodInSchedulingQueue,
DeleteFunc: c.deletePodFromSchedulingQueue,
},
},
)
判断函数
// unassignedNonTerminatedPod reports whether pod has no Spec.NodeName set and
// has not terminated, i.e. its phase is neither Succeeded nor Failed.
func unassignedNonTerminatedPod(pod *v1.Pod) bool {
	if pod.Spec.NodeName != "" {
		// Already assigned to a node.
		return false
	}
	switch pod.Status.Phase {
	case v1.PodSucceeded, v1.PodFailed:
		// Terminated pods are never scheduled.
		return false
	default:
		return true
	}
}
// responsibleForPod reports whether pod names schedulerName in its
// Spec.SchedulerName. The check matters when several schedulers run side by
// side.
func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
	requested := pod.Spec.SchedulerName
	return requested == schedulerName
}
处理函数
// addPodToSchedulingQueue is the informer add handler: it asserts obj to a
// *v1.Pod and adds it to the pod queue. Any result of Add is ignored.
func (c *configFactory) addPodToSchedulingQueue(obj interface{}) {
	added := obj.(*v1.Pod)
	c.podQueue.Add(added)
}
// updatePodInSchedulingQueue is the informer update handler. Unless
// skipPodUpdate returns true for the new pod, it forwards the old and new pod
// to the pod queue's Update method.
func (c *configFactory) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
	updated := newObj.(*v1.Pod)
	if !c.skipPodUpdate(updated) {
		// With the FIFO queue, Update is equivalent to Add.
		c.podQueue.Update(oldObj.(*v1.Pod), updated)
	}
}
// deletePodFromSchedulingQueue is the informer delete handler: it asserts obj
// to a *v1.Pod and removes that pod from the pod queue.
func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) {
	// pod must be declared with := here; a plain assignment to an undeclared
	// identifier does not compile.
	pod := obj.(*v1.Pod)
	c.podQueue.Delete(pod)
}
schedule
调度流程:
- 获取要调度的Pod
- 根据Pod信息,选取合适的host调度
- 将Pod与host绑定在一起,post一个binding对象
k8s.io\kubernetes\pkg\scheduler\scheduler.go
func (sched *Scheduler) scheduleOne() {
// 从scheduling queue pop一个Pod
pod := sched.config.NextPod()
// schedule
suggestedHost, err := sched.schedule(pod)
// send post binding
go func() {
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{
Kind: "Node",
Name: suggestedHost,
},
})
}
}
k8s.io\kubernetes\pkg\scheduler\core\generic_scheduler.go
// Schedule picks a node for pod: it lists the nodes, filters them with
// findNodesThatFit, scores the remaining nodes with PrioritizeNodes, and
// returns the result of selectHost over the resulting priority list.
// NOTE(review): abridged excerpt — error checks and the setup of trace are not
// shown.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
// Read the node list from the cache.
nodes, err := nodeLister.List()
// Predicate (filtering) phase: select the nodes that fit the pod.
trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
// Priority (scoring) phase: score the filtered nodes.
trace.Step("Prioritizing")
metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
// Pick the host with the highest score.
trace.Step("Selecting host")
return g.selectHost(priorityList)
}
网友评论