美文网首页
Kubelet创建POD的流程笔记

Kubelet创建POD的流程笔记

作者: Teddy_b | 来源:发表于2023-07-30 15:25 被阅读0次

    背景

    上次我们简单看了下kubelet作为客户端、服务端的认证和授权方式(https://www.jianshu.com/p/cb203dbc3dd0

    这次我们来看下kubelet的主要工作之一:创建POD

    kubelet启动

    kubelet的syncLoop

    kubelet再启动的时候用于处理pod流程的是syncLoop,在syncLoop中通过select监听着多种信号,其中一种就是处于pod的增删改查的

    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
        syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
        select {
        case u, open := <-configCh:
            ...
    
            switch u.Op {
            case kubetypes.ADD:
                
                handler.HandlePodAdditions(u.Pods)
            case kubetypes.UPDATE:
                handler.HandlePodUpdates(u.Pods)
            case kubetypes.REMOVE:
                handler.HandlePodRemoves(u.Pods)
            case kubetypes.RECONCILE:
                handler.HandlePodReconcile(u.Pods)
            case kubetypes.DELETE:
                handler.HandlePodUpdates(u.Pods)
            ...
    
            kl.sourcesReady.AddSource(u.Source)
    
        case e := <-plegCh:
            ...
        case <-syncCh:
            ...
        case update := <-kl.livenessManager.Updates():
            ...
        case <-housekeepingCh:
            ...
        return true
    }
    

    u, open := <-configCh监听通道中的pod是怎么发送过来的呢?

    POD从哪里来

    在实际使用中,我们已经知道POD是通过Deployment -> ReplicaSet ->Pod 这么个过程产生的,所以我们直接看下ReplicaSet的代码看下他是怎么提交POD到apiserver的

    具体的代码在kube-controller-manager中的ReplicaSetController中,不是本文的重点,因此只是简略看下

    func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
          // 查出这个命名空间下的所有POD
        allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
        
            // 找出这个ReplicaSet的POD
        filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
        
              // 进行POD的副本调整
        if rsNeedsSync && rs.DeletionTimestamp == nil {
            manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
        }
        ...
    }
    
    func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
        diff := len(filteredPods) - int(*(rs.Spec.Replicas))
        ...
        if diff < 0 {
            // diff小于0  说明POD数量少了,需要创建POD
            successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
                err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
                ...
                return err
            })
                    ...
        } else if diff > 0 {
            // diff大于0, 说明POD数量多了,需要删除POD
        }
    
        return nil
    }
    
    func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
            // ReplicaSet中提取出POD的模板
            // 然后把ReplicaSet中的selector设置到POD的labels里面
            // 再把ReplicaSet设置为POD的ownerReference
        pod, err := GetPodFromTemplate(template, object, controllerRef)
        ...
            // 创建POD
        newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
        ...
    }
    

    总结一下:

    • ReplicaSet提交POD的流程还算简单,只需要对比期望副本数和实际副本数,来调整POD数量即可
    • 创建POD的时候,从ReplicaSet中提取出POD的模板,然后把ReplicaSet中的selector设置到POD的labels里面,再把ReplicaSet设置为POD的ownerReference;然后就可以提交给Apiserver去创建了
    kubelet接收POD

    上述ReplicaSet创建了一个POD后,kubelet是怎么检测到的呢?

    func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
        lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
      
           newSourceApiserverFromLW(lw, updates)
    }
    
    func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
        send := func(objs []interface{}) {
            var pods []*v1.Pod
            for _, o := range objs {
                pods = append(pods, o.(*v1.Pod))
            }
            updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
        }
        r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
        go r.Run(wait.NeverStop)
    }
    

    可以看到,kubelet实际watch的是spec.nodeName=当前节点的那些POD,spec.nodeName的值是kube-scheduler设置上去的;

    与普通的watch不同,kubelet watch到pod事件后会把当前watch到的所有POD都发送到一个updates通道里

    为了方便理解,假设kubelet是新启的,还没有任何pod,现在kubelet通过watch机制收到了一个POD ADD事件,那么就会把所有的pod都发送到updates通道里,由于目前只有一个pod,所以也只会发送一个pod到通道里

    那通道收到这个POD ADD后会怎么处理呢?

    func (m *Mux) Channel(source string) chan interface{} {
        ...
        m.sources[source] = newChannel
        go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
        return newChannel
    }
    
    func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
        for update := range listenChannel {
            m.merger.Merge(source, update)
        }
    }
    
    func (s *podStorage) Merge(source string, change interface{}) error {
        // 收到的所有POD先和kubelet本地缓存的POD进行合并
            // 区分出哪些是新增的、哪些是更新了的、哪些是删除了的、哪些是status变化了只需要reconcile即可
            adds, updates, deletes, removes, reconciles := s.merge(source, change)
        
        switch s.mode {
            // 默认是这个模式,会把所有的POD都发往kubelet主循环里监听的通道
        case PodConfigNotificationIncremental:
            if len(removes.Pods) > 0 {
                s.updates <- *removes
            }
            if len(adds.Pods) > 0 {
                s.updates <- *adds
            }
            if len(updates.Pods) > 0 {
                s.updates <- *updates
            }
            if len(deletes.Pods) > 0 {
                s.updates <- *deletes
            }
            if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
                // Send an empty update when first seeing the source and there are
                // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
                // the source is ready.
                s.updates <- *adds
            }
            // Only add reconcile support here, because kubelet doesn't support Snapshot update now.
            if len(reconciles.Pods) > 0 {
                s.updates <- *reconciles
            }
    }
    
    func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
        ...
        updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
            filtered := filterInvalidPods(newPods, source, s.recorder)
            for _, ref := range filtered {
                ...
                if existing, found := oldPods[ref.UID]; found {
                    pods[ref.UID] = existing
                    needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
                    if needUpdate {
                        updatePods = append(updatePods, existing)
                    } else if needReconcile {
                        reconcilePods = append(reconcilePods, existing)
                    } else if needGracefulDelete {
                        deletePods = append(deletePods, existing)
                    }
                    continue
                }
                recordFirstSeenTime(ref)
                pods[ref.UID] = ref
                addPods = append(addPods, ref)
            }
        }
    
        switch update.Op {
        case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
            ...
    
        case kubetypes.REMOVE:
            ...
             // 再前面的watch里面,发送的是SET事件
        case kubetypes.SET:
            klog.V(4).Infof("Setting pods for source %s", source)
            s.markSourceSet(source)
            // Clear the old map entries by just creating a new map
            oldPods := pods
            pods = make(map[types.UID]*v1.Pod)
            updatePodsFunc(update.Pods, oldPods, pods)
            for uid, existing := range oldPods {
                if _, found := pods[uid]; !found {
                    // this is a delete
                    removePods = append(removePods, existing)
                }
            }
                    ...
        }
    
        s.pods[source] = pods
    
        adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
        updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
        deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
        removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
        reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
    
        return adds, updates, deletes, removes, reconciles
    }
    

    可以看到PodConfig中会为每个source维护一个通道,目前只有一个source=api,然后再协程中一直监听这个通道,这个通道对应的就是watch后发送所有pod的通道;

    通道收到数据后,就会和PodConfig中缓存的pod进行merge,区分出哪些是新增的、哪些是更新了的、哪些是删除了的、哪些是status变化了只需要reconcile即可

    • 缓存中已经存在了的,需要进一步区分是update还是reconcile还是delete;
      • 如果pod模板没变化,但是status变化了,那么是reconcile
      • 如果pod模板变了,但是删除时间不为空,那么是delete
      • 如果pod模板变了,删除时间也为空,那么是update
    • 缓存中不存在的,是add
    • 缓存中多出的,是remove

    所有这些merge完成后的pod和对应的事件都会发送到kubelet的主循环监听的通道里

    梳理一下整个流程:

    • kubelet通过watch一直监听spec.nodeName=当前节点的所有POD事件

    • 收到POD的某个事件后就会将所有watch到的POD(相当于一次list)发送到一个缓存的通道里

    • 通道收到这全部的POD后,就和和kubelet本地缓存的所有POD进行merge,进而对POD进行分类,ADD、DELETE、UPDATE、RECONCILE和REMOVE

    • 然后再将这些分类后的POD发送到kubelet的syncLoop里,kubelet会对不同操作类型的POD进行不同的操作

    创建POD

    kubelet收到ADD事件后,会执行对应的创建操作

    分发POD
    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
        // 所有POD按照创建时间排序
        sort.Sort(sliceutils.PodsByCreationTime(pods))
    
            // 这是当前需要创建的所有POD
        for _, pod := range pods {
            ...
                    // 忽略一些静态POD的处理逻辑
    
                    // 分发这个POD,这里的类型是SyncPodCreate
            kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
            
                    // 
            kl.probeManager.AddPod(pod)
        }
    }
    
    接收POD
    func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
        ...
    
        // 添加这个POD,上一步指定了这里的类型是SyncPodCreate
        kl.podWorkers.UpdatePod(&UpdatePodOptions{
            Pod:        pod,
            MirrorPod:  mirrorPod,
            UpdateType: syncType,
            OnCompleteFunc: ...,
        })
        ...
    }
    
    func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
        ...
            // 检查是否已经处理过这个POD(UID)
        if podUpdates, exists = p.podUpdates[uid]; !exists {
            // 还没处理过的,记录已经处理过这个POD了,注意这里是大小为1的阻塞通道
            podUpdates = make(chan UpdatePodOptions, 1)
            p.podUpdates[uid] = podUpdates
    
            // 启动一个协程来处理这个POD的事件
            go func() {
                defer runtime.HandleCrash()
                p.managePodLoop(podUpdates)
            }()
        }
    
           // ADD的时候会执行if中的逻辑,然后标记这个POD再处理中了,然后发送一个消息到上面的阻塞通道里
        if !p.isWorking[pod.UID] {
            p.isWorking[pod.UID] = true
            podUpdates <- *options
        } else {
            // if a request to kill a pod is pending, we do not let anything overwrite that request.
            update, found := p.lastUndeliveredWorkUpdate[pod.UID]
            if !found || update.UpdateType != kubetypes.SyncPodKill {
                p.lastUndeliveredWorkUpdate[pod.UID] = *options
            }
        }
    }
    
    开始干活

    真正干活的是上一步中启动的异步协程

    func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
        var lastSyncTime time.Time
            // 大小为1的通道上阻塞的读取,上一步中执行ADD逻辑的时候会发送一个消息到这个通道里
        for update := range podUpdates {
            err := func() error {
                // 从缓存中获取这个POD的状态
                            // 这是一个阻塞操作,会一直等待直到缓存中存在当前POD的状态
                status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
                ...
                            // 执行主要的syncPod逻辑
                err = p.syncPodFn(syncPodOptions{
                    mirrorPod:      update.MirrorPod,
                    pod:            update.Pod,
                    podStatus:      status,
                    killPodOptions: update.KillPodOptions,
                    updateType:     update.UpdateType,
                })
                lastSyncTime = time.Now()
                return err
            }()
            ...
            p.wrapUp(update.Pod.UID, err)
        }
    }
    

    上面的逻辑里主要包括两步,需要分别看下

    第一步是:从缓存中阻塞获取POD的状态,那么POD的状态是何时丢到缓存中去的呢?

    这就要回到kubelet的syncLoop中去了,再syncLoop中除了会监听POD的事件之外,还会监听PLEG信号(Pod Lifecycle Event Generate),即POD的生命历程里的各种事件生成信号,那他为什么和唤醒上面的阻塞读有关呢?

    // kubelet启动的时候会有这么一步
    kl.pleg.Start()
    
    // 这个启动了一个异步协程每秒执行一次relist函数
    func (g *GenericPLEG) Start() {
        go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
    }
    
    // relist的执行逻辑
    func (g *GenericPLEG) relist() {
        // 调用CRI接口获取所有k8s容器,包括sandbox容器
            // 因为k8s管理的容器会打一些特殊的标签,因此是可以区分开的
        podList, err := g.runtime.GetPods(true)
        
           // 生成事件
        for pid := range g.podRecords {
            oldPod := g.podRecords.getOld(pid)
            pod := g.podRecords.getCurrent(pid)
            // Get all containers in the old and the new pod.
            allContainers := getContainersFromPods(oldPod, pod)
            for _, container := range allContainers {
                events := computeEvents(oldPod, pod, &container.ID)
                for _, e := range events {
                    updateEvents(eventsByPodID, e)
                }
            }
        }
    
        // 发送事件到PLEG通道,由kubelet捕获
        for pid, events := range eventsByPodID {
            ...
        }
    
           // 默认就是启用了缓存的
        if g.cacheEnabled() {
            // 更新缓存里的全局时间
            g.cache.UpdateTime(timestamp)
        }
    }
    

    初次ADD POD的时候,虽然CRI上什么容器都查不到,但是只要启用了缓存,就会对缓存的全局时间进行更新,这很重要

    func (c *cache) UpdateTime(timestamp time.Time) {
        ...
        c.timestamp = &timestamp
        // Notify all the subscribers if the condition is met.
        for id := range c.subscribers {
            c.notify(id, *c.timestamp)
        }
    }
    
    func (c *cache) notify(id types.UID, timestamp time.Time) {
        ...
        for i, r := range list {
            ...
            r.ch <- c.get(id)
            close(r.ch)
        }
        if len(newList) == 0 {
            delete(c.subscribers, id)
        } else {
            c.subscribers[id] = newList
        }
    }
    
    func (c *cache) get(id types.UID) *data {
        d, ok := c.pods[id]
        if !ok {
            
            return &data{status: &PodStatus{ID: id}, err: nil}
        }
        return d
    }
    

    上面更新了缓存的全局时间后,这列会对阻塞再缓存读取上的订阅者进行唤醒,唤醒的时候如果还没有这个POD的status数据,就会返回一个默认的只带UID的状态数据,然后从订阅者中删除这个UID的POD

    因此pod worker才能从缓存阻塞获取状态的请求中被唤醒,然后开始执行 syncPod的主要逻辑

    第二步 syncPod的逻辑主要就是创建容器相关的了

    func (kl *Kubelet) syncPod(o syncPodOptions) error {
        // 为这个POD生成status部分数据
        apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
        
             // 检测这个POD能否再当前节点运行
        runnable := kl.canRunPod(pod)
    
        // 更新这个POD的status数据
        kl.statusManager.SetPodStatus(pod, apiPodStatus)
    
            // 维护这个POD的Cgroup路径
        pcm := kl.containerManager.NewPodContainerManager()
        
        // 创建这个POD的数据目录 /var/lib/kubelet/uid/volumes、/var/lib/kubelet/uid/plugins
        if err := kl.makePodDataDirs(pod)
    
        // Volume manager will not mount volumes for terminated pods
        if !kl.podIsTerminated(pod) {
            // 等待volume挂载完成
            if err := kl.volumeManager.WaitForAttachAndMount(pod); 
        }
    
        // 获取镜像拉取Secret
        pullSecrets := kl.getPullSecretsForPod(pod)
    
        // 调用CRI创建容器
        result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
    
    }
    

    相关文章

      网友评论

          本文标题:Kubelet创建POD的流程笔记

          本文链接:https://www.haomeiwen.com/subject/ofblpdtx.html