kube-scheduler设计
kube-scheduler是以插件形式存在的组件,正因为以插件形式存在,所以其具有可扩展可定制的特性。kube-scheduler相当于整个集群的调度决策者,其通过预选和优选两个过程决定pod的最佳调度位置。
For given pod:
+---------------------------------------------+
| Schedulable nodes: |
| |
| +--------+ +--------+ +--------+ |
| | node 1 | | node 2 | | node 3 | |
| +--------+ +--------+ +--------+ |
| |
+-------------------+-------------------------+
|
|
v
+-------------------+-------------------------+
Pred. filters: node 3 doesn't have enough resource
+-------------------+-------------------------+
|
|
v
+-------------------+-------------------------+
| remaining nodes: |
| +--------+ +--------+ |
| | node 1 | | node 2 | |
| +--------+ +--------+ |
| |
+-------------------+-------------------------+
|
|
v
+-------------------+-------------------------+
Priority function: node 1: p=2
node 2: p=5
+-------------------+-------------------------+
|
|
v
select max{node priority} = node 2
kube-scheduler的目的就是为每一个pod选择一个合适的node,整体流程可以概括为三步:
- 首先通过Predicates策略过滤掉不能满足资源需求的node;
- 利用Priorities策略对筛选出的node进行打分排序;
- 将pod调度到得分最高的node上。
主要源码文件
- cmd/kube-scheduler/scheduler.go 组件入口,进行初始化和指令执行。
- pkg/scheduler/scheduler.go 调度算法的实现框架。
- pkg/scheduler/core/generic_scheduler.go 完成pod到node的绑定。
具体源码分析
组件入口
位置:cmd/kube-scheduler/scheduler.go
func main() {
rand.Seed(time.Now().UnixNano())
//构建command对象
command := app.NewSchedulerCommand()
......
//执行command
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
类似于所有的k8s组件启动流程,首先解析参数,构建command对象,然后执行command动作。
kube-scheduler 的默认参数在pkg/scheduler/algorithmprovider/defaults/defaults.go
中定义,指定默认的的Predicate、AlgorithmProvider以及Priority的策略。
构建command对象
位置:cmd/kube-scheduler/app/server.go
NewSchedulerCommand()
方法针对默认参数的schedule动作构建cobra.Command对象。
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
opts, err := options.NewOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
//指定command执行的逻辑是runCommand()函数
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
//读取一系列配置参数,注入到需要使用的对象中
fs := cmd.Flags()
namedFlagSets := opts.Flags()
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
usageFmt := "Usage:\n %s\n"
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cmd.SetUsageFunc(func(cmd *cobra.Command) error {
fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
return nil
})
cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
})
cmd.MarkFlagFilename("config", "yaml", "yml", "json")
return cmd
}
command启动执行
位置:cmd/kube-scheduler/app/server.go
上一步构建command对象时指定了执行动作是runCommand(),而runCommand函数的本质是调用了server.go
的Run()。
Run()主要做了几件事情:
- 初始化scheduler对象;
- 启动 kube-scheduler server,提供健康检查和指标服务;
- 启动和资源相关的informer;
- 执行 sched.Run() 方法,执行调度逻辑。
// Run executes the scheduler based on the given configuration.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
...
// 1. 创建scheduler对象
sched, err := scheduler.New(
//根据相关config初始化对象
......
)
if err != nil {
return err
}
// 启动事件广播
if cc.Broadcaster != nil && cc.EventClient != nil {
cc.Broadcaster.StartRecordingToSink(ctx.Done())
}
if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
}
// Setup healthz checks.
var checks []healthz.HealthChecker
if cc.ComponentConfig.LeaderElection.LeaderElect {
checks = append(checks, cc.LeaderElection.WatchDog)
}
// 启动安全检查和资源指标服务
if cc.InsecureServing != nil {
separateMetrics := cc.InsecureMetricsServing != nil
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
return fmt.Errorf("failed to start healthz server: %v", err)
}
}
if cc.InsecureMetricsServing != nil {
handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
return fmt.Errorf("failed to start metrics server: %v", err)
}
}
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}
}
// 启动Informer
go cc.PodInformer.Informer().Run(ctx.Done())
cc.InformerFactory.Start(ctx.Done())
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// 等待选举leader
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: sched.Run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// 执行sched.Run()
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
网友评论