In-Depth Analysis of kube-batch (2): cache


Author: 陈先生_9e91 | Published 2018-10-13 16:19


    If you are familiar with Kubernetes, the cache mechanism should be no stranger. The role of the cache was already touched on in the earlier article on the startup process; this article analyzes how the cache is implemented in detail.

    struct

    interface

    kube-batch\pkg\scheduler\cache\interface.go

    // Cache collects pods/nodes/queues information
    // and provides information snapshot
    type Cache interface {
        // Run start informer
        Run(stopCh <-chan struct{})
    
        // Snapshot deep copy overall cache information into snapshot
        Snapshot() *api.ClusterInfo
    
        // SchedulerConf return the property of scheduler configuration
        LoadSchedulerConf(path string) (map[string]string, error)
    
        // WaitForCacheSync waits for all cache synced
        WaitForCacheSync(stopCh <-chan struct{}) bool
    
        // Bind binds Task to the target host.
        // TODO(jinzhej): clean up expire Tasks.
        Bind(task *api.TaskInfo, hostname string) error
    
        // Evict evicts the task to release resources.
        Evict(task *api.TaskInfo, reason string) error
    
        // Backoff puts job in backlog for a while.
        Backoff(job *api.JobInfo, event arbcorev1.Event, reason string) error
    }
    
    type Binder interface {
        Bind(task *v1.Pod, hostname string) error
    }
    
    type Evictor interface {
        Evict(pod *v1.Pod) error
    }
    
    

    Pay particular attention to the interface's Run and Snapshot methods.

    implements

    The concrete implementation lives in

    kube-batch\pkg\scheduler\cache\cache.go

    type SchedulerCache struct {
       sync.Mutex
    
       kubeclient *kubernetes.Clientset
       arbclient  *versioned.Clientset
    
       podInformer      infov1.PodInformer
       nodeInformer     infov1.NodeInformer
       pdbInformer      policyv1.PodDisruptionBudgetInformer
       nsInformer       infov1.NamespaceInformer
       podGroupInformer arbcoreinfo.PodGroupInformer
       queueInformer    arbcoreinfo.QueueInformer
    
       Binder  Binder
       Evictor Evictor
    
       recorder record.EventRecorder
    
       Jobs   map[arbapi.JobID]*arbapi.JobInfo
       Nodes  map[string]*arbapi.NodeInfo
       Queues map[arbapi.QueueID]*arbapi.QueueInfo
    
       errTasks    *cache.FIFO
       deletedJobs *cache.FIFO
    
       namespaceAsQueue bool
    }
    
    

    SchedulerCache consists mainly of the following components:

    • A mutex, which keeps snapshots consistent with the in-memory cache
    • K8S clients, used to access the apiserver
    • Informers, which ListWatch the REST resources (a minimal wiring sketch follows after this list)
    • Jobs/Nodes/Queues maps, which cache those resources
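
    For orientation, here is a minimal sketch of how the kubeclient and the core informers are typically wired up with client-go. It is not the exact newSchedulerCache code (which additionally builds the arbclient and the PodGroup/Queue informers from the kube-batch generated clientset), and the function name is illustrative.

    import (
        "time"
    
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
    )
    
    // buildCacheClients sketches how SchedulerCache's kubeclient and its core
    // informers could be constructed with client-go.
    func buildCacheClients(kubeconfig string) (*kubernetes.Clientset, informers.SharedInformerFactory, error) {
        cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
        if err != nil {
            return nil, nil, err
        }
    
        // kubeclient: talks to the apiserver.
        kubeclient, err := kubernetes.NewForConfig(cfg)
        if err != nil {
            return nil, nil, err
        }
    
        // Each informer ListWatches one resource and keeps a local store in sync.
        factory := informers.NewSharedInformerFactory(kubeclient, 30*time.Second)
        _ = factory.Core().V1().Pods()  // podInformer
        _ = factory.Core().V1().Nodes() // nodeInformer
    
        return kubeclient, factory, nil
    }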

    new

    The newSchedulerCache function is fairly long, so it is not reproduced in full here. Focus on the event handlers registered on each Informer; the most important ones handle Pod and PodGroup events.

    Pod

    sc.podInformer.Informer().AddEventHandler(
       cache.FilteringResourceEventHandler{
          FilterFunc: func(obj interface{}) bool {
             switch obj.(type) {
             case *v1.Pod:
                pod := obj.(*v1.Pod)
                if strings.Compare(pod.Spec.SchedulerName, schedulerName) == 0 && pod.Status.Phase == v1.PodPending {
                   return true
                }
                return pod.Status.Phase == v1.PodRunning
             default:
                return false
             }
          },
          Handler: cache.ResourceEventHandlerFuncs{
             AddFunc:    sc.AddPod,
             UpdateFunc: sc.UpdatePod,
             DeleteFunc: sc.DeletePod,
          },
       })
       
    

    As the filter shows, kube-batch only cares about Pods that it is responsible for scheduling and that are still Pending, plus Pods that are already Running.
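
    As an illustration, a Pod like the hypothetical one below would pass the filter: it names kube-batch as its scheduler and is still Pending. The names and image are made up; arbcorev1 refers to the kube-batch scheduling v1alpha1 API package already aliased in the excerpts above (import path omitted because it depends on the kube-batch version).

    import (
        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )
    
    // examplePendingPod builds a hypothetical Pod that the FilterFunc above
    // accepts: SchedulerName matches kube-batch and the phase is Pending.
    // The group-name annotation ties the Pod to its PodGroup.
    func examplePendingPod() *v1.Pod {
        return &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "demo-task-0",
                Namespace: "default",
                Annotations: map[string]string{
                    arbcorev1.GroupNameAnnotationKey: "demo-group",
                },
            },
            Spec: v1.PodSpec{
                SchedulerName: "kube-batch",
                Containers: []v1.Container{{
                    Name:  "worker",
                    Image: "busybox",
                }},
            },
            Status: v1.PodStatus{Phase: v1.PodPending},
        }
    }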

    kube-batch\pkg\scheduler\cache\event_handlers.go

    func (sc *SchedulerCache) AddPod(obj interface{}) {
        pod, ok := obj.(*v1.Pod) // the filter guarantees *v1.Pod, but convert defensively
        if !ok {
            return
        }
    
        sc.Mutex.Lock()
        defer sc.Mutex.Unlock()
    
        err := sc.addPod(pod)
        _ = err // error logging elided in this excerpt
    }
    
    // Assumes that lock is already acquired.
    func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
        pi := arbapi.NewTaskInfo(pod)
    
        return sc.addTask(pi)
    }
    

    One global lock guards the whole cache, which will become a performance bottleneck at some point. Here we can see that kube-batch converts each Pod into a TaskInfo and caches it.

    kube-batch\pkg\scheduler\api\job_info.go

    func NewTaskInfo(pod *v1.Pod) *TaskInfo {
       req := EmptyResource()
    
       // TODO(k82cn): also includes initContainers' resource.
       for _, c := range pod.Spec.Containers {
          req.Add(NewResource(c.Resources.Requests))
       }
    
       ti := &TaskInfo{
          UID:       TaskID(pod.UID),
          Job:       getJobID(pod),
          Name:      pod.Name,
          Namespace: pod.Namespace,
          NodeName:  pod.Spec.NodeName,
          Status:    getTaskStatus(pod),
          Priority:  1,
    
          Pod:    pod,
          Resreq: req,
       }
    
       if pod.Spec.Priority != nil {
          ti.Priority = *pod.Spec.Priority
       }
    
       return ti
    }
    
    

    The conversion is straightforward; two points are worth noting:

    • The task's resource request must be totalled up (summed across the Pod's containers)
    • The JobID comes from pod.Annotations[arbcorev1.GroupNameAnnotationKey], or otherwise from the Pod's owning controller (see the sketch below)
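
    Based on the two points above, the Pod-side job-ID derivation looks roughly like this sketch (the function name is illustrative, not the repo's exact code; arbapi and arbcorev1 are the aliases used throughout the article):

    import (
        "fmt"
    
        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )
    
    // podJobID sketches how a Pod is mapped to a JobID: prefer the PodGroup
    // name from the annotation, otherwise fall back to the owning controller.
    func podJobID(pod *v1.Pod) arbapi.JobID {
        if gn, found := pod.Annotations[arbcorev1.GroupNameAnnotationKey]; found && len(gn) != 0 {
            // Pod and PodGroup live in the same namespace, so this matches
            // the PodGroup-side getJobID shown later.
            return arbapi.JobID(fmt.Sprintf("%s/%s", pod.Namespace, gn))
        }
    
        if ref := metav1.GetControllerOf(pod); ref != nil {
            return arbapi.JobID(ref.UID)
        }
    
        return ""
    }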

    kube-batch\pkg\scheduler\cache\event_handlers.go

    func (sc *SchedulerCache) addTask(pi *arbapi.TaskInfo) error {
       if len(pi.Job) != 0 {
          if _, found := sc.Jobs[pi.Job]; !found {
             sc.Jobs[pi.Job] = arbapi.NewJobInfo(pi.Job)
          }
    
          sc.Jobs[pi.Job].AddTaskInfo(pi)
       }
    
       // node bookkeeping elided in this excerpt
       return nil
    }
    

    kube-batch\pkg\scheduler\api\job_info.go

    func NewJobInfo(uid JobID) *JobInfo {
       return &JobInfo{
          UID: uid,
    
          MinAvailable: 0,
          NodeSelector: make(map[string]string),
    
          Allocated:    EmptyResource(),
          TotalRequest: EmptyResource(),
    
          TaskStatusIndex: map[TaskStatus]tasksMap{},
          Tasks:           tasksMap{},
       }
    }
    
    func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) {
        ji.Tasks[ti.UID] = ti
        ji.addTaskIndex(ti)
    
        ji.TotalRequest.Add(ti.Resreq)
    }
    
    func (ji *JobInfo) addTaskIndex(ti *TaskInfo) {
        if _, found := ji.TaskStatusIndex[ti.Status]; !found {
            ji.TaskStatusIndex[ti.Status] = tasksMap{}
        }
    
        ji.TaskStatusIndex[ti.Status][ti.UID] = ti
    }
    
    

    Ultimately every task is attached to a job; the job mainly stores its tasks, the total resource request, and related bookkeeping. The small example below illustrates the aggregation.
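
    A worked example of this aggregation, using only the NewTaskInfo/NewJobInfo/AddTaskInfo functions shown above (the pods and values are hypothetical, and the snippet is assumed to live alongside job_info.go): two pods, each requesting 1 CPU and 1Gi of memory, end up as two tasks in one JobInfo.

    import (
        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/resource"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
    )
    
    // makeDemoPod is a local helper for the example, not part of kube-batch.
    func makeDemoPod(name string) *v1.Pod {
        return &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      name,
                Namespace: "default",
                UID:       types.UID(name),
            },
            Spec: v1.PodSpec{
                Containers: []v1.Container{{
                    Name: "worker",
                    Resources: v1.ResourceRequirements{
                        Requests: v1.ResourceList{
                            v1.ResourceCPU:    resource.MustParse("1"),
                            v1.ResourceMemory: resource.MustParse("1Gi"),
                        },
                    },
                }},
            },
        }
    }
    
    func exampleAggregation() *JobInfo {
        job := NewJobInfo(JobID("default/demo-group"))
        for _, name := range []string{"demo-task-0", "demo-task-1"} {
            job.AddTaskInfo(NewTaskInfo(makeDemoPod(name)))
        }
        // len(job.Tasks) == 2, and job.TotalRequest now amounts to
        // roughly cpu=2, memory=2Gi.
        return job
    }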

    PodGroup

    sc.podGroupInformer = arbinformer.Scheduling().V1alpha1().PodGroups()
    sc.podGroupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
       AddFunc:    sc.AddPodGroup,
       UpdateFunc: sc.UpdatePodGroup,
       DeleteFunc: sc.DeletePodGroup,
    })
    

    kube-batch\pkg\scheduler\cache\event_handlers.go

    func (sc *SchedulerCache) AddPodGroup(obj interface{}) {
       ss, ok := obj.(*arbv1.PodGroup)
       if !ok {
          return
       }
    
       sc.Mutex.Lock()
       defer sc.Mutex.Unlock()
    
       err := sc.setPodGroup(ss)
       _ = err // error logging elided in this excerpt
    }
    
    func (sc *SchedulerCache) setPodGroup(ss *arbv1.PodGroup) error {
        job := getJobID(ss)
    
        if _, found := sc.Jobs[job]; !found {
            sc.Jobs[job] = arbapi.NewJobInfo(job)
        }
    
        sc.Jobs[job].SetPodGroup(ss)
    
        return nil
    }
    
    func getJobID(pg *arbv1.PodGroup) arbapi.JobID {
        return arbapi.JobID(fmt.Sprintf("%s/%s", pg.Namespace, pg.Name))
    }
    

    Here we can see that a Job is essentially a PodGroup.

    kube-batch\pkg\scheduler\api\job_info.go

    func (ji *JobInfo) SetPodGroup(pg *arbcorev1.PodGroup) {
       ji.Name = pg.Name
       ji.Namespace = pg.Namespace
       ji.MinAvailable = pg.Spec.MinMember
    
       if len(pg.Spec.Queue) == 0 {
          ji.Queue = QueueID(pg.Namespace)
       } else {
          ji.Queue = QueueID(pg.Spec.Queue)
       }
    
       ji.PodGroup = pg
    }
    

    Pay particular attention to ji.MinAvailable = pg.Spec.MinMember, and to the fallback that uses the namespace as the queue when Spec.Queue is empty.
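
    For example, a PodGroup like the hypothetical one below asks for at least 3 schedulable members before the job is started; SetPodGroup copies MinMember into ji.MinAvailable and, because Spec.Queue is empty, uses the namespace as the queue. arbcorev1 is the kube-batch scheduling v1alpha1 package aliased as in the excerpts above.

    import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )
    
    // examplePodGroup builds a hypothetical PodGroup for illustration.
    func examplePodGroup() *arbcorev1.PodGroup {
        return &arbcorev1.PodGroup{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "demo-group",
                Namespace: "default",
            },
            Spec: arbcorev1.PodGroupSpec{
                // Gang constraint: the job should not start unless at least
                // 3 of its tasks can be scheduled together.
                MinMember: 3,
                // Queue is left empty, so ji.Queue falls back to the
                // namespace "default".
            },
        }
    }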

    run

    func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
       go sc.pdbInformer.Informer().Run(stopCh)
       go sc.podInformer.Informer().Run(stopCh)
       go sc.nodeInformer.Informer().Run(stopCh)
       go sc.podGroupInformer.Informer().Run(stopCh)
    
       if sc.namespaceAsQueue {
          go sc.nsInformer.Informer().Run(stopCh)
       } else {
          go sc.queueInformer.Informer().Run(stopCh)
       }
    
       // Re-sync error tasks.
       go sc.resync()
    
       // Cleanup jobs.
       go sc.cleanupJobs()
    }
    

    The Run method is fairly simple; it is responsible for:

    • Starting the ListWatch of every resource (the informers)
    • Re-syncing Pod state based on the errTasks queue
    • Cleaning up the cache based on the deletedJobs queue (a sketch of this retry pattern follows below)
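
    The errTasks/deletedJobs handling follows the usual client-go cache.FIFO pattern. Below is a minimal sketch of such a retry loop with illustrative names, not the exact kube-batch code: pop one item at a time and hand it to a processing function until the stop channel closes.

    import (
        "time"
    
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/tools/cache"
    )
    
    // retryLoop drains a cache.FIFO item by item. For errTasks the process
    // function would re-fetch the Pod from the apiserver and refresh the
    // cached task; for deletedJobs it would drop the finished JobInfo.
    func retryLoop(fifo *cache.FIFO, process func(obj interface{}) error, stopCh <-chan struct{}) {
        wait.Until(func() {
            if _, err := fifo.Pop(func(obj interface{}) error {
                return process(obj)
            }); err != nil {
                // Back off briefly on failures; Pop itself blocks while
                // the queue is empty.
                time.Sleep(time.Second)
            }
        }, 0, stopCh)
    }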

    Snapshot

    func (sc *SchedulerCache) Snapshot() *arbapi.ClusterInfo {
       sc.Mutex.Lock()
       defer sc.Mutex.Unlock()
    
       snapshot := &arbapi.ClusterInfo{
          Nodes:  make([]*arbapi.NodeInfo, 0, len(sc.Nodes)),
          Jobs:   make([]*arbapi.JobInfo, 0, len(sc.Jobs)),
          Queues: make([]*arbapi.QueueInfo, 0, len(sc.Queues)),
          Others: make([]*arbapi.TaskInfo, 0, 10),
       }
    
       for _, value := range sc.Nodes {
          snapshot.Nodes = append(snapshot.Nodes, value.Clone())
       }
    
       for _, value := range sc.Queues {
          snapshot.Queues = append(snapshot.Queues, value.Clone())
       }
    
       for _, value := range sc.Jobs {
          // If no scheduling spec, does not handle it.
          if value.PodGroup == nil && value.PDB == nil {
             continue
          }
    
          snapshot.Jobs = append(snapshot.Jobs, value.Clone())
       }
    
       return snapshot
    }
    

    Snapshot dumps the cache via deep clones. The only caveat is that a job is skipped unless it has a PodGroup (or a PDB), so a PodGroup must be created before its pods can be scheduled any further.
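
    To tie things together, a scheduling cycle takes one snapshot at the start and then works only on the cloned data, so the informers can keep updating the live cache in the background. A sketch of such a consumer, using only the Cache interface from the beginning of the article:

    // runCycleSketch is an illustrative consumer, not the scheduler's real
    // session code: grab a consistent snapshot, then iterate over the clones
    // without holding the cache lock.
    func runCycleSketch(c Cache) {
        snapshot := c.Snapshot()
    
        for _, job := range snapshot.Jobs {
            // Every job in the snapshot carries a PodGroup (or a PDB);
            // jobs without one were skipped above.
            _ = job
        }
        for _, node := range snapshot.Nodes {
            _ = node
        }
    }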
