美文网首页
k8s 之 cronjob controller 源码简单分析

k8s 之 cronjob controller 源码简单分析

作者: wwq2020 | 来源:发表于2020-07-20 17:26 被阅读0次

    简介

    cronjob controller 监控 cronjob 的变化,然后创建相应的 pod
    kubelet 监听 pod 变化,进行实际的 pod 操作
    然后cronjob c

    创建处

    pkg/registry/batch/cronjob/storage/storage.go 中

    // NewREST builds the REST storage for CronJob objects together with the
    // companion StatusREST.
    //
    // The store is completed with the options obtained from optsGetter; if that
    // step fails the error is returned and both storage values are nil. The
    // StatusREST wraps a copy of the completed store whose UpdateStrategy is
    // replaced by cronjob.StatusStrategy.
    func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, error) {
        tableGenerator := printers.NewTableGenerator().With(printersinternal.AddHandlers)

        cronJobStore := &genericregistry.Store{
            NewFunc: func() runtime.Object {
                return &batch.CronJob{}
            },
            NewListFunc: func() runtime.Object {
                return &batch.CronJobList{}
            },
            DefaultQualifiedResource: batch.Resource("cronjobs"),

            // Create, update and delete all share the same strategy object.
            CreateStrategy: cronjob.Strategy,
            UpdateStrategy: cronjob.Strategy,
            DeleteStrategy: cronjob.Strategy,

            TableConvertor: printerstorage.TableConvertor{TableGenerator: tableGenerator},
        }

        storeOptions := &generic.StoreOptions{RESTOptions: optsGetter}
        if err := cronJobStore.CompleteWithOptions(storeOptions); err != nil {
            return nil, nil, err
        }

        // Copy the completed store by value so that only the status variant
        // gets the status-specific update strategy.
        statusStore := *cronJobStore
        statusStore.UpdateStrategy = cronjob.StatusStrategy

        return &REST{cronJobStore}, &StatusREST{store: &statusStore}, nil
    }
    

    controller

    cmd/kube-controller-manager/app/batch.go 中

    // startCronJobController creates the CronJob controller and runs it in a
    // background goroutine until ctx.Stop is closed.
    //
    // The returned handler is always nil. The boolean is false when the
    // batch/v1beta1 cronjobs resource is not listed in ctx.AvailableResources
    // (nothing is started in that case) and true otherwise. A non-nil error is
    // returned only when the controller cannot be constructed.
    func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) {
        cronJobsResource := schema.GroupVersionResource{
            Group:    "batch",
            Version:  "v1beta1",
            Resource: "cronjobs",
        }
        if available := ctx.AvailableResources[cronJobsResource]; !available {
            return nil, false, nil
        }

        client := ctx.ClientBuilder.ClientOrDie("cronjob-controller")
        controller, err := cronjob.NewController(client)
        if err != nil {
            return nil, true, fmt.Errorf("error creating CronJob controller: %v", err)
        }

        go controller.Run(ctx.Stop)
        return nil, true, nil
    }
    

    pkg/controller/cronjob/cronjob_controller.go 中

    // NewController creates a CronJob Controller backed by kubeClient.
    //
    // It starts an event broadcaster that emits structured logs and records
    // events through the client's core/v1 Events sink, registers rate limiter
    // metrics under the name "cronjob_controller" when the client exposes a rate
    // limiter, and wires the real job, cronjob and pod control implementations.
    // An error is returned only if the rate limiter metric registration fails.
    func NewController(kubeClient clientset.Interface) (*Controller, error) {
        // NOTE(review): kubeClient is dereferenced here, before the nil check
        // below, so a nil client would presumably panic at this point — confirm
        // whether the later nil check is still needed.
        eventSink := &v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}

        broadcaster := record.NewBroadcaster()
        broadcaster.StartStructuredLogging(0)
        broadcaster.StartRecordingToSink(eventSink)

        if kubeClient != nil {
            if limiter := kubeClient.CoreV1().RESTClient().GetRateLimiter(); limiter != nil {
                err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", limiter)
                if err != nil {
                    return nil, err
                }
            }
        }

        eventSource := v1.EventSource{Component: "cronjob-controller"}
        controller := &Controller{
            kubeClient: kubeClient,
            jobControl: realJobControl{KubeClient: kubeClient},
            cjControl:  &realCJControl{KubeClient: kubeClient},
            podControl: &realPodControl{KubeClient: kubeClient},
            recorder:   broadcaster.NewRecorder(scheme.Scheme, eventSource),
        }
        return controller, nil
    }
    
    // Run launches the periodic reconciliation loop in its own goroutine and
    // then blocks until stopCh is closed.
    func (jm *Controller) Run(stopCh <-chan struct{}) {
        // Interval between two consecutive syncAll passes.
        const syncPeriod = 10 * time.Second

        defer utilruntime.HandleCrash()

        klog.Info("Starting CronJob Manager")

        // wait.Until is given stopCh as well, so the loop and this method
        // observe the same stop signal.
        go wait.Until(jm.syncAll, syncPeriod, stopCh)

        <-stopCh
        klog.Info("Shutting down CronJob Manager")
    }
    
    // syncAll lists all the CronJobs and Jobs and reconciles them.
    //
    // It pages through every Job in all namespaces, groups them with
    // groupJobsByParent, and then pages through every CronJob. For each CronJob
    // it calls syncOne followed by cleanupFinishedJobs, passing the Jobs grouped
    // under that CronJob's UID. A failure while listing either resource is
    // reported through utilruntime.HandleError and ends the current pass.
    func (jm *Controller) syncAll() {
        // List children (Jobs) before parents (CronJob).
        // This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
        // we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
        // Note that this only works because we are NOT using any caches here.
        jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
            return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)
        }

        js := make([]batchv1.Job, 0)
        err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
            jobTmp, ok := object.(*batchv1.Job)
            if !ok {
                // Format the listed object itself: after a failed assertion
                // jobTmp is a nil *batchv1.Job, so %T on it would always print
                // the expected type rather than the one actually received.
                return fmt.Errorf("expected type *batchv1.Job, got type %T", object)
            }
            js = append(js, *jobTmp)
            return nil
        })

        if err != nil {
            utilruntime.HandleError(fmt.Errorf("Failed to extract job list: %v", err))
            return
        }

        klog.V(4).Infof("Found %d jobs", len(js))
        cronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
            return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(context.TODO(), opts)
        }

        jobsByCj := groupJobsByParent(js)
        klog.V(4).Infof("Found %d groups", len(jobsByCj))
        err = pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
            cj, ok := object.(*batchv1beta1.CronJob)
            if !ok {
                // Same reasoning as above: cj is a typed nil pointer here, so
                // the dynamic type must be taken from object.
                return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", object)
            }
            // Reconcile first, then prune finished Jobs, both against the
            // Jobs grouped under this CronJob's UID.
            syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)
            cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)
            return nil
        })

        if err != nil {
            utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
        }
    }
    
    

    相关文章

      网友评论

          本文标题:k8s 之 cronjob controller 源码简单分析

          本文链接:https://www.haomeiwen.com/subject/wppykktx.html