简介
garbagecollector controller 从 apiserver 获取可删除资源,然后监控他们的变化,然后构建依赖图,删除所有者已经不存在的资源
源码
cmd/kube-controller-manager/app/core.go 中
func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
return nil, false, nil
}
gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
metadataClient, err := metadata.NewForConfig(config)
if err != nil {
return nil, true, err
}
ignoredResources := make(map[schema.GroupResource]struct{})
for _, r := range ctx.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
}
garbageCollector, err := garbagecollector.NewGarbageCollector(
metadataClient,
ctx.RESTMapper,
ignoredResources,
ctx.ObjectOrMetadataInformerFactory,
ctx.InformersStarted,
)
if err != nil {
return nil, true, fmt.Errorf("failed to start the generic garbage collector: %v", err)
}
// Start the garbage collector.
workers := int(ctx.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
go garbageCollector.Run(workers, ctx.Stop)
// Periodically refresh the RESTMapper with new discovery information and sync
// the garbage collector.
go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop)
return garbagecollector.NewDebugHandler(garbageCollector), true, nil
}
pkg/controller/garbagecollector/garbagecollector.go 中
func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer gc.attemptToDelete.ShutDown()
defer gc.attemptToOrphan.ShutDown()
defer gc.dependencyGraphBuilder.graphChanges.ShutDown()
klog.Infof("Starting garbage collector controller")
defer klog.Infof("Shutting down garbage collector controller")
go gc.dependencyGraphBuilder.Run(stopCh)
if !cache.WaitForNamedCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
return
}
klog.Infof("Garbage collector: all resource monitors have synced. Proceeding to collect garbage")
// gc workers
for i := 0; i < workers; i++ {
go wait.Until(gc.runAttemptToDeleteWorker, 1*time.Second, stopCh)
go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, stopCh)
}
<-stopCh
}
func (gc *GarbageCollector) runAttemptToDeleteWorker() {
for gc.attemptToDeleteWorker() {
}
}
func (gc *GarbageCollector) attemptToDeleteWorker() bool {
item, quit := gc.attemptToDelete.Get()
gc.workerLock.RLock()
defer gc.workerLock.RUnlock()
if quit {
return false
}
defer gc.attemptToDelete.Done(item)
n, ok := item.(*node)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
return true
}
err := gc.attemptToDeleteItem(n)
if err != nil {
if _, ok := err.(*restMappingError); ok {
// There are at least two ways this can happen:
// 1. The reference is to an object of a custom type that has not yet been
// recognized by gc.restMapper (this is a transient error).
// 2. The reference is to an invalid group/version. We don't currently
// have a way to distinguish this from a valid type we will recognize
// after the next discovery sync.
// For now, record the error and retry.
klog.V(5).Infof("error syncing item %s: %v", n, err)
} else {
utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err))
}
// retry if garbage collection of an object failed.
gc.attemptToDelete.AddRateLimited(item)
} else if !n.isObserved() {
// requeue if item hasn't been observed via an informer event yet.
// otherwise a virtual node for an item added AND removed during watch reestablishment can get stuck in the graph and never removed.
// see https://issue.k8s.io/56121
klog.V(5).Infof("item %s hasn't been observed via informer yet", n.identity)
gc.attemptToDelete.AddRateLimited(item)
}
return true
}
func (gc *GarbageCollector) runAttemptToOrphanWorker() {
for gc.attemptToOrphanWorker() {
}
}
func (gc *GarbageCollector) attemptToOrphanWorker() bool {
item, quit := gc.attemptToOrphan.Get()
gc.workerLock.RLock()
defer gc.workerLock.RUnlock()
if quit {
return false
}
defer gc.attemptToOrphan.Done(item)
owner, ok := item.(*node)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
return true
}
// we don't need to lock each element, because they never get updated
owner.dependentsLock.RLock()
dependents := make([]*node, 0, len(owner.dependents))
for dependent := range owner.dependents {
dependents = append(dependents, dependent)
}
owner.dependentsLock.RUnlock()
err := gc.orphanDependents(owner.identity, dependents)
if err != nil {
utilruntime.HandleError(fmt.Errorf("orphanDependents for %s failed with %v", owner.identity, err))
gc.attemptToOrphan.AddRateLimited(item)
return true
}
// update the owner, remove "orphaningFinalizer" from its finalizers list
err = gc.removeFinalizer(owner, metav1.FinalizerOrphanDependents)
if err != nil {
utilruntime.HandleError(fmt.Errorf("removeOrphanFinalizer for %s failed with %v", owner.identity, err))
gc.attemptToOrphan.AddRateLimited(item)
}
return true
}
func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterface, period time.Duration, stopCh <-chan struct{}) {
oldResources := make(map[schema.GroupVersionResource]struct{})
wait.Until(func() {
// Get the current resource list from discovery.
newResources := GetDeletableResources(discoveryClient)
// This can occur if there is an internal error in GetDeletableResources.
if len(newResources) == 0 {
klog.V(2).Infof("no resources reported by discovery, skipping garbage collector sync")
return
}
// Decide whether discovery has reported a change.
if reflect.DeepEqual(oldResources, newResources) {
klog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync")
return
}
// Ensure workers are paused to avoid processing events before informers
// have resynced.
gc.workerLock.Lock()
defer gc.workerLock.Unlock()
// Once we get here, we should not unpause workers until we've successfully synced
attempt := 0
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
attempt++
// On a reattempt, check if available resources have changed
if attempt > 1 {
newResources = GetDeletableResources(discoveryClient)
if len(newResources) == 0 {
klog.V(2).Infof("no resources reported by discovery (attempt %d)", attempt)
return false, nil
}
}
klog.V(2).Infof("syncing garbage collector with updated resources from discovery (attempt %d): %s", attempt, printDiff(oldResources, newResources))
// Resetting the REST mapper will also invalidate the underlying discovery
// client. This is a leaky abstraction and assumes behavior about the REST
// mapper, but we'll deal with it for now.
gc.restMapper.Reset()
klog.V(4).Infof("reset restmapper")
// Perform the monitor resync and wait for controllers to report cache sync.
//
// NOTE: It's possible that newResources will diverge from the resources
// discovered by restMapper during the call to Reset, since they are
// distinct discovery clients invalidated at different times. For example,
// newResources may contain resources not returned in the restMapper's
// discovery call if the resources appeared in-between the calls. In that
// case, the restMapper will fail to map some of newResources until the next
// attempt.
if err := gc.resyncMonitors(newResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
return false, nil
}
klog.V(4).Infof("resynced monitors")
// wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
// note that workers stay paused until we successfully resync.
if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
return false, nil
}
// success, break out of the loop
return true, nil
}, stopCh)
// Finally, keep track of our new state. Do this after all preceding steps
// have succeeded to ensure we'll retry on subsequent syncs if an error
// occurred.
oldResources = newResources
klog.V(2).Infof("synced garbage collector")
}, period, stopCh)
}
func (gc *GarbageCollector) resyncMonitors(deletableResources map[schema.GroupVersionResource]struct{}) error {
if err := gc.dependencyGraphBuilder.syncMonitors(deletableResources); err != nil {
return err
}
gc.dependencyGraphBuilder.startMonitors()
return nil
}
pkg/controller/garbagecollector/graph_builder.go 中
func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
klog.Infof("GraphBuilder running")
defer klog.Infof("GraphBuilder stopping")
// Set up the stop channel.
gb.monitorLock.Lock()
gb.stopCh = stopCh
gb.running = true
gb.monitorLock.Unlock()
// Start monitors and begin change processing until the stop channel is
// closed.
gb.startMonitors()
wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
// Stop any running monitors.
gb.monitorLock.Lock()
defer gb.monitorLock.Unlock()
monitors := gb.monitors
stopped := 0
for _, monitor := range monitors {
if monitor.stopCh != nil {
stopped++
close(monitor.stopCh)
}
}
// reset monitors so that the graph builder can be safely re-run/synced.
gb.monitors = nil
klog.Infof("stopped %d of %d monitors", stopped, len(monitors))
}
func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
gb.monitorLock.Lock()
defer gb.monitorLock.Unlock()
toRemove := gb.monitors
if toRemove == nil {
toRemove = monitors{}
}
current := monitors{}
errs := []error{}
kept := 0
added := 0
for resource := range resources {
if _, ok := gb.ignoredResources[resource.GroupResource()]; ok {
continue
}
if m, ok := toRemove[resource]; ok {
current[resource] = m
delete(toRemove, resource)
kept++
continue
}
kind, err := gb.restMapper.KindFor(resource)
if err != nil {
errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
continue
}
c, s, err := gb.controllerFor(resource, kind)
if err != nil {
errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
continue
}
current[resource] = &monitor{store: s, controller: c}
added++
}
gb.monitors = current
for _, monitor := range toRemove {
if monitor.stopCh != nil {
close(monitor.stopCh)
}
}
klog.V(4).Infof("synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
// NewAggregate returns nil if errs is 0-length
return utilerrors.NewAggregate(errs)
}
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
handlers := cache.ResourceEventHandlerFuncs{
// add the event to the dependencyGraphBuilder's graphChanges.
AddFunc: func(obj interface{}) {
event := &event{
eventType: addEvent,
obj: obj,
gvk: kind,
}
gb.graphChanges.Add(event)
},
UpdateFunc: func(oldObj, newObj interface{}) {
// TODO: check if there are differences in the ownerRefs,
// finalizers, and DeletionTimestamp; if not, ignore the update.
event := &event{
eventType: updateEvent,
obj: newObj,
oldObj: oldObj,
gvk: kind,
}
gb.graphChanges.Add(event)
},
DeleteFunc: func(obj interface{}) {
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = deletedFinalStateUnknown.Obj
}
event := &event{
eventType: deleteEvent,
obj: obj,
gvk: kind,
}
gb.graphChanges.Add(event)
},
}
shared, err := gb.sharedInformers.ForResource(resource)
if err != nil {
klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
return nil, nil, err
}
klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
// need to clone because it's from a shared cache
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
return shared.Informer().GetController(), shared.Informer().GetStore(), nil
}
网友评论