k8s garbagecollector controller

Author: wwq2020 | Published 2020-07-21 14:38

    Introduction

    The garbagecollector controller discovers the deletable resources from the apiserver, watches them for changes, builds a dependency graph from their owner references, and deletes objects whose owners no longer exist.
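
    As a concrete illustration (this snippet is not taken from the controller source and the object names are made up), ownership is declared on the dependent object through metadata.ownerReferences; the collector acts once the referenced owner no longer exists:

    package example

    import (
        appsv1 "k8s.io/api/apps/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
    )

    // newDependent builds a ReplicaSet that names an existing Deployment as its
    // controller owner. If the Deployment identified by ownerUID is deleted,
    // the garbage collector deletes this ReplicaSet as well, unless the owner
    // was deleted with the orphan propagation policy.
    func newDependent(ownerUID types.UID) *appsv1.ReplicaSet {
        isController := true
        return &appsv1.ReplicaSet{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "demo-rs",
                Namespace: "default",
                OwnerReferences: []metav1.OwnerReference{{
                    APIVersion: "apps/v1",
                    Kind:       "Deployment",
                    Name:       "demo",
                    UID:        ownerUID, // UID of the live owner object
                    Controller: &isController,
                }},
            },
        }
    }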

    Source code

    In cmd/kube-controller-manager/app/core.go, startGarbageCollectorController builds the clients, filters out the ignored resources, constructs the GarbageCollector, starts its Run and Sync loops, and returns a debug handler:

    func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool, error) {
        if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
            return nil, false, nil
        }
    
        gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
    
        config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
        metadataClient, err := metadata.NewForConfig(config)
        if err != nil {
            return nil, true, err
        }
    
        ignoredResources := make(map[schema.GroupResource]struct{})
        for _, r := range ctx.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
            ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
        }
        garbageCollector, err := garbagecollector.NewGarbageCollector(
            metadataClient,
            ctx.RESTMapper,
            ignoredResources,
            ctx.ObjectOrMetadataInformerFactory,
            ctx.InformersStarted,
        )
        if err != nil {
            return nil, true, fmt.Errorf("failed to start the generic garbage collector: %v", err)
        }
    
        // Start the garbage collector.
        workers := int(ctx.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
        go garbageCollector.Run(workers, ctx.Stop)
    
        // Periodically refresh the RESTMapper with new discovery information and sync
        // the garbage collector.
        go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop)
    
        return garbagecollector.NewDebugHandler(garbageCollector), true, nil
    }
    

    In pkg/controller/garbagecollector/garbagecollector.go, Run starts the dependency graph builder and the delete/orphan workers, while Sync periodically refreshes the set of monitored resources from discovery:

    func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
        defer gc.attemptToDelete.ShutDown()
        defer gc.attemptToOrphan.ShutDown()
        defer gc.dependencyGraphBuilder.graphChanges.ShutDown()
    
        klog.Infof("Starting garbage collector controller")
        defer klog.Infof("Shutting down garbage collector controller")
    
        go gc.dependencyGraphBuilder.Run(stopCh)
    
        if !cache.WaitForNamedCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
            return
        }
    
        klog.Infof("Garbage collector: all resource monitors have synced. Proceeding to collect garbage")
    
        // gc workers
        for i := 0; i < workers; i++ {
            go wait.Until(gc.runAttemptToDeleteWorker, 1*time.Second, stopCh)
            go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, stopCh)
        }
    
        <-stopCh
    }
    
    func (gc *GarbageCollector) runAttemptToDeleteWorker() {
        for gc.attemptToDeleteWorker() {
        }
    }
    
    func (gc *GarbageCollector) attemptToDeleteWorker() bool {
        item, quit := gc.attemptToDelete.Get()
        gc.workerLock.RLock()
        defer gc.workerLock.RUnlock()
        if quit {
            return false
        }
        defer gc.attemptToDelete.Done(item)
        n, ok := item.(*node)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
            return true
        }
        err := gc.attemptToDeleteItem(n)
        if err != nil {
            if _, ok := err.(*restMappingError); ok {
                // There are at least two ways this can happen:
                // 1. The reference is to an object of a custom type that has not yet been
                //    recognized by gc.restMapper (this is a transient error).
                // 2. The reference is to an invalid group/version. We don't currently
                //    have a way to distinguish this from a valid type we will recognize
                //    after the next discovery sync.
                // For now, record the error and retry.
                klog.V(5).Infof("error syncing item %s: %v", n, err)
            } else {
                utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err))
            }
            // retry if garbage collection of an object failed.
            gc.attemptToDelete.AddRateLimited(item)
        } else if !n.isObserved() {
            // requeue if item hasn't been observed via an informer event yet.
            // otherwise a virtual node for an item added AND removed during watch reestablishment can get stuck in the graph and never removed.
            // see https://issue.k8s.io/56121
            klog.V(5).Infof("item %s hasn't been observed via informer yet", n.identity)
            gc.attemptToDelete.AddRateLimited(item)
        }
        return true
    }
    
    func (gc *GarbageCollector) runAttemptToOrphanWorker() {
        for gc.attemptToOrphanWorker() {
        }
    }
    
    
    func (gc *GarbageCollector) attemptToOrphanWorker() bool {
        item, quit := gc.attemptToOrphan.Get()
        gc.workerLock.RLock()
        defer gc.workerLock.RUnlock()
        if quit {
            return false
        }
        defer gc.attemptToOrphan.Done(item)
        owner, ok := item.(*node)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
            return true
        }
        // we don't need to lock each element, because they never get updated
        owner.dependentsLock.RLock()
        dependents := make([]*node, 0, len(owner.dependents))
        for dependent := range owner.dependents {
            dependents = append(dependents, dependent)
        }
        owner.dependentsLock.RUnlock()
    
        err := gc.orphanDependents(owner.identity, dependents)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("orphanDependents for %s failed with %v", owner.identity, err))
            gc.attemptToOrphan.AddRateLimited(item)
            return true
        }
        // update the owner, remove "orphaningFinalizer" from its finalizers list
        err = gc.removeFinalizer(owner, metav1.FinalizerOrphanDependents)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("removeOrphanFinalizer for %s failed with %v", owner.identity, err))
            gc.attemptToOrphan.AddRateLimited(item)
        }
        return true
    }
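
    // Illustration, not part of this file: the orphan path above is exercised
    // when an owner object is deleted with PropagationPolicy=Orphan. The API
    // server then adds the metav1.FinalizerOrphanDependents finalizer to the
    // owner, the graph builder enqueues the owner on attemptToOrphan, and the
    // worker above removes the ownerReferences from every dependent before
    // stripping the finalizer. A minimal client-side sketch, assuming
    // client-go v0.18 or newer (Delete takes a context) and the usual imports
    // ("context", metav1, "k8s.io/client-go/kubernetes"):
    func deleteOwnerButOrphanDependents(cs kubernetes.Interface, ns, name string) error {
        orphan := metav1.DeletePropagationOrphan
        return cs.AppsV1().Deployments(ns).Delete(
            context.TODO(), name,
            metav1.DeleteOptions{PropagationPolicy: &orphan},
        )
    }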
    
    func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterface, period time.Duration, stopCh <-chan struct{}) {
        oldResources := make(map[schema.GroupVersionResource]struct{})
        wait.Until(func() {
            // Get the current resource list from discovery.
            newResources := GetDeletableResources(discoveryClient)
    
            // This can occur if there is an internal error in GetDeletableResources.
            if len(newResources) == 0 {
                klog.V(2).Infof("no resources reported by discovery, skipping garbage collector sync")
                return
            }
    
            // Decide whether discovery has reported a change.
            if reflect.DeepEqual(oldResources, newResources) {
                klog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync")
                return
            }
    
            // Ensure workers are paused to avoid processing events before informers
            // have resynced.
            gc.workerLock.Lock()
            defer gc.workerLock.Unlock()
    
            // Once we get here, we should not unpause workers until we've successfully synced
            attempt := 0
            wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
                attempt++
    
                // On a reattempt, check if available resources have changed
                if attempt > 1 {
                    newResources = GetDeletableResources(discoveryClient)
                    if len(newResources) == 0 {
                        klog.V(2).Infof("no resources reported by discovery (attempt %d)", attempt)
                        return false, nil
                    }
                }
    
                klog.V(2).Infof("syncing garbage collector with updated resources from discovery (attempt %d): %s", attempt, printDiff(oldResources, newResources))
    
                // Resetting the REST mapper will also invalidate the underlying discovery
                // client. This is a leaky abstraction and assumes behavior about the REST
                // mapper, but we'll deal with it for now.
                gc.restMapper.Reset()
                klog.V(4).Infof("reset restmapper")
    
                // Perform the monitor resync and wait for controllers to report cache sync.
                //
                // NOTE: It's possible that newResources will diverge from the resources
                // discovered by restMapper during the call to Reset, since they are
                // distinct discovery clients invalidated at different times. For example,
                // newResources may contain resources not returned in the restMapper's
                // discovery call if the resources appeared in-between the calls. In that
                // case, the restMapper will fail to map some of newResources until the next
                // attempt.
                if err := gc.resyncMonitors(newResources); err != nil {
                    utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
                    return false, nil
                }
                klog.V(4).Infof("resynced monitors")
    
                // wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
                // this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
                // informers keep attempting to sync in the background, so retrying doesn't interrupt them.
                // the call to resyncMonitors on the reattempt will no-op for resources that still exist.
                // note that workers stay paused until we successfully resync.
                if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) {
                    utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
                    return false, nil
                }
    
                // success, break out of the loop
                return true, nil
            }, stopCh)
    
            // Finally, keep track of our new state. Do this after all preceding steps
            // have succeeded to ensure we'll retry on subsequent syncs if an error
            // occurred.
            oldResources = newResources
            klog.V(2).Infof("synced garbage collector")
        }, period, stopCh)
    }
    
    func (gc *GarbageCollector) resyncMonitors(deletableResources map[schema.GroupVersionResource]struct{}) error {
        if err := gc.dependencyGraphBuilder.syncMonitors(deletableResources); err != nil {
            return err
        }
        gc.dependencyGraphBuilder.startMonitors()
        return nil
    }
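
    Sync above relies on GetDeletableResources to turn discovery output into the set of GroupVersionResources worth monitoring. A simplified sketch of that filtering, assuming the standard client-go discovery helpers (the upstream function additionally tolerates and logs partial discovery failures):

    package example

    import (
        "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/client-go/discovery"
    )

    // deletableResources keeps only the resources that support the "delete",
    // "list" and "watch" verbs; these are the resources the collector can
    // monitor and garbage-collect.
    func deletableResources(d discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} {
        preferred, _ := d.ServerPreferredResources() // partial results are still usable
        deletable := discovery.FilteredBy(
            discovery.SupportsAllVerbs{Verbs: []string{"delete", "list", "watch"}},
            preferred,
        )
        gvrs, _ := discovery.GroupVersionResources(deletable) // ignore malformed group/versions in this sketch
        return gvrs
    }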
    

    In pkg/controller/garbagecollector/graph_builder.go, the GraphBuilder runs one informer-backed monitor per deletable resource and feeds every add/update/delete event into its graphChanges queue:

    func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
        klog.Infof("GraphBuilder running")
        defer klog.Infof("GraphBuilder stopping")
    
        // Set up the stop channel.
        gb.monitorLock.Lock()
        gb.stopCh = stopCh
        gb.running = true
        gb.monitorLock.Unlock()
    
        // Start monitors and begin change processing until the stop channel is
        // closed.
        gb.startMonitors()
        wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
    
        // Stop any running monitors.
        gb.monitorLock.Lock()
        defer gb.monitorLock.Unlock()
        monitors := gb.monitors
        stopped := 0
        for _, monitor := range monitors {
            if monitor.stopCh != nil {
                stopped++
                close(monitor.stopCh)
            }
        }
    
        // reset monitors so that the graph builder can be safely re-run/synced.
        gb.monitors = nil
        klog.Infof("stopped %d of %d monitors", stopped, len(monitors))
    }
    
    func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
        gb.monitorLock.Lock()
        defer gb.monitorLock.Unlock()
    
        toRemove := gb.monitors
        if toRemove == nil {
            toRemove = monitors{}
        }
        current := monitors{}
        errs := []error{}
        kept := 0
        added := 0
        for resource := range resources {
            if _, ok := gb.ignoredResources[resource.GroupResource()]; ok {
                continue
            }
            if m, ok := toRemove[resource]; ok {
                current[resource] = m
                delete(toRemove, resource)
                kept++
                continue
            }
            kind, err := gb.restMapper.KindFor(resource)
            if err != nil {
                errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
                continue
            }
            c, s, err := gb.controllerFor(resource, kind)
            if err != nil {
                errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
                continue
            }
            current[resource] = &monitor{store: s, controller: c}
            added++
        }
        gb.monitors = current
    
        for _, monitor := range toRemove {
            if monitor.stopCh != nil {
                close(monitor.stopCh)
            }
        }
    
        klog.V(4).Infof("synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
        // NewAggregate returns nil if errs is 0-length
        return utilerrors.NewAggregate(errs)
    }
    
    func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
        handlers := cache.ResourceEventHandlerFuncs{
            // add the event to the dependencyGraphBuilder's graphChanges.
            AddFunc: func(obj interface{}) {
                event := &event{
                    eventType: addEvent,
                    obj:       obj,
                    gvk:       kind,
                }
                gb.graphChanges.Add(event)
            },
            UpdateFunc: func(oldObj, newObj interface{}) {
                // TODO: check if there are differences in the ownerRefs,
                // finalizers, and DeletionTimestamp; if not, ignore the update.
                event := &event{
                    eventType: updateEvent,
                    obj:       newObj,
                    oldObj:    oldObj,
                    gvk:       kind,
                }
                gb.graphChanges.Add(event)
            },
            DeleteFunc: func(obj interface{}) {
                // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
                if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
                    obj = deletedFinalStateUnknown.Obj
                }
                event := &event{
                    eventType: deleteEvent,
                    obj:       obj,
                    gvk:       kind,
                }
                gb.graphChanges.Add(event)
            },
        }
        shared, err := gb.sharedInformers.ForResource(resource)
        if err != nil {
            klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
            return nil, nil, err
        }
        klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
        // need to clone because it's from a shared cache
        shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
        return shared.Informer().GetController(), shared.Informer().GetStore(), nil
    }
    
