美文网首页
深入分析K8S SharedInformer源码

深入分析K8S SharedInformer源码

作者: 陈先生_9e91 | 来源:发表于2019-12-12 10:38 被阅读0次

    深入分析K8S SharedInformer源码

    云原生应用(基于k8s的应用)一定绕不开SharedInformer,所以分析一下该类。

    sharedInformer

    SharedInformer has a shared data cache and is capable of distributing notifications for changes to the cache to multiple listeners who registered via AddEventHandler.

    If you use this, there is one behavior change compared to a standard Informer. When you receive a notification, the cache will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend on the contents of the cache exactly matching the notification you've received in handler functions. If there was a create, followed by a delete, the cache may NOT have your item. This has advantages over the broadcaster since it allows us to share a common cache across many controllers. Extending the broadcaster would have required us keep duplicate caches for each watch.

    sharedInformer有两个功能,cache和注册事件监听。cache功能主要作用是减少对apiserver的直接访问(ps. 毕竟 client-go里有限流=。=)。事件监听可以帮助构建自己的云原生应用。

    // SharedInformer keeps a single cache that is shared by all of its users and
    // fans change notifications out to every handler registered on it.
    type SharedInformer interface {
       // Run starts the informer; it stops once stopCh is closed.
       Run(stopCh <-chan struct{})
       // HasSynced reports whether the informer's store has synced.
       HasSynced() bool

       // AddEventHandler registers handler using the informer's own resync
       // period. One handler receives its events one at a time, in order;
       // separate handlers are not coordinated with each other.
       AddEventHandler(handler ResourceEventHandler)
       // AddEventHandlerWithResyncPeriod is AddEventHandler with an explicit
       // resync period for this handler; the same delivery rules apply.
       AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)

       // GetStore exposes the informer's Store.
       GetStore() Store
       // GetController returns a synthetic Controller that "votes" to start the informer.
       GetController() Controller
       // LastSyncResourceVersion returns the resource version observed at the
       // last sync with the underlying store. The value is not synchronized with
       // access to that store and is not thread-safe.
       LastSyncResourceVersion() string
    }
    

    最重要的函数应该是:Run、AddEventHandler、GetIndexer和HasSynced(其中GetIndexer定义在SharedIndexInformer接口中,上面的SharedInformer接口里没有它)。

    sharedInformer.Run

    // Run starts the shared informer and blocks (inside s.controller.Run) until
    // stopCh is closed.
    //
    // It builds a DeltaFIFO and a controller whose Process function is
    // s.HandleDeltas. It starts the cache mutation detector and the processor on
    // their own stop channel, then runs the controller. When the controller
    // returns, the deferred calls run in reverse registration order:
    //   1. mark the informer stopped;
    //   2. close processorStopCh;
    //   3. wait for the processor goroutines.
    // This guarantees the processor is stopped strictly after the controller.
    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
       defer utilruntime.HandleCrash()
    
       // Deltas are keyed by MetaNamespaceKeyFunc. s.indexer is passed as the
       // second argument; presumably this is the FIFO's view of already-known
       // objects (TODO confirm against NewDeltaFIFO).
       fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
        
       // The FIFO is both the reflector's store (the producer side) and the
       // queue that processLoop pops and hands to s.HandleDeltas (the consumer side).
       cfg := &Config{
             Queue:            fifo,
             ListerWatcher:    s.listerWatcher,
             ObjectType:       s.objectType,
             FullResyncPeriod: s.resyncCheckPeriod,
             RetryOnError:     false,
             ShouldResync:     s.processor.shouldResync,
    
             Process: s.HandleDeltas,
       }
    
       // Create the controller and flag the informer as started under startedLock.
       func() {
          s.startedLock.Lock()
          defer s.startedLock.Unlock()
    
          s.controller = New(cfg)
          s.controller.(*controller).clock = s.clock
          s.started = true
       }()
    
       // Separate stop channel because Processor should be stopped strictly after controller
       processorStopCh := make(chan struct{})
       var wg wait.Group
       defer wg.Wait()              // Wait for Processor to stop
       defer close(processorStopCh) // Tell Processor to stop
       wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
       wg.StartWithChannel(processorStopCh, s.processor.run)
    
       // Registered last, so it runs first on return: set stopped before the processor is told to stop.
       defer func() {
          s.startedLock.Lock()
          defer s.startedLock.Unlock()
          s.stopped = true // Don't want any new listeners
       }()
       s.controller.Run(stopCh)
    }
    

    processor.run负责运行各个listener,并回调注册的event handler,后面再分析。

    controller.Run

    // Controller is what the shared informer drives: it can be run until a stop
    // channel closes, asked whether it has synced, and asked for the resource
    // version it last synced at.
    type Controller interface {
        HasSynced() bool
        LastSyncResourceVersion() string
        Run(stopCh <-chan struct{})
    }
    
    // Run begins processing items and keeps going until stopCh fires.
    // It is an error to call Run more than once. Run blocks, so call it via go.
    //
    // Run wires up a producer and a consumer around c.config.Queue:
    //   - producer: a Reflector that lists/watches c.config.ListerWatcher and
    //     writes into the queue;
    //   - consumer: processLoop, which drains the queue.
    func (c *controller) Run(stopCh <-chan struct{}) {
       defer utilruntime.HandleCrash()
       // Close the queue once stopCh fires (presumably so that a Pop blocked in
       // processLoop is released; TODO confirm against the Queue implementation).
       go func() {
          <-stopCh
          c.config.Queue.Close()
       }()
       // The reflector's store is the controller's queue, so everything it
       // lists and watches lands in c.config.Queue.
       r := NewReflector(
          c.config.ListerWatcher,
          c.config.ObjectType,
          c.config.Queue, // store
          c.config.FullResyncPeriod,
       )
       r.ShouldResync = c.config.ShouldResync
       r.clock = c.clock
    
       // Publish the reflector under reflectorMutex; presumably other methods
       // read c.reflector concurrently (TODO confirm).
       c.reflectorMutex.Lock()
       c.reflector = r
       c.reflectorMutex.Unlock()
    
       // The deferred Wait keeps Run from returning before the reflector goroutine exits.
       var wg wait.Group
       defer wg.Wait()
    
       // Producer: the reflector runs in its own goroutine until stopCh fires.
       wg.StartWithChannel(stopCh, r.Run)
    
       // Consumer: run processLoop, re-invoking it one second after it returns,
       // until stopCh fires.
       wait.Until(c.processLoop, time.Second, stopCh)
    }
    

    生产者:由NewReflector创建、通过r.Run运行的Reflector(List并Watch资源,把变更写入Queue)

    消费者:c.processLoop(从Queue中Pop元素并处理)

    首先分析生产者

    Reflector.Run

    // Reflector watches a specified resource and causes all changes to be reflected in the given store.
    //
    // Run drives it by calling ListAndWatch repeatedly. A full List replaces the
    // store's contents, then a Watch from the listed resource version applies
    // incremental Added/Modified/Deleted events to the same store.
    type Reflector struct {
       // name identifies this reflector. By default it will be a file:line if possible.
       name string
       // metrics tracks basic metric information about the reflector
       metrics *reflectorMetrics
    
       // The type of object we expect to place in the store.
       expectedType reflect.Type
       // The destination to sync up with the watch source. When the reflector
       // is created by controller.Run, this is the controller's Queue (a
       // DeltaFIFO in sharedIndexInformer.Run), not a plain cache.
       store Store
       // listerWatcher is used to perform lists and watches.
       listerWatcher ListerWatcher
       // period controls timing between one watch ending and
       // the beginning of the next one (Run passes it to wait.Until as the
       // interval between ListAndWatch calls).
       period       time.Duration
       // resyncPeriod is the duration of the timer created by resyncChan for
       // periodic store.Resync calls.
       resyncPeriod time.Duration
       // ShouldResync, if non-nil, is consulted before each periodic resync;
       // a nil ShouldResync means the store is always resynced.
       ShouldResync func() bool
       // clock allows tests to manipulate time
       clock clock.Clock
       // lastSyncResourceVersion is the resource version token last
       // observed when doing a sync with the underlying store
       // it is thread safe, but not synchronized with the underlying store
       lastSyncResourceVersion string
       // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
       lastSyncResourceVersionMutex sync.RWMutex
    }
    

    Reflector的核心方法是ListAndWatch,Run只是通过wait.Until反复调用它:

    // Run keeps the reflector's list/watch alive. It calls ListAndWatch, reports
    // any returned error via utilruntime.HandleError, and calls it again after
    // r.period, so a watch that ends is restarted. Run returns once stopCh is closed.
    func (r *Reflector) Run(stopCh <-chan struct{}) {
       listAndWatch := func() {
          err := r.ListAndWatch(stopCh)
          if err == nil {
             return
          }
          utilruntime.HandleError(err)
       }
       wait.Until(listAndWatch, r.period, stopCh)
    }
    

    ListAndWatch

    // ListAndWatch first lists all items and records the resource version at the
    // moment of the call, then uses that resource version to start a watch.
    // It returns an error if the initial List, decoding its result, or replacing
    // the store with it fails, i.e. if ListAndWatch never got as far as watching.
    func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
       var resourceVersion string
    
       // Explicitly set "0" as resource version - it's fine for the List()
       // to be served from cache and potentially be delayed relative to
       // etcd contents. Reflector framework will catch up via Watch() eventually.
       options := metav1.ListOptions{ResourceVersion: "0"}
    
       if err := func() error {
          var list runtime.Object
          var err error
          listCh := make(chan struct{}, 1)
          panicCh := make(chan interface{}, 1)
          // List in a separate goroutine so that closing stopCh can abandon a
          // slow List; a panic inside List is forwarded and re-raised here.
          go func() {
             defer func() {
                if r := recover(); r != nil {
                   panicCh <- r
                }
             }()
             list, err = r.listerWatcher.List(options)
             close(listCh)
          }()
          select {
          case <-stopCh:
             return nil
          case r := <-panicCh:
             panic(r)
          case <-listCh:
          }
          // Every step below must succeed; otherwise nil or partial data would be
          // pushed into the store.
          if err != nil {
             return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
          }
          listMetaInterface, err := meta.ListAccessor(list)
          if err != nil {
             return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
          }
          resourceVersion = listMetaInterface.GetResourceVersion()
          items, err := meta.ExtractList(list)
          if err != nil {
             return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
          }
          
          // Replace the store's contents with the listed items.
          if err := r.syncWith(items, resourceVersion); err != nil {
             return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
          }
          
          r.setLastSyncResourceVersion(resourceVersion)
          return nil
       }(); err != nil {
          return err
       }
    
        
    // syncWith hands every listed object to r.store.Replace, together with the
    // resource version the list was taken at, so the store's contents become
    // exactly this list.
    func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
        asIfaces := make([]interface{}, len(items))
        for i := range items {
            asIfaces[i] = items[i]
        }
        return r.store.Replace(asIfaces, resourceVersion)
    }
    

    先List一份完整的资源列表,再由syncWith调用store.Replace,用这份列表整体替换store中的内容;同时记录resourceVersion,作为后续Watch的起点。

       // resyncerrc carries a Resync failure from the goroutine below to
       // watchHandler. Its buffer of 1 lets the goroutine report and exit
       // without blocking.
       resyncerrc := make(chan error, 1)
       // cancelCh is closed when ListAndWatch returns, stopping the goroutine.
       cancelCh := make(chan struct{})
       defer close(cancelCh)
       // Periodic resync loop: on each timer tick, resync the store if
       // ShouldResync allows it, then arm a fresh timer.
       go func() {
          resyncCh, cleanup := r.resyncChan()
          defer func() {
             cleanup() // Call the last one written into cleanup
          }()
          for {
             select {
             case <-resyncCh:
             case <-stopCh:
                return
             case <-cancelCh:
                return
             }
             // A nil ShouldResync means "always resync".
             if r.ShouldResync == nil || r.ShouldResync() {
                if err := r.store.Resync(); err != nil {
                   resyncerrc <- err
                   return
                }
             }
             // Stop the fired timer and replace it with a new one.
             cleanup()
             resyncCh, cleanup = r.resyncChan()
          }
       }()
    
    // resyncChan returns a channel that fires once after r.resyncPeriod, plus a
    // function that stops the underlying timer.
    //
    // A zero resyncPeriod disables periodic resync. In that case the returned
    // channel is nil, so receiving from it blocks forever, and the stop function
    // is a no-op that reports false. Without this guard, a zero period would
    // create timers that fire immediately, and the resync loop in ListAndWatch
    // would spin.
    func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
        if r.resyncPeriod == 0 {
            return nil, func() bool { return false }
        }
        t := r.clock.NewTimer(r.resyncPeriod)
        return t.C(), t.Stop
    }
    

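    启动一个goroutine定时重新同步:每隔resyncPeriod检查一次,若ShouldResync为nil或返回true,就调用store.Resync();Resync出错时通过resyncerrc通知watchHandler。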

    
       for {
          // Give stopCh a chance to end the loop before every new watch attempt.
          select {
          case <-stopCh:
             return nil
          default:
          }
    
          // Randomize the server-side timeout in [minWatchTimeout, 2*minWatchTimeout)
          // so watches neither hang forever nor all expire at the same moment.
          timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
          options = metav1.ListOptions{
             ResourceVersion: resourceVersion,
             TimeoutSeconds:  &timeoutSeconds,
          }
    
          w, err := r.listerWatcher.Watch(options)
          if err != nil {
             // w is unusable. Return so that Run retries the whole ListAndWatch
             // (a fresh List) after r.period.
             utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
             return nil
          }
    
          // A nil result means the watch closed normally: loop and re-watch from
          // the latest resourceVersion. Any error (stop requested, a Resync
          // failure, ...) ends this ListAndWatch instead of being swallowed.
          if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
             if err != errorStopRequested {
                utilruntime.HandleError(fmt.Errorf("%s: watch of %v ended with: %v", r.name, r.expectedType, err))
             }
             return nil
          }
       }
    }
    
    // watchHandler watches w and keeps *resourceVersion up to date.
    func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
        start := r.clock.Now()
        eventCount := 0
    
        defer w.Stop()
    
    loop:
        for {
            select {
            case <-stopCh:
                return errorStopRequested
            case err := <-errc:
                return err
            case event, ok := <-w.ResultChan():
                if !ok {
                    break loop
                }
    
                newResourceVersion := meta.GetResourceVersion()
                switch event.Type {
                case watch.Added:
                    err := r.store.Add(event.Object)
                case watch.Modified:
                    err := r.store.Update(event.Object)
                case watch.Deleted:
                    err := r.store.Delete(event.Object)
                    }
                default:
                }
                *resourceVersion = newResourceVersion
                r.setLastSyncResourceVersion(newResourceVersion)
                eventCount++
            }
        }
    
        return nil
    }
    
    

    以List得到的resourceVersion为起点Watch之后的变更,把每个Added/Modified/Deleted事件写入store,并不断更新resourceVersion。

    DeltaFIFO重要数据结构后续会分析

    生产者分析完了,继续分析消费者

    controller.processLoop

    需要注意:Reflector的store字段就是controller.config.Queue(即sharedIndexInformer.Run中创建的DeltaFIFO)。

    // processLoop drains the work queue. Each Pop hands one item to
    // c.config.Process before returning.
    //
    // It returns once Pop reports FIFOClosedError. controller.Run closes the
    // queue when stopCh fires, and without this exit the loop would keep
    // calling Pop on the closed queue, so wait.Until in Run could never return.
    // When RetryOnError is set, an item whose processing failed is put back
    // into the queue.
    func (c *controller) processLoop() {
       for {
          obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
          if err != nil {
             if err == FIFOClosedError {
                return
             }
             if c.config.RetryOnError {
                // Re-enqueue the item for another processing attempt.
                c.config.Queue.AddIfNotPresent(obj)
             }
          }
       }
    }
    

    processLoop不断从Queue中Pop元素;Pop在返回之前会调用config.Process(即sharedIndexInformer.HandleDeltas)处理该元素。

    // HandleDeltas is the controller's Process function. It applies one popped
    // batch of Deltas to s.indexer, from oldest to newest, and after each cache
    // update distributes the matching notification to the listeners.
    //
    // Sync/Added/Updated deltas become an update notification when the object is
    // already in the indexer, and an add notification otherwise. Sync deltas go
    // only to syncing listeners. Deleted deltas remove the object and send a
    // delete notification.
    //
    // The first indexer error aborts the batch and is returned, so listeners are
    // only notified about changes that actually reached the cache.
    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
       // Held for the whole batch; presumably this keeps handler registration
       // from interleaving with delta processing (TODO confirm).
       s.blockDeltas.Lock()
       defer s.blockDeltas.Unlock()
    
       // from oldest to newest
       for _, d := range obj.(Deltas) {
          switch d.Type {
          case Sync, Added, Updated:
             isSync := d.Type == Sync
             s.cacheMutationDetector.AddObject(d.Object)
             if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                   return err
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
             } else {
                if err := s.indexer.Add(d.Object); err != nil {
                   return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
             }
          case Deleted:
             if err := s.indexer.Delete(d.Object); err != nil {
                return err
             }
             s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
          }
       }
       return nil
    }
    

    处理delta增量的过程,更新完缓存之后会分发给listeners处理

    // distribute passes obj to listeners while holding the read lock on the
    // listener lists. A sync notification goes only to p.syncingListeners; any
    // other notification goes to p.listeners.
    func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
       p.listenersLock.RLock()
       defer p.listenersLock.RUnlock()

       targets := p.listeners
       if sync {
          targets = p.syncingListeners
       }
       for _, listener := range targets {
          listener.add(obj)
       }
    }
    

    processorListener

    // newProcessListener builds a processorListener for handler. It gets
    // unbuffered addCh/nextCh channels and a growable ring buffer (initial size
    // bufferSize) for pending notifications, then computes its first resync
    // time relative to now.
    func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
        listener := &processorListener{
            handler:               handler,
            requestedResyncPeriod: requestedResyncPeriod,
            resyncPeriod:          resyncPeriod,
            addCh:                 make(chan interface{}),
            nextCh:                make(chan interface{}),
            pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
        }
        listener.determineNextResync(now)
        return listener
    }
    
    // add hands a notification to the listener. addCh is unbuffered, so this
    // blocks until pop receives the value.
    func (p *processorListener) add(n interface{}) {
       ch := p.addCh
       ch <- n
    }
    

    注意addCh是无缓冲(阻塞)channel;从addCh接收数据的是pop方法:

    // pop moves notifications from addCh to nextCh.
    //
    // It is always ready to receive on addCh, so add never waits for the handler.
    // The next notification to dispatch is held in `notification`; any that
    // arrive while it is pending are buffered in p.pendingNotifications. The
    // local nextCh is nil, which disables the send case, whenever nothing is
    // waiting to be dispatched.
    //
    // pop returns once addCh is closed. On the way out it closes p.nextCh, which
    // ends the range loop in run.
    func (p *processorListener) pop() {
       defer utilruntime.HandleCrash()
       defer close(p.nextCh) // Tell .run() to stop
    
       var nextCh chan<- interface{}
       var notification interface{}
       for {
          select {
          case nextCh <- notification:
             // Notification dispatched
             var ok bool
             notification, ok = p.pendingNotifications.ReadOne()
             if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
             }
          case notificationToAdd, ok := <-p.addCh:
             if !ok {
                // addCh was closed: no more notifications will arrive. Without
                // this check the loop would keep receiving zero values forever.
                return
             }
             if notification == nil {
                // Nothing is waiting, so skip the buffer and dispatch this one next.
                notification = notificationToAdd
                nextCh = p.nextCh
             } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
             }
          }
       }
    }
    

    这段代码比较有意思:nextCh也是无缓冲channel。pop用pendingNotifications(可自动增长的环形缓冲区)暂存尚未分发的notification,真正交给处理方的是nextCh。之所以不直接用带缓冲的channel,可能是因为很难预先确定合适的buffer size。

    // processorListener relays notifications to a single ResourceEventHandler.
    // The flow is:
    //   1. add sends each notification on addCh;
    //   2. pop buffers it in pendingNotifications and forwards it on nextCh;
    //   3. run reads nextCh and calls the handler.
    type processorListener struct {
        nextCh chan interface{}
        addCh  chan interface{}
    
        handler ResourceEventHandler
    
        // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
        // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
        // added until we OOM.
        // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
        // we should try to do something better.
        pendingNotifications buffer.RingGrowing
    
        // requestedResyncPeriod and resyncPeriod are set by newProcessListener
        // from its arguments of the same names.
        requestedResyncPeriod time.Duration
        resyncPeriod          time.Duration
        // nextResync is computed by determineNextResync; presumably the earliest
        // time the next resync is due (TODO confirm).
        nextResync time.Time
        // resyncLock presumably guards resyncPeriod and nextResync (TODO confirm).
        resyncLock sync.Mutex
    }
    

    从nextCh接收数据的是run方法:

    // run delivers every notification received on p.nextCh to p.handler,
    // dispatching on its concrete type. It returns after pop has closed nextCh
    // and all remaining notifications have been delivered.
    func (p *processorListener) run() {
       // stopCh is closed only once nextCh has been closed and drained. That
       // tells wait.Until to stop re-invoking the delivery loop.
       stopCh := make(chan struct{})
       wait.Until(func() {
          err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
             for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                   p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                   p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                   p.handler.OnDelete(notification.oldObj)
                default:
                   utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                }
             }
             // The range only ends when p.nextCh is closed and empty: report done.
             return true, nil
          })
    
          // err is nil only when the condition reported done (nextCh closed), so
          // stop the outer wait.Until instead of retrying every minute forever.
          if err == nil {
             close(stopCh)
          }
       }, 1*time.Minute, stopCh)
    }
    

    根据notification的类型,回调注册的handler处理函数,使用方法如下

    podInformer.AddEventHandler(cache.FilteringResourceEventHandler{
       FilterFunc: pc.filter,
       Handler: cache.ResourceEventHandlerFuncs{
          AddFunc:    pc.add,
          UpdateFunc: pc.update,
          DeleteFunc: pc.delete,
       },
    })
    

    Cache

    缓存的使用方法如下:

    // initNodeCache builds a shared informer factory for K8sClient (resync
    // period 0), publishes the node lister as NodeStore, starts the factory,
    // and waits for the node cache to sync.
    func initNodeCache() {
        factory := informers.NewSharedInformerFactory(K8sClient, 0)

        // Lister() goes through Informer(), which registers the node informer
        // with the factory before Start is called.
        nodes := factory.Core().V1().Nodes()
        NodeStore = nodes.Lister()

        // stop is never closed here, so the started informers are never told to stop.
        stop := make(chan struct{})
        factory.Start(stop)

        synced := k8scache.WaitForCacheSync(stop, nodes.Informer().HasSynced)
        if !synced {
            return
        }
    }
    
    // Lister returns a NodeLister that reads nodes from this informer's indexer.
    func (f *nodeInformer) Lister() v1.NodeLister {
        indexer := f.Informer().GetIndexer()
        return v1.NewNodeLister(indexer)
    }
    

    至此sharedInformer已经分析完毕,还遗留几个重要数据结构后面分析。

    相关文章

      网友评论

          本文标题:深入分析K8S SharedInformer源码

          本文链接:https://www.haomeiwen.com/subject/rtizgctx.html