An In-Depth Analysis of K8S Jobs

Author: 陈先生_9e91 | Published 2018-09-19 15:30

Jobs are typically used for batch compute workloads, i.e. containers that run to completion. If the container exits normally (exit code == 0), the Job is marked Completed.
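
To make that concrete, here is a minimal Job built with the client-go API types. This is a sketch, not code from this article's subject; the name, image, and command are placeholders:

    package main

    import (
        "fmt"

        batchv1 "k8s.io/api/batch/v1"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    func main() {
        // A minimal Job: one Pod that prints a line and exits 0, after
        // which the Job controller marks the Job Completed.
        job := &batchv1.Job{
            ObjectMeta: metav1.ObjectMeta{Name: "hello-once"}, // placeholder name
            Spec: batchv1.JobSpec{
                Template: corev1.PodTemplateSpec{
                    Spec: corev1.PodSpec{
                        // A Job's Pods must not restart in place; retries are new Pods
                        RestartPolicy: corev1.RestartPolicyNever,
                        Containers: []corev1.Container{{
                            Name:    "hello",
                            Image:   "busybox",
                            Command: []string{"sh", "-c", "echo done"},
                        }},
                    },
                },
            },
        }
        fmt.Println("built job:", job.Name)
    }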

Implementation hypothesis

The Job controller must list-watch both Jobs and Pods. When a new Job is created, it creates the corresponding Pods; when a Pod fails, it keeps creating replacements until the failure count reaches backoffLimit ("Specifies the number of retries before marking this job failed. Defaults to 6."), at which point the Job is marked Failed.
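
Before opening the source, that hypothesis can be written down as a rough control loop. Everything here is hypothetical (jobEvents, isNew, podFailed, createPods, markFailed are invented names), just to fix the shape we expect to find:

    // Hypothetical reconcile loop -- NOT the real implementation.
    for {
        job := <-jobEvents // delivered by the Job/Pod list-watch (invented channel)
        switch {
        case isNew(job):
            createPods(job) // bring up the Job's Pods
        case podFailed(job):
            if job.Status.Failed < *job.Spec.BackoffLimit {
                createPods(job) // retry with a replacement Pod
            } else {
                markFailed(job) // give up after backoffLimit retries
            }
        }
    }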

Code

The K8S codebase is very idiomatic Go: it is full of producer-consumer patterns built out of goroutines & channels. If you are not used to that style it can be hard going; 郝大's《Go语言并发编程实战》is a good primer.
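
As a warm-up, here is a toy producer-consumer in plain Go; the channel plays the role the workqueue plays in the controller (the keys are made up):

    package main

    import "fmt"

    func main() {
        keys := make(chan string, 2)

        // Producer: in the controller, this is the informer event handlers.
        go func() {
            for _, k := range []string{"default/job-a", "default/job-b"} {
                keys <- k
            }
            close(keys)
        }()

        // Consumer: in the controller, this is the sync worker.
        for k := range keys {
            fmt.Println("syncing", k)
        }
    }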

k8s.io/kubernetes/pkg/controller/job/job_controller.go

    // Entry point: construct the controller and register event handlers
    func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {
        jm := &JobController{ /* clients, listers, workqueue ... elided */ }

        // Informers were introduced in the earlier list-watch article.
        // The JobController list-watches both Jobs and Pods, matching the hypothesis above.
        jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                jm.enqueueController(obj, true)
            },
            UpdateFunc: jm.updateJob,
            DeleteFunc: func(obj interface{}) {
                jm.enqueueController(obj, true)
            },
        })

        podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    jm.addPod,
            UpdateFunc: jm.updatePod,
            DeleteFunc: jm.deletePod,
        })

        return jm
    }
    
    // Producer: put the object's key on the workqueue -- the producer half of the pattern
    func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
        key, err := controller.KeyFunc(obj)

        // immediate == true resets the delay; otherwise retry with per-key exponential backoff
        backoff := time.Duration(0)
        if !immediate {
            backoff = getBackoff(jm.queue, key)
        }
        jm.queue.AddAfter(key, backoff)
    }
    
    // Consumer: pull keys off the queue and sync them
    func (jm *JobController) processNextWorkItem() bool {
        key, quit := jm.queue.Get()
        if quit {
            return false
        }
        defer jm.queue.Done(key)

        // syncHandler is jm.syncJob; forget == true clears the key's backoff
        forget, err := jm.syncHandler(key.(string))
    }
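
The excerpt does not show who drives processNextWorkItem. A sketch of the wiring, close to the controller's real Run/worker pair (cache-sync checks and error handling elided):

    import (
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    // Run starts N concurrent consumers and blocks until stopCh closes.
    func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
        defer jm.queue.ShutDown()
        for i := 0; i < workers; i++ {
            go wait.Until(jm.worker, time.Second, stopCh)
        }
        <-stopCh
    }

    // worker keeps consuming until the queue shuts down.
    func (jm *JobController) worker() {
        for jm.processNextWorkItem() {
        }
    }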
    
    // Sync one Job: drive its actual state toward the desired state
    func (jm *JobController) syncJob(key string) (bool, error) {
        ns, name, err := cache.SplitMetaNamespaceKey(key)
        sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
        job := *sharedJob

        // if job was finished previously, we don't want to redo the termination
        if IsJobFinished(&job) {
            return true, nil
        }

        pods, err := jm.getPodsForJob(&job)

        activePods := controller.FilterActivePods(pods)
        active := int32(len(activePods))
        succeeded, failed := getStatus(pods)

        // simplified: the real code also checks activeDeadlineSeconds
        jobFailed := failed > *job.Spec.BackoffLimit

        if jobFailed {
            // the Job is beyond retrying: tear down its remaining active Pods
            jm.deleteJobPods(&job, activePods, errCh)
        } else {
            active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
        }
    }
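
getStatus is used above but not shown. A simplified sketch of what it computes, assuming only terminal Pod phases matter:

    import v1 "k8s.io/api/core/v1"

    // Simplified sketch of getStatus: count Pods in terminal phases.
    func getStatus(pods []*v1.Pod) (succeeded, failed int32) {
        for _, p := range pods {
            switch p.Status.Phase {
            case v1.PodSucceeded:
                succeeded++
            case v1.PodFailed:
                failed++
            }
        }
        return
    }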
    
    
    // Reconcile the number of Pods the Job should have running
    func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
        active := int32(len(activePods))
        parallelism := *job.Spec.Parallelism

        if active > parallelism {
            diff := active - parallelism

            // "Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff
            for i := int32(0); i < diff; i++ {
                go func(ix int32) {
                    jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job)
                }(i)
            }
        } else if active < parallelism {
            wantActive := parallelism // simplified: the real code also caps this by the remaining completions
            diff := wantActive - active

            // "Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff
            // Create Pods in batches, doubling each round like TCP slow start: 1, 2, 4, ...
            for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
                for i := int32(0); i < batchSize; i++ {
                    go func() {
                        jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
                    }()
                }
                diff -= batchSize // the real code waits for the batch to finish before continuing
            }
        }
    }
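
The slow-start loop is the trickiest part to read, so here is a standalone toy that just prints the batch sizes, assuming the initial batch size is 1 (the value of controller.SlowStartInitialBatchSize):

    package main

    import "fmt"

    // Prints the slow-start schedule for creating `diff` Pods:
    // batches double each round and the last batch is capped at what remains.
    func main() {
        diff := int32(10)
        for batch := int32(1); diff > 0; batch = min32(2*batch, diff) {
            fmt.Println("create", batch, "pods")
            diff -= batch
        }
    }

    func min32(a, b int32) int32 {
        if a < b {
            return a
        }
        return b
    }

With diff = 10 this prints batches of 1, 2, 4, 3: the controller ramps up Pod creation gradually and caps the final batch at whatever is left.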
    
