
A brief analysis of the k8s job controller source code

Author: wwq2020 | Published 2020-07-20 16:48

Introduction

The job controller watches for changes to Job objects and creates the corresponding pods.
The kubelet watches for pod changes and carries out the actual pod operations.
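
For context, here is a small client-go example (not from the original post) that creates a minimal Job; once the object reaches the API server, the job controller reconciles it into pods, which the kubelet then runs. The namespace, image, and kubeconfig path are illustrative assumptions.

package main

import (
    "context"
    "fmt"

    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Build a clientset from the local kubeconfig (path is an assumption).
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    completions := int32(1)
    backoffLimit := int32(4)
    job := &batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{Name: "demo-job", Namespace: "default"},
        Spec: batchv1.JobSpec{
            Completions:  &completions,
            BackoffLimit: &backoffLimit,
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    RestartPolicy: corev1.RestartPolicyNever,
                    Containers: []corev1.Container{
                        {Name: "demo", Image: "busybox", Command: []string{"echo", "hello"}},
                    },
                },
            },
        },
    }

    // This create call hits the Job REST storage shown below; the job controller
    // then watches the new Job and creates pods to satisfy its spec.
    created, err := clientset.BatchV1().Jobs("default").Create(context.TODO(), job, metav1.CreateOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Println("created job:", created.Name)
}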

Where Jobs are created

In pkg/registry/batch/job/storage/storage.go:

func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, error) {
    store := &genericregistry.Store{
        NewFunc:                  func() runtime.Object { return &batch.Job{} },
        NewListFunc:              func() runtime.Object { return &batch.JobList{} },
        PredicateFunc:            job.MatchJob,
        DefaultQualifiedResource: batch.Resource("jobs"),

        CreateStrategy: job.Strategy,
        UpdateStrategy: job.Strategy,
        DeleteStrategy: job.Strategy,

        TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
    }
    options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: job.GetAttrs}
    if err := store.CompleteWithOptions(options); err != nil {
        return nil, nil, err
    }

    statusStore := *store
    statusStore.UpdateStrategy = job.StatusStrategy

    return &REST{store}, &StatusREST{store: &statusStore}, nil
}


controller

In cmd/kube-controller-manager/app/batch.go:

func startJobController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}] {
        return nil, false, nil
    }
    go job.NewController(
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.InformerFactory.Batch().V1().Jobs(),
        ctx.ClientBuilder.ClientOrDie("job-controller"),
    ).Run(int(ctx.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Stop)
    return nil, true, nil
}
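
The controller manager hands the controller informers from its shared factory. The following is a rough sketch of the same wiring outside kube-controller-manager; note that pkg/controller/job is an internal package, so this only compiles inside the kubernetes repo itself, and the resync period and worker count are assumptions.

package main

import (
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/kubernetes/pkg/controller/job"
)

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    stopCh := make(chan struct{})
    defer close(stopCh)

    // Shared informer factory backing the pod and job informers.
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)

    jm := job.NewController(
        factory.Core().V1().Pods(),
        factory.Batch().V1().Jobs(),
        clientset,
    )

    // Start the informers, then run the controller workers until stopCh closes.
    factory.Start(stopCh)
    jm.Run(5, stopCh)
}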

In pkg/controller/job/job_controller.go:

func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartStructuredLogging(0)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        ratelimiter.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
    }

    jm := &Controller{
        kubeClient: kubeClient,
        podControl: controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
        },
        expectations: controller.NewControllerExpectations(),
        queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
        recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
    }

    jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
        UpdateFunc: jm.updateJob,
        DeleteFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
    })
    jm.jobLister = jobInformer.Lister()
    jm.jobStoreSynced = jobInformer.Informer().HasSynced

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    jm.addPod,
        UpdateFunc: jm.updatePod,
        DeleteFunc: jm.deletePod,
    })
    jm.podStore = podInformer.Lister()
    jm.podStoreSynced = podInformer.Informer().HasSynced

    jm.updateHandler = jm.updateJobStatus
    jm.syncHandler = jm.syncJob

    return jm
}
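
The Add/Delete handlers above call enqueueController, which the post does not show. Roughly, it computes the namespace/name key for the object and enqueues it, applying a backoff delay when immediate is false. A simplified sketch of that idea, not the exact upstream code:

// Simplified sketch of what enqueueController roughly does: compute the cache
// key for the object and enqueue it, optionally after a backoff delay.
func (jm *Controller) enqueueControllerSketch(obj interface{}, immediate bool) {
    key, err := controller.KeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
        return
    }

    backoff := time.Duration(0)
    if !immediate {
        // In the real controller the delay grows with the number of failures
        // recorded for this key; a fixed delay is used here for illustration.
        backoff = DefaultJobBackOff
    }
    jm.queue.AddAfter(key, backoff)
}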

func (jm *Controller) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer jm.queue.ShutDown()

    klog.Infof("Starting job controller")
    defer klog.Infof("Shutting down job controller")

    if !cache.WaitForNamedCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(jm.worker, time.Second, stopCh)
    }

    <-stopCh
}

func (jm *Controller) worker() {
    for jm.processNextWorkItem() {
    }
}

func (jm *Controller) processNextWorkItem() bool {
    key, quit := jm.queue.Get()
    if quit {
        return false
    }
    defer jm.queue.Done(key)

    forget, err := jm.syncHandler(key.(string))
    if err == nil {
        if forget {
            jm.queue.Forget(key)
        }
        return true
    }

    utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
    jm.queue.AddRateLimited(key)

    return true
}
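
Because the queue was built with an item exponential failure rate limiter, every AddRateLimited for the same key pushes the next retry further out, until Forget resets the failure count. A standalone illustration of that behavior; the base and cap below mirror DefaultJobBackOff and MaxJobBackOff, but treat the exact values as assumptions:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // Illustrative base delay and cap; the job controller passes
    // DefaultJobBackOff and MaxJobBackOff here.
    rl := workqueue.NewItemExponentialFailureRateLimiter(10*time.Second, 360*time.Second)

    // Each failure for the same key doubles the suggested delay, up to the cap.
    for i := 0; i < 6; i++ {
        fmt.Println(rl.When("default/demo-job")) // 10s, 20s, 40s, 1m20s, 2m40s, 5m20s
    }
    rl.Forget("default/demo-job") // resets the failure count, as queue.Forget does
}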

func (jm *Controller) syncJob(key string) (bool, error) {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
    }()

    ns, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return false, err
    }
    if len(ns) == 0 || len(name) == 0 {
        return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
    }
    sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            klog.V(4).Infof("Job has been deleted: %v", key)
            jm.expectations.DeleteExpectations(key)
            return true, nil
        }
        return false, err
    }
    job := *sharedJob

    // if job was finished previously, we don't want to redo the termination
    if IsJobFinished(&job) {
        return true, nil
    }

    // retrieve the previous number of retry
    previousRetry := jm.queue.NumRequeues(key)

    // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
    // and update the expectations after we've retrieved active pods from the store. If a new pod enters
    // the store after we've checked the expectation, the job sync is just deferred till the next relist.
    jobNeedsSync := jm.expectations.SatisfiedExpectations(key)

    pods, err := jm.getPodsForJob(&job)
    if err != nil {
        return false, err
    }

    activePods := controller.FilterActivePods(pods)
    active := int32(len(activePods))
    succeeded, failed := getStatus(pods)
    conditions := len(job.Status.Conditions)
    // job first start
    if job.Status.StartTime == nil {
        now := metav1.Now()
        job.Status.StartTime = &now
        // enqueue a sync to check if job past ActiveDeadlineSeconds
        if job.Spec.ActiveDeadlineSeconds != nil {
            klog.V(4).Infof("Job %s has ActiveDeadlineSeconds will sync after %d seconds",
                key, *job.Spec.ActiveDeadlineSeconds)
            jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
        }
    }

    var manageJobErr error
    jobFailed := false
    var failureReason string
    var failureMessage string

    jobHaveNewFailure := failed > job.Status.Failed
    // new failures happen when status does not reflect the failures and active
    // is different than parallelism, otherwise the previous controller loop
    // failed updating status so even if we pick up failure it is not a new one
    exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
        (int32(previousRetry)+1 > *job.Spec.BackoffLimit)

    if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
        // check if the number of pod restart exceeds backoff (for restart OnFailure only)
        // OR if the number of failed jobs increased since the last syncJob
        jobFailed = true
        failureReason = "BackoffLimitExceeded"
        failureMessage = "Job has reached the specified backoff limit"
    } else if pastActiveDeadline(&job) {
        jobFailed = true
        failureReason = "DeadlineExceeded"
        failureMessage = "Job was active longer than specified deadline"
    }

    if jobFailed {
        errCh := make(chan error, active)
        jm.deleteJobPods(&job, activePods, errCh)
        select {
        case manageJobErr = <-errCh:
            if manageJobErr != nil {
                break
            }
        default:
        }

        // update status values accordingly
        failed += active
        active = 0
        job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
        jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
    } else {
        if jobNeedsSync && job.DeletionTimestamp == nil {
            active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
        }
        completions := succeeded
        complete := false
        if job.Spec.Completions == nil {
            // This type of job is complete when any pod exits with success.
            // Each pod is capable of
            // determining whether or not the entire Job is done.  Subsequent pods are
            // not expected to fail, but if they do, the failure is ignored.  Once any
            // pod succeeds, the controller waits for remaining pods to finish, and
            // then the job is complete.
            if succeeded > 0 && active == 0 {
                complete = true
            }
        } else {
            // Job specifies a number of completions.  This type of job signals
            // success by having that number of successes.  Since we do not
            // start more pods than there are remaining completions, there should
            // not be any remaining active pods once this count is reached.
            if completions >= *job.Spec.Completions {
                complete = true
                if active > 0 {
                    jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
                }
                if completions > *job.Spec.Completions {
                    jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
                }
            }
        }
        if complete {
            job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
            now := metav1.Now()
            job.Status.CompletionTime = &now
            jm.recorder.Event(&job, v1.EventTypeNormal, "Completed", "Job completed")
        }
    }

    forget := false
    // Check if the number of jobs succeeded increased since the last check. If yes "forget" should be true
    // This logic is linked to the issue: https://github.com/kubernetes/kubernetes/issues/56853 that aims to
    // improve the Job backoff policy when parallelism > 1 and few Jobs failed but others succeed.
    // In this case, we should clear the backoff delay.
    if job.Status.Succeeded < succeeded {
        forget = true
    }

    // no need to update the job if the status hasn't changed since last time
    if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
        job.Status.Active = active
        job.Status.Succeeded = succeeded
        job.Status.Failed = failed

        if err := jm.updateHandler(&job); err != nil {
            return forget, err
        }

        if jobHaveNewFailure && !IsJobFinished(&job) {
            // returning an error will re-enqueue Job after the backoff period
            return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
        }

        forget = true
    }

    return forget, manageJobErr
}
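
syncJob relies on helpers such as pastActiveDeadline and pastBackoffLimitOnFailure that are not shown here. Below is a simplified sketch of the ActiveDeadlineSeconds check, close in spirit to the upstream helper but not a verbatim copy:

// Sketch of the ActiveDeadlineSeconds check used by syncJob: the job fails
// with reason DeadlineExceeded once it has been active longer than the
// configured deadline. Simplified from the upstream helper.
func pastActiveDeadlineSketch(job *batch.Job) bool {
    if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
        return false
    }
    duration := metav1.Now().Time.Sub(job.Status.StartTime.Time)
    allowed := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
    return duration >= allowed
}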

func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
    var activeLock sync.Mutex
    active := int32(len(activePods))
    parallelism := *job.Spec.Parallelism
    jobKey, err := controller.KeyFunc(job)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
        return 0, nil
    }

    var errCh chan error
    if active > parallelism {
        diff := active - parallelism
        errCh = make(chan error, diff)
        jm.expectations.ExpectDeletions(jobKey, int(diff))
        klog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
        // Sort the pods in the order such that not-ready < ready, unscheduled
        // < scheduled, and pending < running. This ensures that we delete pods
        // in the earlier stages whenever possible.
        sort.Sort(controller.ActivePods(activePods))

        active -= diff
        wait := sync.WaitGroup{}
        wait.Add(int(diff))
        for i := int32(0); i < diff; i++ {
            go func(ix int32) {
                defer wait.Done()
                if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    jm.expectations.DeletionObserved(jobKey)
                    if !apierrors.IsNotFound(err) {
                        klog.V(2).Infof("Failed to delete %v, decremented expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
                        activeLock.Lock()
                        active++
                        activeLock.Unlock()
                        errCh <- err
                        utilruntime.HandleError(err)
                    }

                }
            }(i)
        }
        wait.Wait()

    } else if active < parallelism {
        wantActive := int32(0)
        if job.Spec.Completions == nil {
            // Job does not specify a number of completions.  Therefore, number active
            // should be equal to parallelism, unless the job has seen at least
            // once success, in which leave whatever is running, running.
            if succeeded > 0 {
                wantActive = active
            } else {
                wantActive = parallelism
            }
        } else {
            // Job specifies a specific number of completions.  Therefore, number
            // active should not ever exceed number of remaining completions.
            wantActive = *job.Spec.Completions - succeeded
            if wantActive > parallelism {
                wantActive = parallelism
            }
        }
        diff := wantActive - active
        if diff < 0 {
            utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
            diff = 0
        }
        if diff == 0 {
            return active, nil
        }
        jm.expectations.ExpectCreations(jobKey, int(diff))
        errCh = make(chan error, diff)
        klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

        active += diff
        wait := sync.WaitGroup{}

        // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
        // and double with each successful iteration in a kind of "slow start".
        // This handles attempts to start large numbers of pods that would
        // likely all fail with the same error. For example a project with a
        // low quota that attempts to create a large number of pods will be
        // prevented from spamming the API service with the pod create requests
        // after one of its pods fails.  Conveniently, this also prevents the
        // event spam that those failures would generate.
        for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
            errorCount := len(errCh)
            wait.Add(int(batchSize))
            for i := int32(0); i < batchSize; i++ {
                go func() {
                    defer wait.Done()
                    err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
                    if err != nil {
                        if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                            // If the namespace is being torn down, we can safely ignore
                            // this error since all subsequent creations will fail.
                            return
                        }
                    }
                    if err != nil {
                        defer utilruntime.HandleError(err)
                        // Decrement the expected number of creates because the informer won't observe this pod
                        klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
                        jm.expectations.CreationObserved(jobKey)
                        activeLock.Lock()
                        active--
                        activeLock.Unlock()
                        errCh <- err
                    }
                }()
            }
            wait.Wait()
            // any skipped pods that we never attempted to start shouldn't be expected.
            skippedPods := diff - batchSize
            if errorCount < len(errCh) && skippedPods > 0 {
                klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
                active -= skippedPods
                for i := int32(0); i < skippedPods; i++ {
                    // Decrement the expected number of creates because the informer won't observe this pod
                    jm.expectations.CreationObserved(jobKey)
                }
                // The skipped pods will be retried later. The next controller resync will
                // retry the slow start process.
                break
            }
            diff -= batchSize
        }
    }

    select {
    case err := <-errCh:
        // all errors have been reported before, we only need to inform the controller that there was an error and it should re-try this job once more next time.
        if err != nil {
            return active, err
        }
    default:
    }

    return active, nil
}
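
The slow-start loop above starts with SlowStartInitialBatchSize creations, doubles the batch after every fully successful round, and bails out early if any creation in a round fails. A standalone illustration of just the batch-size progression; the initial size and total below are assumptions:

package main

import "fmt"

func main() {
    // Illustrative slow-start progression for diff = 100 pods and an
    // initial batch size of 1.
    diff, batchSize := 100, 1
    for diff > 0 {
        if batchSize > diff {
            batchSize = diff
        }
        fmt.Printf("create %d pods in this batch\n", batchSize) // 1, 2, 4, 8, 16, 32, 37
        diff -= batchSize
        batchSize *= 2 // double only when the whole batch succeeded
    }
}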
