
Reading the k8s client-go informer source code

Author: wwq2020 | Published 2020-08-04 16:28

    In informers/factory.go:

    func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
        return NewSharedInformerFactoryWithOptions(client, defaultResync)
    }
    
    
    func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
        factory := &sharedInformerFactory{
            client:           client,
            namespace:        v1.NamespaceAll,
            defaultResync:    defaultResync,
            informers:        make(map[reflect.Type]cache.SharedIndexInformer),
            startedInformers: make(map[reflect.Type]bool),
            customResync:     make(map[reflect.Type]time.Duration),
        }
    
        // Apply all options
        for _, opt := range options {
            factory = opt(factory)
        }
    
        return factory
    }
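
    NewSharedInformerFactoryWithOptions applies each SharedInformerOption to the factory in turn, which is how the namespace, the list-options tweak, and per-type resync overrides get set. A minimal sketch of building a factory with options follows; the clientset variable, the namespace, and the label selector are illustrative assumptions, and WithNamespace / WithTweakListOptions are the standard options exported by the informers package.

    // Sketch only: assumes the usual imports (time, metav1, k8s.io/client-go/informers,
    // k8s.io/client-go/kubernetes); newFilteredFactory is a hypothetical helper name.
    func newFilteredFactory(clientset kubernetes.Interface) informers.SharedInformerFactory {
        return informers.NewSharedInformerFactoryWithOptions(
            clientset,
            30*time.Second, // defaultResync
            informers.WithNamespace("kube-system"), // replaces v1.NamespaceAll
            informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
                opts.LabelSelector = "app=nginx" // illustrative filter
            }),
        )
    }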
    
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
        f.lock.Lock()
        defer f.lock.Unlock()
    
        for informerType, informer := range f.informers {
            if !f.startedInformers[informerType] {
                go informer.Run(stopCh)
                f.startedInformers[informerType] = true
            }
        }
    }
    
    func (f *sharedInformerFactory) Apps() apps.Interface {
        return apps.New(f, f.namespace, f.tweakListOptions)
    }
    
    func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
        f.lock.Lock()
        defer f.lock.Unlock()
    
        informerType := reflect.TypeOf(obj)
        informer, exists := f.informers[informerType]
        if exists {
            return informer
        }
    
        resyncPeriod, exists := f.customResync[informerType]
        if !exists {
            resyncPeriod = f.defaultResync
        }
    
        informer = newFunc(f.client, resyncPeriod)
        f.informers[informerType] = informer
    
        return informer
    }
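
    InformerFor is the factory's per-type cache: one SharedIndexInformer per reflect.Type, created lazily through the newFunc that each typed informer passes in, and started once by Start. Putting the pieces together, caller code typically looks like the sketch below (runDeploymentInformer is a hypothetical name; the clientset and the standard client-go imports are assumed).

    // Sketch only: obtain, start, and sync a deployment informer via the shared factory.
    func runDeploymentInformer(clientset kubernetes.Interface, stopCh <-chan struct{}) {
        factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)

        // Asking for the informer registers it with the factory via InformerFor.
        deployInformer := factory.Apps().V1().Deployments().Informer()

        factory.Start(stopCh) // runs every registered informer in its own goroutine

        // Block until the initial List has populated the indexer.
        if !cache.WaitForCacheSync(stopCh, deployInformer.HasSynced) {
            return
        }
        // From here on the indexer is a warm local cache of Deployments.
    }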
    

    In informers/apps/interface.go:

    func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
        return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
    }
    
    func (g *group) V1() v1.Interface {
        return v1.New(g.factory, g.namespace, g.tweakListOptions)
    }
    

    In informers/apps/v1/interface.go:

    func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
        return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
    }
    
    func (v *version) Deployments() DeploymentInformer {
        return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
    }
    
    

    In informers/apps/v1/deployment.go:

    
    func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
        return cache.NewSharedIndexInformer(
            &cache.ListWatch{
                ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                    if tweakListOptions != nil {
                        tweakListOptions(&options)
                    }
                    return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
                },
                WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                    if tweakListOptions != nil {
                        tweakListOptions(&options)
                    }
                    return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
                },
            },
            &appsv1.Deployment{},
            resyncPeriod,
            indexers,
        )
    }
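
    NewFilteredDeploymentInformer is exported, so an informer can also be built directly, without the factory, when a caller wants its own indexers, resync period, or ListOptions filter. A hedged sketch (appsv1informers is an assumed alias for k8s.io/client-go/informers/apps/v1; the selector is illustrative):

    // Sketch only: a standalone, label-filtered deployment informer.
    func newStandaloneDeploymentInformer(clientset kubernetes.Interface) cache.SharedIndexInformer {
        return appsv1informers.NewFilteredDeploymentInformer(
            clientset,
            metav1.NamespaceDefault,
            0, // resyncPeriod 0 disables resync
            cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
            func(opts *metav1.ListOptions) {
                opts.LabelSelector = "tier=frontend" // illustrative filter
            },
        )
    }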
    
    func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
        return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
    }
    
    func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
        return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
    }
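
    Because Informer() goes through InformerFor, every caller that asks for the deployment informer shares a single SharedIndexInformer. The generated DeploymentInformer interface also exposes a Lister() over the same indexer (not quoted in this excerpt); a rough usage sketch, with listCachedDeployments as a hypothetical helper and the namespace as an assumption:

    // Sketch only: reading from the shared informer's local cache via a lister.
    func listCachedDeployments(factory informers.SharedInformerFactory) error {
        lister := factory.Apps().V1().Deployments().Lister() // backed by the shared indexer

        // Only meaningful after factory.Start and a successful WaitForCacheSync:
        // this reads from memory, not from the API server.
        all, err := lister.Deployments("default").List(labels.Everything())
        if err != nil {
            return err
        }
        fmt.Printf("cached deployments: %d\n", len(all))
        return nil
    }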
    

    In tools/cache/shared_informer.go:

    func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
        realClock := &clock.RealClock{}
        sharedIndexInformer := &sharedIndexInformer{
            processor:                       &sharedProcessor{clock: realClock},
            indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
            listerWatcher:                   lw,
            objectType:                      exampleObject,
            resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
            defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
            cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
            clock:                           realClock,
        }
        return sharedIndexInformer
    }
    
    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
    
        fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
            KnownObjects:          s.indexer,
            EmitDeltaTypeReplaced: true,
        })
    
        cfg := &Config{
            Queue:            fifo,
            ListerWatcher:    s.listerWatcher,
            ObjectType:       s.objectType,
            FullResyncPeriod: s.resyncCheckPeriod,
            RetryOnError:     false,
            ShouldResync:     s.processor.shouldResync,
    
            Process:           s.HandleDeltas,
            WatchErrorHandler: s.watchErrorHandler,
        }
    
        func() {
            s.startedLock.Lock()
            defer s.startedLock.Unlock()
    
            s.controller = New(cfg)
            s.controller.(*controller).clock = s.clock
            s.started = true
        }()
    
        // Separate stop channel because Processor should be stopped strictly after controller
        processorStopCh := make(chan struct{})
        var wg wait.Group
        defer wg.Wait()              // Wait for Processor to stop
        defer close(processorStopCh) // Tell Processor to stop
        wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
        wg.StartWithChannel(processorStopCh, s.processor.run)
    
        defer func() {
            s.startedLock.Lock()
            defer s.startedLock.Unlock()
            s.stopped = true // Don't want any new listeners
        }()
        s.controller.Run(stopCh)
    }
    
    func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
        s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
    }
    
    func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
    
        if s.stopped {
            klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
            return
        }
    
        if resyncPeriod > 0 {
            if resyncPeriod < minimumResyncPeriod {
                klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
                resyncPeriod = minimumResyncPeriod
            }
    
            if resyncPeriod < s.resyncCheckPeriod {
                if s.started {
                    klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
                    resyncPeriod = s.resyncCheckPeriod
                } else {
                    // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
                    // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
                    // accordingly
                    s.resyncCheckPeriod = resyncPeriod
                    s.processor.resyncCheckPeriodChanged(resyncPeriod)
                }
            }
        }
    
        listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
    
        if !s.started {
            s.processor.addListener(listener)
            return
        }
    
        // in order to safely join, we have to
        // 1. stop sending add/update/delete notifications
        // 2. do a list against the store
        // 3. send synthetic "Add" events to the new handler
        // 4. unblock
        s.blockDeltas.Lock()
        defer s.blockDeltas.Unlock()
    
        s.processor.addListener(listener)
        for _, item := range s.indexer.List() {
            listener.add(addNotification{newObj: item})
        }
    }
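
    Handlers added after the informer has started receive synthetic add notifications for everything already in the indexer, so late registration does not miss existing objects. A minimal registration sketch (registerHandler is a hypothetical name; the handler bodies are placeholders):

    // Sketch only: registering a handler with its own resync period.
    func registerHandler(informer cache.SharedIndexInformer) {
        informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                // obj is *appsv1.Deployment for a deployment informer
            },
            UpdateFunc: func(oldObj, newObj interface{}) {
                // resyncs also land here; old and new are then the same cached object
            },
            DeleteFunc: func(obj interface{}) {
                // obj may be a cache.DeletedFinalStateUnknown tombstone
            },
        }, 10*time.Minute)
    }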
    
    
    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
        s.blockDeltas.Lock()
        defer s.blockDeltas.Unlock()
    
        // from oldest to newest
        for _, d := range obj.(Deltas) {
            switch d.Type {
            case Sync, Replaced, Added, Updated:
                s.cacheMutationDetector.AddObject(d.Object)
                if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                    if err := s.indexer.Update(d.Object); err != nil {
                        return err
                    }
    
                    isSync := false
                    switch {
                    case d.Type == Sync:
                        // Sync events are only propagated to listeners that requested resync
                        isSync = true
                    case d.Type == Replaced:
                        if accessor, err := meta.Accessor(d.Object); err == nil {
                            if oldAccessor, err := meta.Accessor(old); err == nil {
                                // Replaced events that didn't change resourceVersion are treated as resync events
                                // and only propagated to listeners that requested resync
                                isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                            }
                        }
                    }
                    s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
                } else {
                    if err := s.indexer.Add(d.Object); err != nil {
                        return err
                    }
                    s.processor.distribute(addNotification{newObj: d.Object}, false)
                }
            case Deleted:
                if err := s.indexer.Delete(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
            }
        }
        return nil
    }
    
    
    type sharedProcessor struct {
        listenersStarted bool
        listenersLock    sync.RWMutex
        listeners        []*processorListener
        syncingListeners []*processorListener
        clock            clock.Clock
        wg               wait.Group
    }
    
    func (p *sharedProcessor) addListener(listener *processorListener) {
        p.listenersLock.Lock()
        defer p.listenersLock.Unlock()
    
        p.addListenerLocked(listener)
        if p.listenersStarted {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
    }
    
    func (p *processorListener) pop() {
        defer utilruntime.HandleCrash()
        defer close(p.nextCh) // Tell .run() to stop
    
        var nextCh chan<- interface{}
        var notification interface{}
        for {
            select {
            case nextCh <- notification:
                // Notification dispatched
                var ok bool
                notification, ok = p.pendingNotifications.ReadOne()
                if !ok { // Nothing to pop
                    nextCh = nil // Disable this select case
                }
            case notificationToAdd, ok := <-p.addCh:
                if !ok {
                    return
                }
                if notification == nil { // No notification to pop (and pendingNotifications is empty)
                    // Optimize the case - skip adding to pendingNotifications
                    notification = notificationToAdd
                    nextCh = p.nextCh
                } else { // There is already a notification waiting to be dispatched
                    p.pendingNotifications.WriteOne(notificationToAdd)
                }
            }
        }
    }
    
    func (p *processorListener) run() {
        // this call blocks until the channel is closed.  When a panic happens during the notification
        // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
        // the next notification will be attempted.  This is usually better than the alternative of never
        // delivering again.
        stopCh := make(chan struct{})
        wait.Until(func() {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            close(stopCh)
        }, 1*time.Second, stopCh)
    }
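
    pop relies on the Go idiom that sending on a nil channel blocks forever, so setting nextCh = nil silently disables that select case until there is something to deliver, while pendingNotifications absorbs bursts so addCh never blocks distribute. The following stripped-down, self-contained program illustrates the same unbounded-buffer pattern; it is not client-go code, and unlike pop it flushes its backlog on shutdown.

    package main

    import "fmt"

    // pump forwards values from in to out, buffering internally so that
    // senders on in never block (the role pop plays between addCh and nextCh).
    func pump(in <-chan int, out chan<- int) {
        defer close(out)
        var pending []int
        var send chan<- int // nil while there is nothing to send: disables that case
        var next int
        for {
            select {
            case send <- next:
                if len(pending) == 0 {
                    send = nil // buffer drained; disable the send case again
                } else {
                    next, pending = pending[0], pending[1:]
                }
            case v, ok := <-in:
                if !ok {
                    // in closed: flush what is left, then stop.
                    // (client-go's pop simply returns here.)
                    if send != nil {
                        out <- next
                    }
                    for _, p := range pending {
                        out <- p
                    }
                    return
                }
                if send == nil {
                    next, send = v, out // fast path: nothing buffered yet
                } else {
                    pending = append(pending, v)
                }
            }
        }
    }

    func main() {
        in, out := make(chan int), make(chan int)
        go pump(in, out)
        go func() {
            for i := 0; i < 3; i++ {
                in <- i
            }
            close(in)
        }()
        for v := range out {
            fmt.Println(v) // prints 0, 1, 2
        }
    }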
    
    

    In tools/cache/controller.go:

    // New makes a new Controller from the given Config.
    func New(c *Config) Controller {
        ctlr := &controller{
            config: *c,
            clock:  &clock.RealClock{},
        }
        return ctlr
    }
    
    // Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
    // It's an error to call Run more than once.
    // Run blocks; call via go.
    func (c *controller) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
        go func() {
            <-stopCh
            c.config.Queue.Close()
        }()
        r := NewReflector(
            c.config.ListerWatcher,
            c.config.ObjectType,
            c.config.Queue,
            c.config.FullResyncPeriod,
        )
        r.ShouldResync = c.config.ShouldResync
        r.clock = c.clock
        if c.config.WatchErrorHandler != nil {
            r.watchErrorHandler = c.config.WatchErrorHandler
        }
    
        c.reflectorMutex.Lock()
        c.reflector = r
        c.reflectorMutex.Unlock()
    
        var wg wait.Group
        defer wg.Wait()
    
        wg.StartWithChannel(stopCh, r.Run)
    
        wait.Until(c.processLoop, time.Second, stopCh)
    }
    
    func (c *controller) processLoop() {
        for {
            obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
            if err != nil {
                if err == ErrFIFOClosed {
                    return
                }
                if c.config.RetryOnError {
                    // This is the safe way to re-enqueue.
                    c.config.Queue.AddIfNotPresent(obj)
                }
            }
        }
    }
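
    processLoop keeps popping from the DeltaFIFO and hands each batch to Config.Process, which for a shared informer is HandleDeltas above; each popped value is a cache.Deltas slice ordered oldest to newest. The Pop/process contract can be exercised on its own, as in this sketch (popOnce is a hypothetical name; assumes imports of fmt, metav1, appsv1 "k8s.io/api/apps/v1", and k8s.io/client-go/tools/cache):

    // Sketch only: a DeltaFIFO hands the accumulated deltas for one key to the process func.
    func popOnce() error {
        fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
            KeyFunction: cache.MetaNamespaceKeyFunc,
        })
        d := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default"}}
        if err := fifo.Add(d); err != nil {
            return err
        }
        if err := fifo.Update(d); err != nil {
            return err
        }
        _, err := fifo.Pop(func(obj interface{}) error {
            for _, delta := range obj.(cache.Deltas) { // oldest to newest: Added, then Updated
                fmt.Println(delta.Type)
            }
            return nil
        })
        return err
    }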
    
    func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
        return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
    }
    
    // NewNamedReflector same as NewReflector, but with a specified name for logging
    func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
        realClock := &clock.RealClock{}
        r := &Reflector{
            name:          name,
            listerWatcher: lw,
            store:         store,
            // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
            // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
            // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
            backoffManager:    wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
            resyncPeriod:      resyncPeriod,
            clock:             realClock,
            watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
        }
        r.setExpectedType(expectedType)
        return r
    }
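
    A Reflector can also be used on its own: give it a ListerWatcher, an example object, and a Store, and it keeps that store in sync with the API server. A hedged standalone sketch (runStandaloneReflector is a hypothetical name; assumes imports of fields "k8s.io/apimachinery/pkg/fields" and appsv1 "k8s.io/api/apps/v1"):

    // Sketch only: a reflector feeding a plain thread-safe store, with no informer on top.
    func runStandaloneReflector(clientset kubernetes.Interface, stopCh <-chan struct{}) cache.Store {
        lw := cache.NewListWatchFromClient(
            clientset.AppsV1().RESTClient(),
            "deployments",
            metav1.NamespaceAll,
            fields.Everything(),
        )
        store := cache.NewStore(cache.MetaNamespaceKeyFunc)
        r := cache.NewReflector(lw, &appsv1.Deployment{}, store, 0) // resyncPeriod 0: never resync
        go r.Run(stopCh)
        return store
    }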
    
    func (r *Reflector) Run(stopCh <-chan struct{}) {
        klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
        wait.BackoffUntil(func() {
            if err := r.ListAndWatch(stopCh); err != nil {
                r.watchErrorHandler(r, err)
            }
        }, r.backoffManager, true, stopCh)
        klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    }
    
    func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
        klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
        var resourceVersion string
    
        options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
    
        if err := func() error {
            initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
            defer initTrace.LogIfLong(10 * time.Second)
            var list runtime.Object
            var paginatedResult bool
            var err error
            listCh := make(chan struct{}, 1)
            panicCh := make(chan interface{}, 1)
            go func() {
                defer func() {
                    if r := recover(); r != nil {
                        panicCh <- r
                    }
                }()
                // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
                // list request will return the full response.
                pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                    return r.listerWatcher.List(opts)
                }))
                switch {
                case r.WatchListPageSize != 0:
                    pager.PageSize = r.WatchListPageSize
                case r.paginatedResult:
                    // We got a paginated result initially. Assume this resource and server honor
                    // paging requests (i.e. watch cache is probably disabled) and leave the default
                    // pager size set.
                case options.ResourceVersion != "" && options.ResourceVersion != "0":
                    // User didn't explicitly request pagination.
                    //
                    // With ResourceVersion != "", we have a possibility to list from watch cache,
                    // but we do that (for ResourceVersion != "0") only if Limit is unset.
                    // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
                    // switch off pagination to force listing from watch cache (if enabled).
                    // With the existing semantic of RV (result is at least as fresh as provided RV),
                    // this is correct and doesn't lead to going back in time.
                    //
                    // We also don't turn off pagination for ResourceVersion="0", since watch cache
                    // is ignoring Limit in that case anyway, and if watch cache is not enabled
                    // we don't introduce regression.
                    pager.PageSize = 0
                }
    
                list, paginatedResult, err = pager.List(context.Background(), options)
                if isExpiredError(err) || isTooLargeResourceVersionError(err) {
                    r.setIsLastSyncResourceVersionUnavailable(true)
                    // Retry immediately if the resource version used to list is unavailable.
                    // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
                    // continuation pages, but the pager might not be enabled, the full list might fail because the
                    // resource version it is listing at is expired or the cache may not yet be synced to the provided
                    // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
                    // the reflector makes forward progress.
                    list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
                }
                close(listCh)
            }()
            select {
            case <-stopCh:
                return nil
            case r := <-panicCh:
                panic(r)
            case <-listCh:
            }
            if err != nil {
                return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
            }
    
            // We check if the list was paginated and if so set the paginatedResult based on that.
            // However, we want to do that only for the initial list (which is the only case
            // when we set ResourceVersion="0"). The reasoning behind it is that later, in some
            // situations we may force listing directly from etcd (by setting ResourceVersion="")
            // which will return paginated result, even if watch cache is enabled. However, in
            // that case, we still want to prefer sending requests to watch cache if possible.
            //
            // Paginated result returned for request with ResourceVersion="0" means that watch
            // cache is disabled and there are a lot of objects of a given type. In such case,
            // there is no need to prefer listing from watch cache.
            if options.ResourceVersion == "0" && paginatedResult {
                r.paginatedResult = true
            }
    
            r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
            initTrace.Step("Objects listed")
            listMetaInterface, err := meta.ListAccessor(list)
            if err != nil {
                return fmt.Errorf("unable to understand list result %#v: %v", list, err)
            }
            resourceVersion = listMetaInterface.GetResourceVersion()
            initTrace.Step("Resource version extracted")
            items, err := meta.ExtractList(list)
            if err != nil {
                return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
            }
            initTrace.Step("Objects extracted")
            if err := r.syncWith(items, resourceVersion); err != nil {
                return fmt.Errorf("unable to sync list result: %v", err)
            }
            initTrace.Step("SyncWith done")
            r.setLastSyncResourceVersion(resourceVersion)
            initTrace.Step("Resource version updated")
            return nil
        }(); err != nil {
            return err
        }
    
        resyncerrc := make(chan error, 1)
        cancelCh := make(chan struct{})
        defer close(cancelCh)
        go func() {
            resyncCh, cleanup := r.resyncChan()
            defer func() {
                cleanup() // Call the last one written into cleanup
            }()
            for {
                select {
                case <-resyncCh:
                case <-stopCh:
                    return
                case <-cancelCh:
                    return
                }
                if r.ShouldResync == nil || r.ShouldResync() {
                    klog.V(4).Infof("%s: forcing resync", r.name)
                    if err := r.store.Resync(); err != nil {
                        resyncerrc <- err
                        return
                    }
                }
                cleanup()
                resyncCh, cleanup = r.resyncChan()
            }
        }()
    
        for {
            // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
            select {
            case <-stopCh:
                return nil
            default:
            }
    
            timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
            options = metav1.ListOptions{
                ResourceVersion: resourceVersion,
                // We want to avoid situations of hanging watchers. Stop any watchers that do not
                // receive any events within the timeout window.
                TimeoutSeconds: &timeoutSeconds,
                // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
                // Reflector doesn't assume bookmarks are returned at all (if the server does not support
                // watch bookmarks, it will ignore this field).
                AllowWatchBookmarks: true,
            }
    
            // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
            start := r.clock.Now()
            w, err := r.listerWatcher.Watch(options)
            if err != nil {
                // If this is "connection refused" error, it means that most likely apiserver is not responsive.
                // It doesn't make sense to re-list all objects because most likely we will be able to restart
                // watch where we ended.
                // If that's the case wait and resend watch request.
                if utilnet.IsConnectionRefused(err) {
                    time.Sleep(time.Second)
                    continue
                }
                return err
            }
    
            if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
                if err != errorStopRequested {
                    switch {
                    case isExpiredError(err):
                        // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
                        // has a semantic that it returns data at least as fresh as provided RV.
                        // So first try to LIST with setting RV to resource version of last observed object.
                        klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
                    default:
                        klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                    }
                }
                return nil
            }
        }
    }
    
    func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
        eventCount := 0
    
        // Stopping the watcher should be idempotent and if we return from this function there's no way
        // we're coming back in with the same watch interface.
        defer w.Stop()
    
    loop:
        for {
            select {
            case <-stopCh:
                return errorStopRequested
            case err := <-errc:
                return err
            case event, ok := <-w.ResultChan():
                if !ok {
                    break loop
                }
                if event.Type == watch.Error {
                    return apierrors.FromObject(event.Object)
                }
                if r.expectedType != nil {
                    if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
                        utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                        continue
                    }
                }
                if r.expectedGVK != nil {
                    if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
                        utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
                        continue
                    }
                }
                meta, err := meta.Accessor(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                    continue
                }
                newResourceVersion := meta.GetResourceVersion()
                switch event.Type {
                case watch.Added:
                    err := r.store.Add(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                    }
                case watch.Modified:
                    err := r.store.Update(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                    }
                case watch.Deleted:
                    // TODO: Will any consumers need access to the "last known
                    // state", which is passed in event.Object? If so, may need
                    // to change this.
                    err := r.store.Delete(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                    }
                case watch.Bookmark:
                    // A `Bookmark` means watch has synced here, just update the resourceVersion
                default:
                    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                }
                *resourceVersion = newResourceVersion
                r.setLastSyncResourceVersion(newResourceVersion)
                eventCount++
            }
        }
    
        watchDuration := r.clock.Since(start)
        if watchDuration < 1*time.Second && eventCount == 0 {
            return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
        }
        klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
        return nil
    }
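
    watchHandler is essentially a loop over w.ResultChan() that mirrors each event into the store and advances the last-seen resource version. For comparison, consuming the watch API by hand looks roughly like the sketch below (watchDeploymentsByHand is a hypothetical name; namespace choice and error handling are simplified).

    // Sketch only: the raw watch stream that the reflector wraps and retries.
    func watchDeploymentsByHand(clientset kubernetes.Interface, namespace string) error {
        w, err := clientset.AppsV1().Deployments(namespace).Watch(context.TODO(), metav1.ListOptions{})
        if err != nil {
            return err
        }
        defer w.Stop()
        for event := range w.ResultChan() {
            switch event.Type {
            case watch.Added, watch.Modified, watch.Deleted:
                d := event.Object.(*appsv1.Deployment)
                fmt.Println(event.Type, d.Namespace, d.Name)
            case watch.Bookmark:
                // resource-version checkpoint only; no object change
            case watch.Error:
                return apierrors.FromObject(event.Object)
            }
        }
        return nil // server closed the watch (e.g. timeout); a reflector would re-watch
    }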
    
    

Original article: https://www.haomeiwen.com/subject/zozvrktx.html