美文网首页kubeflow
kube-batch 从代码中找出gang scheduler这

kube-batch 从代码中找出gang scheduler这

作者: zoux | 来源:发表于2019-04-14 16:37 被阅读18次

https://www.jianshu.com/p/7286d895dcc0

从上面这个过程中,已经知道了kube-batch的启动过程。kube-batch总共有4个过程。这里我们从Allocate开始。

目录:
一: 流程解释
二:代码说明

一: 流程解释

在allocate.go中:找到Execute函数。首先用文字解释一下整个的过程:
流程:
(1)将kube-batch的job放入对应的队列。这是一个具有优先级的队列。
(2)依次遍历这些队列,如果为空就跳过
(3)如果不为空,依次从队列中pop出一个job.即接下来要调度这个job
(4)取出这个job对应的所有Tasks(即要绑定的pod),对每个task进行假绑定,这里的假绑定意思是 只是更新task的状态,先记录pod绑定在哪个节点上。当达到JobReady时,进行真正的绑定。这样就实现了一次性绑定了好几个Pod.
(5)更新job的信息,将pod重新加入队列。跳出循环,再次进行调度。

对上面流程有两个地方需要再解释一下:
(1) jobReady 作用是什么,gang scheduler和这个有什么关系?
allocate每次都是对task进行假绑定。jobReady是一个信号。表示现在可以进行真正的绑定了。

在gang.go的75行,实现了这个接口:

func jobReady(obj interface{}) bool {
    job := obj.(*api.JobInfo)

    occupied := readyTaskNum(job)

    return occupied >= job.MinAvailable
}

可以看出来,gang scheduler中 就是通过数量上的判断来进行限制的。job.MinAvailable这个是podgroup的minNumber数量。这样就使得每次调度的时候,只有当MinAvailable个task准备好了之后。才会进行调度,从而达到gang scheduler的效果。

(2)为什么job还要重新加入队列。这个job不是已经调度了吗?
因为有可能job的Tasks数量会多于 job.MinAvailable。例如,一个job有8个task,但是它指定的podgroup的 minNumber=4。这样调度时会首先调度4个task.当真正绑定之后。剩余没绑定的4个task是一个新的job.所有需要重新加入队列。

二:代码说明

func (alloc *allocateAction) Execute(ssn *framework.Session) {
    glog.V(3).Infof("Enter Allocate ...")
    defer glog.V(3).Infof("Leaving Allocate ...")

    // 这是优先级队列,即队列里面的内容是有优先级的
    queues := util.NewPriorityQueue(ssn.QueueOrderFn)
    jobsMap := map[api.QueueID]*util.PriorityQueue{}

    //首先将所有的kube-batch job放入
    for _, job := range ssn.Jobs {
        if queue, found := ssn.Queues[job.Queue]; found {
            queues.Push(queue)
        } else {
            glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
                job.Namespace, job.Name, job.Queue)
            continue
        }

        if _, found := jobsMap[job.Queue]; !found {
            jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
        }

        glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
        jobsMap[job.Queue].Push(job)
    }

    glog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))

    pendingTasks := map[api.JobID]*util.PriorityQueue{}

    for {
        if queues.Empty() {
            break
        }
        // 从第一个队列开始寻找是否有job需要调度
        queue := queues.Pop().(*api.QueueInfo)
        if ssn.Overused(queue) {
            glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
            continue
        }

        jobs, found := jobsMap[queue.UID]

        glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name)

        if !found || jobs.Empty() {
            glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
            continue
        }
        
        // 从队列中依次弹出job进行调度
        job := jobs.Pop().(*api.JobInfo)
        if _, found := pendingTasks[job.UID]; !found {
            tasks := util.NewPriorityQueue(ssn.TaskOrderFn)
            for _, task := range job.TaskStatusIndex[api.Pending] {
                // Skip BestEffort task in 'allocate' action.
                if task.Resreq.IsEmpty() {
                    glog.V(4).Infof("Task <%v/%v> is BestEffort task, skip it.",
                        task.Namespace, task.Name)
                    continue
                }

                tasks.Push(task)
            }
            pendingTasks[job.UID] = tasks
        }
        tasks := pendingTasks[job.UID]

        glog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
            tasks.Len(), job.Namespace, job.Name)
        
        // 具体调度Task的循环,每次都假绑定一个Task,表示这个task已经完成
        for !tasks.Empty() {
            predicateNodes := []*api.NodeInfo{}
            nodeScores := map[int][]*api.NodeInfo{}

            task := tasks.Pop().(*api.TaskInfo)
            assigned := false

            glog.V(3).Infof("There are <%d> nodes for Job <%v/%v>",
                len(ssn.Nodes), job.Namespace, job.Name)

            //any task that doesn't fit will be the last processed
            //within this loop context so any existing contents of
            //NodesFitDelta are for tasks that eventually did fit on a
            //node
            
            // 后面的很长一般分,就是为task选择一个合适的node。
            //主要内容是先过滤,然后选择一个满足task的最优节点,然后更新job中该task的信息
            if len(job.NodesFitDelta) > 0 {
                job.NodesFitDelta = make(api.NodeResourceMap)
            }
            for _, node := range ssn.Nodes {
                glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
                    task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)

                // TODO (k82cn): Enable eCache for performance improvement.
                if err := ssn.PredicateFn(task, node); err != nil {
                    glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
                        task.Namespace, task.Name, node.Name, err)
                    continue
                } else {
                    predicateNodes = append(predicateNodes, node)
                }
            }
            for _, node := range predicateNodes {
                score, err := ssn.NodeOrderFn(task, node)
                if err != nil {
                    glog.V(3).Infof("Error in Calculating Priority for the node:%v", err)
                } else {
                    nodeScores[score] = append(nodeScores[score], node)
                }
            }
            selectedNodes := util.SelectBestNode(nodeScores)
            for _, node := range selectedNodes {
                // Allocate idle resource to the task.
                if task.InitResreq.LessEqual(node.Idle) {
                    glog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
                        task.Namespace, task.Name, node.Name)
                    // !!!这里需要重点注意,这里调用了session.go中Allocate函数。 下面会将这个的作用
                    if err := ssn.Allocate(task, node.Name); err != nil {         
                        glog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
                            task.UID, node.Name, ssn.UID, err)
                        continue
                    }
                    assigned = true
                    break
                } else {
                    //store information about missing resources
                    job.NodesFitDelta[node.Name] = node.Idle.Clone()
                    job.NodesFitDelta[node.Name].FitDelta(task.Resreq)
                    glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s> with limited resources",
                        task.Namespace, task.Name, node.Name)
                }

                // Allocate releasing resource to the task if any.
                if task.InitResreq.LessEqual(node.Releasing) {
                    glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
                        task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
                    if err := ssn.Pipeline(task, node.Name); err != nil {
                        glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
                            task.UID, node.Name, ssn.UID)
                        continue
                    }

                    assigned = true
                    break
                }
            }
            
            
            //如果绑定某个task过程中失败,比如资源不足。那么就会跳出这个循环。
            if !assigned {
                break
            }
            // 将job重新加入队列,然后进行下一个job的调度。
            if ssn.JobReady(job) {
                jobs.Push(job)
                break
            }
        }
        // Added Queue back until no job in Queue.  
        queues.Push(queue)
    }

session.go中Allocate函数

func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error {
    if err := ssn.cache.AllocateVolumes(task, hostname); err != nil {
        return err
    }

    // 这里这是更新task的状态。
    // Only update status in session
    job, found := ssn.Jobs[task.Job]
    if found {
        if err := job.UpdateTaskStatus(task, api.Allocated); err != nil {
            glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
                task.Namespace, task.Name, api.Allocated, ssn.UID, err)
            return err
        }
    } else {
        glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.",
            task.Job, ssn.UID)
        return fmt.Errorf("failed to find job %s", task.Job)
    }

    task.NodeName = hostname

    if node, found := ssn.Nodes[hostname]; found {
        if err := node.AddTask(task); err != nil {
            glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
                task.Namespace, task.Name, hostname, ssn.UID, err)
            return err
        }
        glog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
            task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
    } else {
        glog.Errorf("Failed to found Node <%s> in Session <%s> index when binding.",
            hostname, ssn.UID)
        return fmt.Errorf("failed to find node %s", hostname)
    }

    //gang.go中有,这里是真正的绑定了,当jobReady时,调用dispatch函数对所有的Allocated的task进行绑定。
    // dispatch就在该函数的下面。内容也很直观,就是调用k8s的接口,真正的绑定pod
    if ssn.JobReady(job) {
        for _, task := range job.TaskStatusIndex[api.Allocated] {
            if err := ssn.dispatch(task); err != nil {                    // 如果job准备好了,就直接真正绑定所有准备好的任务??
                glog.Errorf("Failed to dispatch task <%v/%v>: %v",
                    task.Namespace, task.Name, err)
                return err
            }
        }
    }

    return nil
}

总结:
感觉自己的文字表达能力还是不行,还需要更多的锻炼。
结合代码的注释和上面的流程说明一起看会更容易理解。

在session.go中可以看到,每次为task分配资源时,首先都是更新状态,只有达到jobReady时,才真正的绑定到具体的某个结点上。

当然如果当前要调度的job1,它需要的资源不足,那么当前这个job1就会跳出循环,找下一个要进行调度的job。不用担心,job1中已经绑定的task所占的资源。backfill操作会将Job1清空。

相关文章

网友评论

    本文标题:kube-batch 从代码中找出gang scheduler这

    本文链接:https://www.haomeiwen.com/subject/tlhswqtx.html