001.调度器学习基础概览
1. 资源调度基础
scheudler是kubernetes中的核心组件,负责为用户声明的pod资源选择合适的node,同时保证集群资源的最大化利用,这里先介绍下资源调度系统设计里面的一些基础概念
1.1 基础任务资源调度
image.png2.1.1 中心化的数据存储
kubernetes是一个数据中心化存储的系统,集群中的所有数据都通过apiserver存储到etcd中,包括node节点的资源信息、节点上面的pod信息、当前集群的所有pod信息,在这里其实apiserver也充当了resource manager的角色,存储所有的集群资源和已经分配的资源
2.1.2 调度数据的存储与获取
image.png2.6.1 优先级函数
podBackoffQ实际上会根据pod的backoffTime来进行优先级排序,所以podBackoffQ的队列头部,就是最近一个要过期的pod
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.PodInfo)
pInfo2 := podInfo2.(*framework.PodInfo)
bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.Pod))
bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.Pod))
return bo1.Before(bo2)
}
2.6.2 调度失败加入到podBackoffQ
如果调度失败,并且moveRequestCycle=podSchedulingCycle的时候就加入podBackfoffQ中
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
// 省略检查性代码
// 更新pod的backoff 信息
p.backoffPod(pod)
// moveRequestCycle将pod从unscheduledQ大于pod的调度周期添加到 如果pod的调度周期小于当前的调度周期
if p.moveRequestCycle >= podSchedulingCycle {
if err := p.podBackoffQ.Add(pInfo); err != nil {
return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
}
} else {
p.unschedulableQ.addOrUpdate(pInfo)
}
p.nominatedPods.add(pod, "")
return nil
}
2.6.3 从unschedulableQ迁移
在前面介绍的当集群资源发生变更的时候,会触发尝试unschedulabelQ中的pod进行转移,如果发现当前pod还未到达backoffTime,就加入到podBackoffQ中
if p.isPodBackingOff(pod) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
addErrorPods = append(addErrorPods, pInfo)
}
} else {
if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
addErrorPods = append(addErrorPods, pInfo)
}
}
2.6.4 podBackoffQ定时转移
在创建PriorityQueue的时候,会创建两个定时任务其中一个就是讲backoffQ中的pod到期后的转移,每秒钟尝试一次
func (p *PriorityQueue) run() {
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
因为是一个堆结果,所以只需要获取堆顶的元素,然后确定是否到期,如果到期后则进行pop处来,加入到activeQ中
func (p *PriorityQueue) flushBackoffQCompleted() {
p.lock.Lock()
defer p.lock.Unlock()
for {
// 获取堆顶元素
rawPodInfo := p.podBackoffQ.Peek()
if rawPodInfo == nil {
return
}
pod := rawPodInfo.(*framework.PodInfo).Pod
// 获取到期时间
boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
if !found {
// 如果当前已经不在podBackoff中,则就pop出来然后放入到activeQ
klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
p.podBackoffQ.Pop()
p.activeQ.Add(rawPodInfo)
defer p.cond.Broadcast()
continue
}
// 未超时
if boTime.After(p.clock.Now()) {
return
}
// 超时就pop出来
_, err := p.podBackoffQ.Pop()
if err != nil {
klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
return
}
// 加入到activeQ中
p.activeQ.Add(rawPodInfo)
defer p.cond.Broadcast()
}
}
2.7 unschedulableQ
image.png2.7.1 调度失败
调度失败后,如果当前集群资源没有发生变更,就加入到unschedulable,原因上面说过
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
// 省略检查性代码
// 更新pod的backoff 信息
p.backoffPod(pod)
// moveRequestCycle将pod从unscheduledQ大于pod的调度周期添加到 如果pod的调度周期小于当前的调度周期
if p.moveRequestCycle >= podSchedulingCycle {
if err := p.podBackoffQ.Add(pInfo); err != nil {
return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
}
} else {
p.unschedulableQ.addOrUpdate(pInfo)
}
p.nominatedPods.add(pod, "")
return nil
}
2.7.2 定时转移任务
定时任务每30秒执行一次
func (p *PriorityQueue) run() {
go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
逻辑其实就非常简单如果当前时间-pod的最后调度时间大于60s,就重新调度,转移到podBackoffQ或者activeQ中
func (p *PriorityQueue) flushUnschedulableQLeftover() {
p.lock.Lock()
defer p.lock.Unlock()
var podsToMove []*framework.PodInfo
currentTime := p.clock.Now()
for _, pInfo := range p.unschedulableQ.podInfoMap {
lastScheduleTime := pInfo.Timestamp
// 如果该pod1分钟内没有被调度就加入到podsToMove
if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
podsToMove = append(podsToMove, pInfo)
}
}
if len(podsToMove) > 0 {
// podsToMove将这些pod移动到activeQ
p.movePodsToActiveQueue(podsToMove)
}
}
3. 调度队列总结
3.1 数据流设计总结
image.png3.1.1 三队列与后台定时任务
从设计上三队列分别存储:活动队列、bakcoff队列、不可调度队列,其中backoff中会根据任务的失败来逐步递增重试时间(最长10s)、unschedulableQ队列则延迟60s
通过后台定时任务分别将backoffQ队列、unschedulableQ队列来进行重试,加入到activeQ中,从而加快完成pod的失败重试调度
3.1.2 cycle与优先调度
schedulingCycle、moveRequestCycle两个cycle其实本质上也是为了加快失败任务的重试调度,当集群资源发生变化的时候,进行立即重试,那些失败的优先级比较高、亲和性问题的pod都可能会被优先调度
3.1.3 锁与cond实现线程安全pop
内部通过lock保证线程安全,并通过cond来实现阻塞等待,从而实现阻塞scheduler worker的通知。
网友评论