In-Depth Analysis of kube-batch (1): The Startup Process


Author: 陈先生_9e91 | Published: 2018-10-13 11:21

    In-Depth Analysis of kube-batch (1): Startup

    A close look at kube-batch from the source code's point of view.

    code

    init: registration and initialization

    kube-batch\pkg\scheduler\factory.go

    func init() {
        // Plugins for Jobs
        framework.RegisterPluginBuilder("drf", drf.New)
        framework.RegisterPluginBuilder("gang", gang.New)
        framework.RegisterPluginBuilder("predicates", predicates.New)
        framework.RegisterPluginBuilder("priority", priority.New)
    
        // Plugins for Queues
        framework.RegisterPluginBuilder("proportion", proportion.New)
    
        // Actions
        framework.RegisterAction(reclaim.New())
        framework.RegisterAction(allocate.New())
        framework.RegisterAction(preempt.New())
    }
    

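    Five plugins and three actions are registered here, and the whole scheduling process relies on these components to do its work. Which of them are actually used is determined by the configuration file.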
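
    To make the registration step concrete, here is a minimal sketch of a name-to-builder registry. The types and the newGang constructor are simplified stand-ins written for this illustration, not kube-batch's actual definitions; only the idea of storing builders by name in init and looking them up later is taken from the code above.

```go
package main

import (
	"fmt"
	"sync"
)

// Plugin and PluginBuilder are simplified stand-ins for the framework types.
type Plugin interface {
	Name() string
}

type PluginBuilder func() Plugin

var (
	mu             sync.Mutex
	pluginBuilders = map[string]PluginBuilder{}
)

// RegisterPluginBuilder stores a constructor under a name; no plugin is built yet.
func RegisterPluginBuilder(name string, pb PluginBuilder) {
	mu.Lock()
	defer mu.Unlock()
	pluginBuilders[name] = pb
}

// GetPluginBuilder looks up a constructor by the name used in the configuration.
func GetPluginBuilder(name string) (PluginBuilder, bool) {
	mu.Lock()
	defer mu.Unlock()
	pb, found := pluginBuilders[name]
	return pb, found
}

// gangPlugin is a hypothetical plugin used only for this example.
type gangPlugin struct{}

func (gangPlugin) Name() string { return "gang" }

func newGang() Plugin { return gangPlugin{} }

// init runs before main, so the registry is filled before scheduling starts.
func init() {
	RegisterPluginBuilder("gang", newGang)
}

func main() {
	if pb, found := GetPluginBuilder("gang"); found {
		fmt.Println("built plugin:", pb().Name())
	}
	_, found := GetPluginBuilder("unknown")
	fmt.Println("unknown registered:", found)
}
```

    Registering builders rather than plugin instances means that a plugin left out of the configuration is never constructed.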

    kube-batch\pkg\scheduler\util.go

    var defaultSchedulerConf = map[string]string{
        "actions":                   "reclaim, allocate, preempt",
        "plugins":                   "gang, priority, drf, predicates, proportion",
        "plugin.gang.jobready":      "true",
        "plugin.gang.joborder":      "true",
        "plugin.gang.preemptable":   "true",
        "plugin.priority.joborder":  "true",
        "plugin.priority.taskorder": "true",
        "plugin.drf.preemptable":    "true",
        "plugin.drf.joborder":       "true",
    }
    

    By default, all of these components are enabled.
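
    As a rough illustration of how a flat map like this could be turned into an ordered action list and per-plugin options, consider the sketch below. The helpers splitNames and pluginOptions are hypothetical names introduced here; the real loadSchedulerConf is not shown in this article and may work differently.

```go
package main

import (
	"fmt"
	"strings"
)

// splitNames turns "a, b, c" into ["a" "b" "c"], trimming spaces and dropping blanks.
func splitNames(list string) []string {
	var names []string
	for _, n := range strings.Split(list, ",") {
		if n = strings.TrimSpace(n); n != "" {
			names = append(names, n)
		}
	}
	return names
}

// pluginOptions collects the "plugin.<name>.<option>" keys of one plugin
// and records for each option whether its value is "true".
func pluginOptions(conf map[string]string, plugin string) map[string]bool {
	opts := map[string]bool{}
	prefix := "plugin." + plugin + "."
	for key, value := range conf {
		if strings.HasPrefix(key, prefix) {
			opts[strings.TrimPrefix(key, prefix)] = value == "true"
		}
	}
	return opts
}

func main() {
	conf := map[string]string{
		"actions":              "reclaim, allocate, preempt",
		"plugins":              "gang, priority, drf, predicates, proportion",
		"plugin.gang.jobready": "true",
		"plugin.gang.joborder": "true",
	}
	fmt.Println(splitNames(conf["actions"]))
	fmt.Println(splitNames(conf["plugins"]))
	fmt.Println(pluginOptions(conf, "gang"))
}
```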

    kube-batch\pkg\scheduler\scheduler.go

    func (pc *Scheduler) Run(stopCh <-chan struct{}) {
        // Start cache for policy.
        go pc.cache.Run(stopCh)
    
        // Load configuration of scheduler
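        var err error
        conf := defaultSchedulerConf
        if len(pc.schedulerConf) != 0 {
            conf, err = pc.cache.LoadSchedulerConf(pc.schedulerConf)
            if err != nil {
                // Log the failure and fall back to the built-in defaults.
                glog.Errorf("Failed to load scheduler configuration %q: %v", pc.schedulerConf, err)
                conf = defaultSchedulerConf
            }
        }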
    
        pc.actions, pc.pluginArgs = loadSchedulerConf(conf)
    
        go wait.Until(pc.runOnce, 1*time.Second, stopCh)
    }
    
    func (pc *Scheduler) runOnce() {
        glog.V(4).Infof("Start scheduling ...")
        defer glog.V(4).Infof("End scheduling ...")
    
        ssn := framework.OpenSession(pc.cache, pc.pluginArgs)
        defer framework.CloseSession(ssn)
    
        for _, action := range pc.actions {
            action.Execute(ssn)
        }
    }
    

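    The overall scheduling flow shown above is:

    1. Start the cache, which watches a set of REST resources
    2. Load the configuration file
    3. Every second, run a scheduling cycle and open a session
    4. Execute the actions

    Each of these steps is described below.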
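
    The periodic part of this flow can be sketched with the standard library alone. The runLoop helper below is a hypothetical stand-in that mimics the "run, wait one period, repeat until stopped" behaviour the excerpt gets from wait.Until; it is not the Kubernetes implementation, and the action names are only printed, not executed.

```go
package main

import (
	"fmt"
	"time"
)

// runLoop calls runOnce, waits one period, and repeats until stopCh is closed.
func runLoop(runOnce func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Do not start another cycle once a stop has been requested.
		select {
		case <-stopCh:
			return
		default:
		}

		runOnce()

		// Sleep for one period, but wake up early on stop.
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

func main() {
	actions := []string{"reclaim", "allocate", "preempt"}
	stopCh := make(chan struct{})
	round := 0

	runOnce := func() {
		round++
		// One scheduling cycle: every configured action runs in order.
		for _, a := range actions {
			fmt.Printf("round %d: execute %s\n", round, a)
		}
		if round == 2 {
			close(stopCh) // stop the demo after two cycles
		}
	}

	runLoop(runOnce, 10*time.Millisecond, stopCh)
}
```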

    cache

    kube-batch\pkg\scheduler\cache\interface.go

    // Cache collects pods/nodes/queues information
    // and provides information snapshot
    type Cache interface {
        // Run start informer
        Run(stopCh <-chan struct{})
    
        // Snapshot deep copy overall cache information into snapshot
        Snapshot() *api.ClusterInfo
    
        // SchedulerConf return the property of scheduler configuration
        LoadSchedulerConf(path string) (map[string]string, error)
    
        // WaitForCacheSync waits for all cache synced
        WaitForCacheSync(stopCh <-chan struct{}) bool
    
        // Bind binds Task to the target host.
        // TODO(jinzhej): clean up expire Tasks.
        Bind(task *api.TaskInfo, hostname string) error
    
        // Evict evicts the task to release resources.
        Evict(task *api.TaskInfo, reason string) error
    
        // Backoff puts job in backlog for a while.
        Backoff(job *api.JobInfo, event arbcorev1.Event, reason string) error
    }
    

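    The cache module is responsible for two things:

    • Synchronizing REST resources such as pods, nodes, and queues through ListWatch;
    • Providing the scheduling process with a Snapshot of those resources.

    The concrete implementation: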
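
    The two responsibilities can be illustrated with a much-reduced sketch: event handlers mutate the cache under a lock, and Snapshot hands the scheduler a deep copy that later events cannot change. NodeInfo, its IdleCPU field, and UpdateNode are hypothetical simplifications for this example, not kube-batch's types.

```go
package main

import (
	"fmt"
	"sync"
)

// NodeInfo is a hypothetical, much-reduced node record.
type NodeInfo struct {
	Name    string
	IdleCPU int
}

// Cache is written by event handlers and read by the scheduler.
type Cache struct {
	mu    sync.Mutex
	nodes map[string]*NodeInfo
}

func NewCache() *Cache {
	return &Cache{nodes: map[string]*NodeInfo{}}
}

// UpdateNode plays the role of an informer event handler and mutates in place.
func (c *Cache) UpdateNode(name string, idleCPU int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if n, ok := c.nodes[name]; ok {
		n.IdleCPU = idleCPU
		return
	}
	c.nodes[name] = &NodeInfo{Name: name, IdleCPU: idleCPU}
}

// Snapshot returns a deep copy, so later updates do not leak into it.
func (c *Cache) Snapshot() map[string]*NodeInfo {
	c.mu.Lock()
	defer c.mu.Unlock()
	snap := make(map[string]*NodeInfo, len(c.nodes))
	for name, n := range c.nodes {
		copied := *n // copy the struct, not just the pointer
		snap[name] = &copied
	}
	return snap
}

func main() {
	c := NewCache()
	c.UpdateNode("node-1", 4)

	snap := c.Snapshot()
	c.UpdateNode("node-1", 0) // arrives while a scheduling cycle is still using snap

	fmt.Println("snapshot:", snap["node-1"].IdleCPU, "cache:", c.Snapshot()["node-1"].IdleCPU)
}
```

    Working on a copy lets a scheduling cycle make decisions against a consistent view while the informers keep updating the live cache.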

    kube-batch\pkg\scheduler\cache\cache.go

    func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
        go sc.pdbInformer.Informer().Run(stopCh)
        go sc.podInformer.Informer().Run(stopCh)
        go sc.nodeInformer.Informer().Run(stopCh)
        go sc.podGroupInformer.Informer().Run(stopCh)
    
        if sc.namespaceAsQueue {
            go sc.nsInformer.Informer().Run(stopCh)
        } else {
            go sc.queueInformer.Informer().Run(stopCh)
        }
    
        // Re-sync error tasks.
        go sc.resync()
    
        // Cleanup jobs.
        go sc.cleanupJobs()
    }
    

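    Here we can see that kube-batch caches PDBs, Pods, Nodes, and PodGroups, plus either Namespaces or Queues depending on namespaceAsQueue. PodGroup is a CRD defined by kube-batch and is the core of its batch scheduling.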

    kube-batch\config\crds\scheduling_v1alpha1_podgroup.yaml

    apiVersion: apiextensions.k8s.io/v1beta1
    kind: CustomResourceDefinition
    metadata:
      name: podgroups.scheduling.incubator.k8s.io
    spec:
      group: scheduling.incubator.k8s.io
      names:
        kind: PodGroup
        plural: podgroups
      scope: Namespaced
      validation:
        openAPIV3Schema:
          properties:
            apiVersion:
              type: string
            kind:
              type: string
            metadata:
              type: object
            spec:
              properties:
                minMember:
                  format: int32
                  type: integer
              type: object
            status:
              properties:
                succeeded:
                  format: int32
                  type: integer
                failed:
                  format: int32
                  type: integer
                running:
                  format: int32
                  type: integer
              type: object
          type: object
      version: v1alpha1
    
    

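    Clearly, the field to focus on is spec.minMember, the minimum number of tasks (that is, Pods) that a PodGroup job needs. The implementation logic is analyzed in a later article, on the cache.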
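
    As a hedged illustration of what minMember implies for batch (gang) scheduling, the sketch below treats a job as ready only when at least minMember of its tasks have been placed. The Job type, the Allocated counter, and the ready function are assumptions made for this example; how kube-batch's gang plugin actually evaluates readiness is not shown in this article.

```go
package main

import "fmt"

// Job is a hypothetical, reduced view of a PodGroup-backed job.
type Job struct {
	Name      string
	MinMember int32 // mirrors spec.minMember
	Allocated int32 // tasks that have been given a node in this cycle (assumed field)
}

// ready reports whether enough tasks are placed to start the job as a group.
func ready(j Job) bool {
	return j.Allocated >= j.MinMember
}

func main() {
	jobs := []Job{
		{Name: "train-a", MinMember: 3, Allocated: 2},
		{Name: "train-b", MinMember: 3, Allocated: 3},
	}
	for _, j := range jobs {
		fmt.Printf("%s ready=%v\n", j.Name, ready(j))
	}
}
```

    The point of such an all-or-nothing check is that a job which needs three Pods to make progress does not hold resources with only two of them running.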

    session

    kube-batch\pkg\scheduler\framework\framework.go

    func OpenSession(cache cache.Cache, args []*PluginArgs) *Session {
        ssn := openSession(cache)
    
        for _, plugin := range ssn.plugins {
            plugin.OnSessionOpen(ssn)
        }
    
        return ssn
    }
    
    func openSession(cache cache.Cache) *Session {
        ssn := &Session{
            UID:        uuid.NewUUID(),
            cache:      cache,
            JobIndex:   map[api.JobID]*api.JobInfo{},
            NodeIndex:  map[string]*api.NodeInfo{},
            QueueIndex: map[api.QueueID]*api.QueueInfo{},
        }

        snapshot := cache.Snapshot()
        // ... (excerpt abridged: the snapshot's contents are copied into ssn here) ...

        return ssn
    }
    

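    Here we can see that every scheduling session takes a snapshot of the cache and then calls OnSessionOpen on each plugin attached to the session. The plugin internals are analyzed in a later article, on plugins.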
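
    The session pattern can be sketched as follows: the session copies the snapshot, each plugin gets a chance to install callbacks when the session opens, and later steps consult those callbacks. JobInfo, the single jobOrderFn slot, and OrderedJobs are simplifications assumed for this example and do not reproduce kube-batch's Session API.

```go
package main

import (
	"fmt"
	"sort"
)

// JobInfo is a hypothetical, reduced job record.
type JobInfo struct {
	Name     string
	Priority int
}

// Session holds one cycle's snapshot plus a callback installed by a plugin.
type Session struct {
	Jobs       []JobInfo
	jobOrderFn func(l, r JobInfo) bool
}

// Plugin is a simplified stand-in for the framework's plugin interface.
type Plugin interface {
	OnSessionOpen(ssn *Session)
}

// priorityPlugin orders jobs by descending priority.
type priorityPlugin struct{}

func (priorityPlugin) OnSessionOpen(ssn *Session) {
	ssn.jobOrderFn = func(l, r JobInfo) bool { return l.Priority > r.Priority }
}

// OpenSession copies the snapshot and lets every plugin hook into the session.
func OpenSession(snapshot []JobInfo, plugins []Plugin) *Session {
	ssn := &Session{Jobs: append([]JobInfo(nil), snapshot...)}
	for _, p := range plugins {
		p.OnSessionOpen(ssn)
	}
	return ssn
}

// OrderedJobs returns the jobs sorted by the installed callback, if any.
func (ssn *Session) OrderedJobs() []JobInfo {
	jobs := append([]JobInfo(nil), ssn.Jobs...)
	if ssn.jobOrderFn != nil {
		sort.SliceStable(jobs, func(i, j int) bool {
			return ssn.jobOrderFn(jobs[i], jobs[j])
		})
	}
	return jobs
}

func main() {
	snapshot := []JobInfo{{"batch-a", 1}, {"batch-b", 5}}
	ssn := OpenSession(snapshot, []Plugin{priorityPlugin{}})
	for _, j := range ssn.OrderedJobs() {
		fmt.Println(j.Name, j.Priority)
	}
}
```

    Because the callbacks live on the session rather than in global state, each scheduling cycle starts from a clean slate.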

    action

    kube-batch\pkg\scheduler\scheduler.go

    func (pc *Scheduler) runOnce() {
        glog.V(4).Infof("Start scheduling ...")
        defer glog.V(4).Infof("End scheduling ...")
    
        ssn := framework.OpenSession(pc.cache, pc.pluginArgs)
        defer framework.CloseSession(ssn)
    
        if glog.V(3) {
            glog.V(3).Infof("%v", ssn)
        }
    
        for _, action := range pc.actions {
            action.Execute(ssn)
        }
    
    }
    
    

    Finally, the previously registered actions are executed one after another, in the order they appear in pc.actions.


    Article link: https://www.haomeiwen.com/subject/nuhoaftx.html