美文网首页kuberne...kubernets学习
client-go的workqueue详解

client-go的workqueue详解

作者: zoux | 来源:发表于2022-08-19 09:56 被阅读0次

    Table of Contents

    1. 章节介绍

    在介绍完Informer机制后,可以发现如果想自定义控制器非常简单,我们直接注册handler就行。但是绝大部分k8s原生控制器中,handler并没有直接处理。而是统一遵守一套:

    Add , update, Del -> queue -> run -> runWorker -> syncHandler 处理的模式。

    例如 namespaces控制器中:

    // 1.先是定义了一个限速队列
    queue:                      workqueue.NewNamedRateLimitingQueue(nsControllerRateLimiter(), "namespace"),
    
    
    // 2.然后add, update都是入队列
    // configure the namespace informer event handlers
        namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
            cache.ResourceEventHandlerFuncs{
                AddFunc: func(obj interface{}) {
                    namespace := obj.(*v1.Namespace)
                    namespaceController.enqueueNamespace(namespace)
                },
                UpdateFunc: func(oldObj, newObj interface{}) {
                    namespace := newObj.(*v1.Namespace)
                    namespaceController.enqueueNamespace(namespace)
                },
            },
            resyncPeriod,
        )
        
    // 3.然后controller.run,启动多个协程
    // Run starts observing the system with the specified number of workers.
    func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
      
        for i := 0; i < workers; i++ {
            go wait.Until(nm.worker, time.Second, stopCh)
        }
        <-stopCh
    }
    
    // 4. worker处理一个个数据
    func (nm *NamespaceController) worker() {
    
        // 得到对象
            key, quit := nm.queue.Get()
            
            // 处理完对象
            defer nm.queue.Done(key)
    
            err := nm.syncNamespaceFromKey(key.(string))
            if err == nil {
                // no error, forget this entry and return
                nm.queue.Forget(key)
                return false
            }
    }
    

    可以看出来这一套的一个好处:

    (1)利用了Indexer本地缓存机制,queue里面只包括 key就行。数据indexer都有

    (2)workqueue除了一个缓冲机制外,还有着错误重试的机制

    因此这一节分析一下,client-go提供了哪些workqueue

    2. workerqueue介绍

    client-go 的 util/workqueue 包里主要有三个队列,分别是普通队列,延时队列,限速队列,后一个队列以前一个队列的实现为基础,层层添加新功能,我们按照 Queue、DelayingQueue、RateLimitingQueue 的顺序层层拨开来看限速队列是如何实现的。

    2.1 queue

    2.1.1 queue接口
    type Interface interface {
       Add(item interface{})  // 添加一个元素
       Len() int              // 元素个数
       Get() (item interface{}, shutdown bool) // 获取一个元素,第二个返回值和 channel 类似,标记队列是否关闭了
       Done(item interface{}) // 标记一个元素已经处理完
       ShutDown()             // 关闭队列
       ShuttingDown() bool    // 是否正在关闭
    }
    
    
    type Type struct {
       queue []t            // 定义元素的处理顺序,里面所有元素都应该在 dirty set 中有,而不能出现在 processing set 中
       dirty set            // 标记所有需要被处理的元素
       processing set       // 当前正在被处理的元素,当处理完后需要检查该元素是否在 dirty set 中,如果有则添加到 queue 里
    
       cond *sync.Cond      // 条件锁
       shuttingDown bool    // 是否正在关闭
       metrics queueMetrics
       unfinishedWorkUpdatePeriod time.Duration
       clock                      clock.Clock
    }
    

    这个 Queue 的工作逻辑大致是这样,里面的三个属性 queue、dirty、processing 都保存 items,但是含义有所不同:

    • queue:这是一个 []t 类型,也就是一个切片,因为其有序,所以这里当作一个列表来存储 item 的处理顺序。
    • dirty:这是一个 set 类型,也就是一个集合,这个集合存储的是所有需要处理的 item,这些 item 也会保存在 queue 中,但是 set 里是无序的,set 的特性是唯一。可以认为dirty就是queue的不同实现, queue是为了有序,set是为了保证元素唯一。
    • processing:这也是一个 set,存放的是当前正在处理的 item,也就是说这个 item 来自 queue 出队的元素,同时这个元素会被从 dirty 中删除。

    目前看这些还有些懵,直接看看queue的核心函数。

    add

    从这里就可以看出来,queue函数进行了过滤。比如我更新了pod1三次。

    pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
    

    informer的distrube函数会发送三个更新事件,queue也会收到三个更新事件,但是queue里面只会有一个 one(pod1的key)。

    为什么只需要保留一个就行?

    因为indexer已经更新了,indexer的数据是最新的。所以从这里也可以看出来,使用这一套逻辑,就没有update ,add, delete等区别了。

    如果我想统计一下,每个Pod变化了多少次,那就不能使用 workqueue了,必须在handler那里直接实现。

    // Add marks item as needing processing.
    func (q *Type) Add(item interface{}) {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()
        if q.shuttingDown {
            return
        }
        
        // dirty set 中已经有了该 item,则返回
        if q.dirty.has(item) {   
            return
        }
    
        q.metrics.add(item)
      
      
        q.dirty.insert(item)
        // 如果正在处理,也直接返回
        if q.processing.has(item) {
            return
        }
      
      // 否则就扔进queue队列
        q.queue = append(q.queue, item)
        q.cond.Signal()
    }
    
    get

    get会将元素从queue队列去列,表示这个元素,正在处理中。

    dirty和queue保持一致,也会删除这个元素。

    // get是从 queue队列中取出一个元素(queue中删除,dirty中删除)
    // 并且标记它正在处理,
    func (q *Type) Get() (item interface{}, shutdown bool) {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()
        for len(q.queue) == 0 && !q.shuttingDown {
            q.cond.Wait()
        }
        if len(q.queue) == 0 {
            // We must be shutting down.
            return nil, true
        }
    
        item, q.queue = q.queue[0], q.queue[1:]
    
        q.metrics.get(item)
    
        q.processing.insert(item)
        q.dirty.delete(item)
    
        return item, false
    }
    
    done

    done表明这个元素被处理完了,从processing队列删除。这里加了一个判断,如果dirty中还存在,还要将其加入 queue

    为什么需要这个判断呢?

    原因在于有一种请求是 itemA 正在处理,但是还没done,这个时候又来了一次 itemA。

    这个时候add 逻辑中,是直接返回的,不会添加itemA到queue的。所以这里要重新添加一次

    
    // Done marks item as done processing, and if it has been marked as dirty again
    // while it was being processed, it will be re-added to the queue for
    // re-processing.
    func (q *Type) Done(item interface{}) {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()
    
        q.metrics.done(item)
    
        q.processing.delete(item)
        // 判断dirty是否有该元素
        if q.dirty.has(item) {
            q.queue = append(q.queue, item)
            q.cond.Signal()
        }
    }
    

    2.2 DelayingQueue-延迟队列

    // delayingType wraps an Interface and provides delayed re-enquing
    type delayingType struct {
        Interface                     //上面的通用队列
        clock clock.Clock             // 时钟,用于获取时间
        stopCh chan struct{}          // 延时就意味着异步,就要有另一个协程处理,所以需要退出信号
        stopOnce sync.Once            // 用来确保 ShutDown() 方法只执行一次
        heartbeat clock.Ticker        // 定时器,在没有任何数据操作时可以定时的唤醒处理协程
        waitingForAddCh chan *waitFor // 所有延迟添加的元素封装成waitFor放到chan中
        metrics retryMetrics
    }
    
    type DelayingInterface interface {
        Interface
        // AddAfter adds an item to the workqueue after the indicated duration has passed
        AddAfter(item interface{}, duration time.Duration)
    }
    
    2.2.1 waitFor
    type waitFor struct {
       data    t          // 准备添加到队列中的数据
       readyAt time.Time  // 应该被加入队列的时间
       index int          // 在 heap 中的索引
    }
    

    waitForPriorityQueue是一个数组,实现了最小堆,对比的就是延迟的时间。

    type waitForPriorityQueue []*waitFor
    // heap需要实现的接口,告知队列长度
    func (pq waitForPriorityQueue) Len() int {
        return len(pq)
    }
    // heap需要实现的接口,告知第i个元素是否比第j个元素小
    func (pq waitForPriorityQueue) Less(i, j int) bool {
        return pq[i].readyAt.Before(pq[j].readyAt) // 此处对比的就是时间,所以排序按照时间排序
    }
    // heap需要实现的接口,实现第i和第j个元素换
    func (pq waitForPriorityQueue) Swap(i, j int) {
        // 这种语法好牛逼,有没有,C/C++程序猿没法理解~
        pq[i], pq[j] = pq[j], pq[i]
        pq[i].index = i                            // 因为heap没有所以,所以需要自己记录索引,这也是为什么waitFor定义索引参数的原因
        pq[j].index = j
    }
    // heap需要实现的接口,用于向队列中添加数据
    func (pq *waitForPriorityQueue) Push(x interface{}) {
        n := len(*pq)                       
        item := x.(*waitFor)
        item.index = n                             // 记录索引值
        *pq = append(*pq, item)                    // 放到了数组尾部
    }
    // heap需要实现的接口,用于从队列中弹出最后一个数据
    func (pq *waitForPriorityQueue) Pop() interface{} {
        n := len(*pq)
        item := (*pq)[n-1]
        item.index = -1
        *pq = (*pq)[0:(n - 1)]                     // 缩小数组,去掉了最后一个元素
        return item
    }
    // 返回第一个元素
    func (pq waitForPriorityQueue) Peek() interface{} {
        return pq[0]
    }
    

    到这里就可以大概猜出来延迟队列的实现了。

    就是所有添加的元素,有一个延迟时间,根据延迟时间构造一个最小堆。然后每次时间一到,从堆里面拿出来当前应该加入队列的时间。

    2.2. 2 NewNamedDelayingQueue
    // 这里可以传递一个名字
    func NewNamedDelayingQueue(name string) DelayingInterface {
       return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
    }
    
    // 上面一个函数只是调用当前函数,附带一个名字,这里加了一个指定 clock 的能力
    func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
      return newDelayingQueue(clock, NewNamed(name), name) // 注意这里的 NewNamed() 函数
    }
    
    func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
       ret := &delayingType{
          Interface:       q,
          clock:           clock,
          heartbeat:       clock.NewTicker(maxWait), // 10s 一次心跳
          stopCh:          make(chan struct{}),
          waitingForAddCh: make(chan *waitFor, 1000),
          metrics:         newRetryMetrics(name),
       }
    
       go ret.waitingLoop() // 核心就是运行 waitingLoop
       return ret
    }
    
    2.2.3 waitingLoop
    func (q *delayingType) waitingLoop() {
       defer utilruntime.HandleCrash()
       // 队列里没有 item 时实现等待用的
       never := make(<-chan time.Time)
       var nextReadyAtTimer clock.Timer
       // 构造一个优先级队列
       waitingForQueue := &waitForPriorityQueue{}
       heap.Init(waitingForQueue) // 这一行其实是多余的,等下提个 pr 给它删掉
    
       // 这个 map 用来处理重复添加逻辑的,下面会讲到
       waitingEntryByData := map[t]*waitFor{}
       // 无限循环
       for {
          // 这个地方 Interface 是多余的,等下也提个 pr 把它删掉吧
          if q.Interface.ShuttingDown() {
             return
          }
    
          now := q.clock.Now()
          // 队列里有 item 就开始循环
          for waitingForQueue.Len() > 0 {
             // 获取第一个 item
             entry := waitingForQueue.Peek().(*waitFor)
             // 时间还没到,先不处理
             if entry.readyAt.After(now) {
                break
             }
            // 时间到了,pop 出第一个元素;注意 waitingForQueue.Pop() 是最后一个 item,heap.Pop() 是第一个元素
             entry = heap.Pop(waitingForQueue).(*waitFor)
             // 将数据加到延时队列里
             q.Add(entry.data)
             // map 里删除已经加到延时队列的 item
             delete(waitingEntryByData, entry.data)
          }
    
          // 如果队列中有 item,就用第一个 item 的等待时间初始化计时器,如果为空则一直等待
          nextReadyAt := never
          if waitingForQueue.Len() > 0 {
             if nextReadyAtTimer != nil {
                nextReadyAtTimer.Stop()
             }
             entry := waitingForQueue.Peek().(*waitFor)
             nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
             nextReadyAt = nextReadyAtTimer.C()
          }
    
          select {
          case <-q.stopCh:
             return
          case <-q.heartbeat.C(): // 心跳时间是 10s,到了就继续下一轮循环
          case <-nextReadyAt: // 第一个 item 的等到时间到了,继续下一轮循环
          case waitEntry := <-q.waitingForAddCh: // waitingForAddCh 收到新的 item
             // 如果时间没到,就加到优先级队列里,如果时间到了,就直接加到延时队列里
             if waitEntry.readyAt.After(q.clock.Now()) {
                insert(waitingForQueue, waitingEntryByData, waitEntry)
             } else {
                q.Add(waitEntry.data)
             }
             // 下面的逻辑就是将 waitingForAddCh 中的数据处理完
             drained := false
             for !drained {
                select {
                case waitEntry := <-q.waitingForAddCh:
                   if waitEntry.readyAt.After(q.clock.Now()) {
                      insert(waitingForQueue, waitingEntryByData, waitEntry)
                   } else {
                      q.Add(waitEntry.data)
                   }
                default:
                   drained = true
                }
             }
          }
       }
    }
    
    2.2.4

    这个方法的作用是在指定的延时到达之后,在 work queue 中添加一个元素,源码如下:

    func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
       if q.ShuttingDown() { // 已经在关闭中就直接返回
          return
       }
    
       q.metrics.retry()
    
       if duration <= 0 { // 如果时间到了,就直接添加
          q.Add(item)
          return
       }
    
       select {
       case <-q.stopCh:
         // 构造 waitFor{},丢到 waitingForAddCh
       case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
       }
    }
    
    其实就是一个往堆加入元素的过程
    func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
       // 这里的主要逻辑是看一个 entry 是否存在,如果已经存在,新的 entry 的 ready 时间更短,就更新时间
       existing, exists := knownEntries[entry.data]
       if exists {
          if existing.readyAt.After(entry.readyAt) {
             existing.readyAt = entry.readyAt // 如果存在就只更新时间
             heap.Fix(q, existing.index)
          }
    
          return
       }
       // 如果不存在就丢到 q 里,同时在 map 里记录一下,用于查重
       heap.Push(q, entry)
       knownEntries[entry.data] = entry
    }
    
    2.2.5 总结

    (1)延迟队列的核心就是,根据加入队列的时间,构造一个最小堆,然后再到时间点后,将其加入queue中

    (2)上诉判断是否到时间点,不仅仅是一个for循环,还利用了心跳,channel机制

    (3)当某个对象处理的时候失败了,可以利用延迟队列的思想,等一会再重试,因为马上重试肯定是失败的

    2.3 RateLimitingQueue-限速队列

    2.3.1 RateLimiting结构体
    type RateLimitingInterface interface {
        DelayingInterface     //延迟队列
    
        AddRateLimited(item interface{})     //已限速方式,往队列添加一个元素
    
        // 标记介绍重试
        Forget(item interface{})
      
      // 重试了几次
        NumRequeues(item interface{}) int
    }
    
    
    // rateLimitingType wraps an Interface and provides rateLimited re-enquing
    type rateLimitingType struct {
        DelayingInterface    
    
        rateLimiter RateLimiter   //多了一个限速器
    }
    
    2.3.2 限速器类型

    可以看出来,限速队列和 延迟队列是一模一样的。

    延迟队列是自己决定 某个元素延迟多久。

    而限速队列是 有限速器决定 某个元素延迟多久。

    type RateLimiter interface {
        // 输入一个对象,判断延迟多久
        When(item interface{}) time.Duration
        
        // 标记介绍重试
        Forget(item interface{})
        
        // 重试了几次
        NumRequeues(item interface{}) int
    }
    

    这个接口有五个实现,分别为:

    1. BucketRateLimiter
    2. ItemExponentialFailureRateLimiter
    3. ItemFastSlowRateLimiter
    4. MaxOfRateLimiter
    5. WithMaxWaitRateLimiter
    BucketRateLimiter

    这个限速器可说的不多,用了 golang 标准库的 golang.org/x/time/rate.Limiter 实现。BucketRateLimiter 实例化的时候比如传递一个 rate.NewLimiter(rate.Limit(10), 100) 进去,表示令牌桶里最多有 100 个令牌,每秒发放 10 个令牌。

    所有元素都是一样的,来几次都是一样,所以NumRequeues,Forget都没有意义。

    type BucketRateLimiter struct {
       *rate.Limiter
    }
    
    var _ RateLimiter = &BucketRateLimiter{}
    
    func (r *BucketRateLimiter) When(item interface{}) time.Duration {
       return r.Limiter.Reserve().Delay() // 过多久后给当前 item 发放一个令牌
    }
    
    func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
       return 0
    }
    
    // 
    func (r *BucketRateLimiter) Forget(item interface{}) {
    }
    
    ItemExponentialFailureRateLimiter

    Exponential 是指数的意思,从这个限速器的名字大概能猜到是失败次数越多,限速越长而且是指数级增长的一种限速器。

    结构体定义如下,属性含义基本可以望文生义

    func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
       r.failuresLock.Lock()
       defer r.failuresLock.Unlock()
    
       exp := r.failures[item]
       r.failures[item] = r.failures[item] + 1 // 失败次数加一
    
       // 每调用一次,exp 也就加了1,对应到这里时 2^n 指数爆炸
       backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
       if backoff > math.MaxInt64 { // 如果超过了最大整型,就返回最大延时,不然后面时间转换溢出了
          return r.maxDelay
       }
    
       calculated := time.Duration(backoff)
       if calculated > r.maxDelay { // 如果超过最大延时,则返回最大延时
          return r.maxDelay
       }
    
       return calculated
    }
    
    func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
       r.failuresLock.Lock()
       defer r.failuresLock.Unlock()
    
       return r.failures[item]
    }
    
    func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
       r.failuresLock.Lock()
       defer r.failuresLock.Unlock()
    
       delete(r.failures, item)
    }
    
    ItemFastSlowRateLimiter

    快慢限速器,也就是先快后慢,定义一个阈值,超过了就慢慢重试。先看类型定义:

    type ItemFastSlowRateLimiter struct {
       failuresLock sync.Mutex
       failures     map[interface{}]int
    
       maxFastAttempts int            // 快速重试的次数
       fastDelay       time.Duration  // 快重试间隔
       slowDelay       time.Duration  // 慢重试间隔
    }
    
    func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
       r.failuresLock.Lock()
       defer r.failuresLock.Unlock()
    
       r.failures[item] = r.failures[item] + 1 // 标识重试次数 + 1
    
       if r.failures[item] <= r.maxFastAttempts { // 如果快重试次数没有用完,则返回 fastDelay
          return r.fastDelay
       }
    
       return r.slowDelay // 反之返回 slowDelay
    }
    
    func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
       r.failuresLock.Lock()
       defer r.failuresLock.Unlock()
    
       return r.failures[item]
    }
    
    func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
       r.failuresLock.Lock()
       defer r.failuresLock.Unlock()
    
       delete(r.failures, item)
    }
    
    MaxOfRateLimiter

    组合限速器,内部放多个限速器,然后返回限速最慢的一个延时:

    type MaxOfRateLimiter struct {
       limiters []RateLimiter
    }
    
    func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
       ret := time.Duration(0)
       for _, limiter := range r.limiters {
          curr := limiter.When(item)
          if curr > ret {
             ret = curr
          }
       }
    
       return ret
    }
    
    WithMaxWaitRateLimiter

    这个限速器也很简单,就是在其他限速器上包装一个最大延迟的属性,如果到了最大延时,则直接返回。这样就能避免延迟时间不可控,万一一个对象失败了多次,那以后的时间会越来越大。

    type WithMaxWaitRateLimiter struct {
       limiter  RateLimiter   // 其他限速器
       maxDelay time.Duration // 最大延时
    }
    
    func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
       return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay}
    }
    
    func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
       delay := w.limiter.When(item)
       if delay > w.maxDelay {
          return w.maxDelay // 已经超过了最大延时,直接返回最大延时
       }
    
       return delay
    }
    

    3.总结

    (1)workerqueue使用于只关注结果的处理方式。 比如统计一个Pod update了多少次这种关乎 过程的 处理。不能用,因为workerqueue进行了合并

    (2)workerqueue实现了很多限速机制,可以更加情况酌情使用

    4. 参考文档

    https://blog.csdn.net/weixin_42663840/article/details/81482553

    https://www.danielhu.cn/post/k8s/client-go-workqueue/

    相关文章

      网友评论

        本文标题:client-go的workqueue详解

        本文链接:https://www.haomeiwen.com/subject/whtsgrtx.html