kube-controller-manager: the ttl-after-finished controller

Author: wwq2020 | Published 2023-06-07 10:07

    Purpose

    When a Job is created, if ttlSecondsAfterFinished is set and the Job has already finished (its Complete or Failed condition is True), the controller enqueues it for cleanup.
    When a Job is updated, the same check applies: a finished Job with ttlSecondsAfterFinished set is enqueued. Once the TTL expires, the controller deletes the Job.
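
    A minimal sketch of the check behind "has finished and has a TTL" (the needsCleanup helper and the condition scan it relies on; simplified here, as upstream keeps the condition scan in a separate job util package):

    func needsCleanup(j *batch.Job) bool {
        // Only Jobs that set a TTL and have already finished qualify for cleanup.
        return j.Spec.TTLSecondsAfterFinished != nil && isJobFinished(j)
    }

    // isJobFinished reports whether the Job has a Complete or Failed
    // condition with status True.
    func isJobFinished(j *batch.Job) bool {
        for _, c := range j.Status.Conditions {
            if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
                return true
            }
        }
        return false
    }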


    Related code

    Setup code

    cmd/kube-controller-manager/app/controllermanager.go
    func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
        ...
        controllers["ttl-after-finished"] = startTTLAfterFinishedController
        ...
        return controllers
    }
    

    cmd/kube-controller-manager/app/core.go

    func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
        go ttlafterfinished.New(
            controllerContext.InformerFactory.Batch().V1().Jobs(),
            controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
        ).Run(ctx, int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs))
        return nil, true, nil
    }
    

    pkg/controller/ttlafterfinished/ttlafterfinished_controller.go

    func New(jobInformer batchinformers.JobInformer, client clientset.Interface) *Controller {
        // Events emitted by this controller are logged and recorded against the API server.
        eventBroadcaster := record.NewBroadcaster()
        eventBroadcaster.StartStructuredLogging(0)
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
    
        if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
            ratelimiter.RegisterMetricAndTrackRateLimiterUsage("ttl_after_finished_controller", client.CoreV1().RESTClient().GetRateLimiter())
        }
    
        metrics.Register()
    
        // Keys of Jobs whose TTL may have expired are queued here for the workers.
        tc := &Controller{
            client:   client,
            recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ttl-after-finished-controller"}),
            queue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttl_jobs_to_delete"),
        }
    
        // Only Add and Update events matter; a deleted Job needs no cleanup.
        jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    tc.addJob,
            UpdateFunc: tc.updateJob,
        })
    
        tc.jLister = jobInformer.Lister()
        tc.jListerSynced = jobInformer.Informer().HasSynced
    
        tc.clock = clock.RealClock{}
    
        return tc
    }
    
    // Run starts the workers to clean up Jobs.
    func (tc *Controller) Run(ctx context.Context, workers int) {
        defer utilruntime.HandleCrash()
        defer tc.queue.ShutDown()
    
        klog.Infof("Starting TTL after finished controller")
        defer klog.Infof("Shutting down TTL after finished controller")
    
        if !cache.WaitForNamedCacheSync("TTL after finished", ctx.Done(), tc.jListerSynced) {
            return
        }
    
        for i := 0; i < workers; i++ {
            go wait.UntilWithContext(ctx, tc.worker, time.Second)
        }
    
        <-ctx.Done()
    }
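
    Run only starts the workers; each worker loops over the queue and hands keys to processJob. A sketch of that loop (worker and processNextWorkItem, following the standard controller pattern; handleErr is the usual rate-limited requeue helper in this file):

    func (tc *Controller) worker(ctx context.Context) {
        for tc.processNextWorkItem(ctx) {
        }
    }
    
    func (tc *Controller) processNextWorkItem(ctx context.Context) bool {
        key, quit := tc.queue.Get()
        if quit {
            return false
        }
        defer tc.queue.Done(key)
    
        // processJob does the real work; handleErr requeues the key on failure.
        err := tc.processJob(ctx, key.(string))
        tc.handleErr(err, key)
    
        return true
    }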
    

    Main code

    pkg/controller/ttlafterfinished/ttlafterfinished_controller.go

    func (tc *Controller) addJob(obj interface{}) {
        job := obj.(*batch.Job)
        klog.V(4).Infof("Adding job %s/%s", job.Namespace, job.Name)
    
        if job.DeletionTimestamp == nil && needsCleanup(job) {
            tc.enqueue(job)
        }
    }
    
    func (tc *Controller) updateJob(old, cur interface{}) {
        job := cur.(*batch.Job)
        klog.V(4).Infof("Updating job %s/%s", job.Namespace, job.Name)
    
        if job.DeletionTimestamp == nil && needsCleanup(job) {
            tc.enqueue(job)
        }
    }
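
    Both handlers funnel into enqueue, which turns the Job into a namespace/name key and adds it to the workqueue. A delayed variant, enqueueAfter, is used when a TTL has not expired yet. A sketch of both, assuming the standard controller.KeyFunc key helper:

    func (tc *Controller) enqueue(job *batch.Job) {
        key, err := controller.KeyFunc(job)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
            return
        }
        tc.queue.Add(key)
    }
    
    func (tc *Controller) enqueueAfter(job *batch.Job, after time.Duration) {
        key, err := controller.KeyFunc(job)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
            return
        }
        // AddAfter re-adds the key once the remaining TTL has elapsed.
        tc.queue.AddAfter(key, after)
    }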
    
    
    func (tc *Controller) processJob(ctx context.Context, key string) error {
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            return err
        }
    
        klog.V(4).Infof("Checking if Job %s/%s is ready for cleanup", namespace, name)
        // Ignore the Jobs that are already deleted or being deleted, or the ones that don't need clean up.
        job, err := tc.jLister.Jobs(namespace).Get(name)
        if errors.IsNotFound(err) {
            return nil
        }
        if err != nil {
            return err
        }
    
        if expiredAt, err := tc.processTTL(job); err != nil {
            return err
        } else if expiredAt == nil {
            return nil
        }
    
        // The Job's TTL is assumed to have expired, but the Job TTL might be stale.
        // Before deleting the Job, do a final sanity check.
        // If TTL is modified before we do this check, we cannot be sure if the TTL truly expires.
        // The latest Job may have a different UID, but it's fine because the checks will be run again.
        fresh, err := tc.client.BatchV1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{})
        if errors.IsNotFound(err) {
            return nil
        }
        if err != nil {
            return err
        }
        // Use the latest Job TTL to see if the TTL truly expires.
        expiredAt, err := tc.processTTL(fresh)
        if err != nil {
            return err
        } else if expiredAt == nil {
            return nil
        }
        // Cascade deletes the Jobs if TTL truly expires.
        policy := metav1.DeletePropagationForeground
        options := metav1.DeleteOptions{
            PropagationPolicy: &policy,
            Preconditions:     &metav1.Preconditions{UID: &fresh.UID},
        }
        klog.V(4).Infof("Cleaning up Job %s/%s", namespace, name)
        if err := tc.client.BatchV1().Jobs(fresh.Namespace).Delete(ctx, fresh.Name, options); err != nil {
            return err
        }
        metrics.JobDeletionDurationSeconds.Observe(time.Since(*expiredAt).Seconds())
        return nil
    }
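
    processJob relies on processTTL, which returns a non-nil expiration time only when the TTL has truly elapsed; otherwise it re-enqueues the Job with the remaining delay. A sketch, where timeLeft is assumed to compute the remaining duration from the Job's finish time plus ttlSecondsAfterFinished:

    func (tc *Controller) processTTL(job *batch.Job) (expiredAt *time.Time, err error) {
        // Ignore Jobs that are being deleted or no longer need cleanup.
        if job.DeletionTimestamp != nil || !needsCleanup(job) {
            return nil, nil
        }
    
        now := tc.clock.Now()
        t, e, err := timeLeft(job, &now)
        if err != nil {
            return nil, err
        }
    
        // TTL has expired; return when it expired so the caller can delete.
        if *t <= 0 {
            return e, nil
        }
    
        // Not expired yet: requeue with the remaining delay.
        tc.enqueueAfter(job, *t)
        return nil, nil
    }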
    
