美文网首页
Kubelet创建POD的流程笔记

Kubelet创建POD的流程笔记

作者: Teddy_b | 来源:发表于2023-07-30 15:25 被阅读0次

背景

上次我们简单看了下kubelet作为客户端、服务端的认证和授权方式(https://www.jianshu.com/p/cb203dbc3dd0

这次我们来看下kubelet的主要工作之一:创建POD

kubelet启动

kubelet的syncLoop

kubelet再启动的时候用于处理pod流程的是syncLoop,在syncLoop中通过select监听着多种信号,其中一种就是处于pod的增删改查的

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        ...

        switch u.Op {
        case kubetypes.ADD:
            
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            handler.HandlePodUpdates(u.Pods)
        ...

        kl.sourcesReady.AddSource(u.Source)

    case e := <-plegCh:
        ...
    case <-syncCh:
        ...
    case update := <-kl.livenessManager.Updates():
        ...
    case <-housekeepingCh:
        ...
    return true
}

u, open := <-configCh监听通道中的pod是怎么发送过来的呢?

POD从哪里来

在实际使用中,我们已经知道POD是通过Deployment -> ReplicaSet ->Pod 这么个过程产生的,所以我们直接看下ReplicaSet的代码看下他是怎么提交POD到apiserver的

具体的代码在kube-controller-manager中的ReplicaSetController中,不是本文的重点,因此只是简略看下

func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
      // 查出这个命名空间下的所有POD
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    
        // 找出这个ReplicaSet的POD
    filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
    
          // 进行POD的副本调整
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    }
    ...
}

func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    ...
    if diff < 0 {
        // diff小于0  说明POD数量少了,需要创建POD
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
            ...
            return err
        })
                ...
    } else if diff > 0 {
        // diff大于0, 说明POD数量多了,需要删除POD
    }

    return nil
}

func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
        // ReplicaSet中提取出POD的模板
        // 然后把ReplicaSet中的selector设置到POD的labels里面
        // 再把ReplicaSet设置为POD的ownerReference
    pod, err := GetPodFromTemplate(template, object, controllerRef)
    ...
        // 创建POD
    newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
    ...
}

总结一下:

  • ReplicaSet提交POD的流程还算简单,只需要对比期望副本数和实际副本数,来调整POD数量即可
  • 创建POD的时候,从ReplicaSet中提取出POD的模板,然后把ReplicaSet中的selector设置到POD的labels里面,再把ReplicaSet设置为POD的ownerReference;然后就可以提交给Apiserver去创建了
kubelet接收POD

上述ReplicaSet创建了一个POD后,kubelet是怎么检测到的呢?

func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
  
       newSourceApiserverFromLW(lw, updates)
}

func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
    send := func(objs []interface{}) {
        var pods []*v1.Pod
        for _, o := range objs {
            pods = append(pods, o.(*v1.Pod))
        }
        updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
    }
    r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
    go r.Run(wait.NeverStop)
}

可以看到,kubelet实际watch的是spec.nodeName=当前节点的那些POD,spec.nodeName的值是kube-scheduler设置上去的;

与普通的watch不同,kubelet watch到pod事件后会把当前watch到的所有POD都发送到一个updates通道里

为了方便理解,假设kubelet是新启的,还没有任何pod,现在kubelet通过watch机制收到了一个POD ADD事件,那么就会把所有的pod都发送到updates通道里,由于目前只有一个pod,所以也只会发送一个pod到通道里

那通道收到这个POD ADD后会怎么处理呢?

func (m *Mux) Channel(source string) chan interface{} {
    ...
    m.sources[source] = newChannel
    go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
    return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
    for update := range listenChannel {
        m.merger.Merge(source, update)
    }
}

func (s *podStorage) Merge(source string, change interface{}) error {
    // 收到的所有POD先和kubelet本地缓存的POD进行合并
        // 区分出哪些是新增的、哪些是更新了的、哪些是删除了的、哪些是status变化了只需要reconcile即可
        adds, updates, deletes, removes, reconciles := s.merge(source, change)
    
    switch s.mode {
        // 默认是这个模式,会把所有的POD都发往kubelet主循环里监听的通道
    case PodConfigNotificationIncremental:
        if len(removes.Pods) > 0 {
            s.updates <- *removes
        }
        if len(adds.Pods) > 0 {
            s.updates <- *adds
        }
        if len(updates.Pods) > 0 {
            s.updates <- *updates
        }
        if len(deletes.Pods) > 0 {
            s.updates <- *deletes
        }
        if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
            // Send an empty update when first seeing the source and there are
            // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
            // the source is ready.
            s.updates <- *adds
        }
        // Only add reconcile support here, because kubelet doesn't support Snapshot update now.
        if len(reconciles.Pods) > 0 {
            s.updates <- *reconciles
        }
}

func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
    ...
    updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
        filtered := filterInvalidPods(newPods, source, s.recorder)
        for _, ref := range filtered {
            ...
            if existing, found := oldPods[ref.UID]; found {
                pods[ref.UID] = existing
                needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
                if needUpdate {
                    updatePods = append(updatePods, existing)
                } else if needReconcile {
                    reconcilePods = append(reconcilePods, existing)
                } else if needGracefulDelete {
                    deletePods = append(deletePods, existing)
                }
                continue
            }
            recordFirstSeenTime(ref)
            pods[ref.UID] = ref
            addPods = append(addPods, ref)
        }
    }

    switch update.Op {
    case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
        ...

    case kubetypes.REMOVE:
        ...
         // 再前面的watch里面,发送的是SET事件
    case kubetypes.SET:
        klog.V(4).Infof("Setting pods for source %s", source)
        s.markSourceSet(source)
        // Clear the old map entries by just creating a new map
        oldPods := pods
        pods = make(map[types.UID]*v1.Pod)
        updatePodsFunc(update.Pods, oldPods, pods)
        for uid, existing := range oldPods {
            if _, found := pods[uid]; !found {
                // this is a delete
                removePods = append(removePods, existing)
            }
        }
                ...
    }

    s.pods[source] = pods

    adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
    updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
    deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
    removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
    reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}

    return adds, updates, deletes, removes, reconciles
}

可以看到PodConfig中会为每个source维护一个通道,目前只有一个source=api,然后再协程中一直监听这个通道,这个通道对应的就是watch后发送所有pod的通道;

通道收到数据后,就会和PodConfig中缓存的pod进行merge,区分出哪些是新增的、哪些是更新了的、哪些是删除了的、哪些是status变化了只需要reconcile即可

  • 缓存中已经存在了的,需要进一步区分是update还是reconcile还是delete;
    • 如果pod模板没变化,但是status变化了,那么是reconcile
    • 如果pod模板变了,但是删除时间不为空,那么是delete
    • 如果pod模板变了,删除时间也为空,那么是update
  • 缓存中不存在的,是add
  • 缓存中多出的,是remove

所有这些merge完成后的pod和对应的事件都会发送到kubelet的主循环监听的通道里

梳理一下整个流程:

  • kubelet通过watch一直监听spec.nodeName=当前节点的所有POD事件

  • 收到POD的某个事件后就会将所有watch到的POD(相当于一次list)发送到一个缓存的通道里

  • 通道收到这全部的POD后,就和和kubelet本地缓存的所有POD进行merge,进而对POD进行分类,ADD、DELETE、UPDATE、RECONCILE和REMOVE

  • 然后再将这些分类后的POD发送到kubelet的syncLoop里,kubelet会对不同操作类型的POD进行不同的操作

创建POD

kubelet收到ADD事件后,会执行对应的创建操作

分发POD
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    // 所有POD按照创建时间排序
    sort.Sort(sliceutils.PodsByCreationTime(pods))

        // 这是当前需要创建的所有POD
    for _, pod := range pods {
        ...
                // 忽略一些静态POD的处理逻辑

                // 分发这个POD,这里的类型是SyncPodCreate
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        
                // 
        kl.probeManager.AddPod(pod)
    }
}
接收POD
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    ...

    // 添加这个POD,上一步指定了这里的类型是SyncPodCreate
    kl.podWorkers.UpdatePod(&UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        OnCompleteFunc: ...,
    })
    ...
}

func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
    ...
        // 检查是否已经处理过这个POD(UID)
    if podUpdates, exists = p.podUpdates[uid]; !exists {
        // 还没处理过的,记录已经处理过这个POD了,注意这里是大小为1的阻塞通道
        podUpdates = make(chan UpdatePodOptions, 1)
        p.podUpdates[uid] = podUpdates

        // 启动一个协程来处理这个POD的事件
        go func() {
            defer runtime.HandleCrash()
            p.managePodLoop(podUpdates)
        }()
    }

       // ADD的时候会执行if中的逻辑,然后标记这个POD再处理中了,然后发送一个消息到上面的阻塞通道里
    if !p.isWorking[pod.UID] {
        p.isWorking[pod.UID] = true
        podUpdates <- *options
    } else {
        // if a request to kill a pod is pending, we do not let anything overwrite that request.
        update, found := p.lastUndeliveredWorkUpdate[pod.UID]
        if !found || update.UpdateType != kubetypes.SyncPodKill {
            p.lastUndeliveredWorkUpdate[pod.UID] = *options
        }
    }
}
开始干活

真正干活的是上一步中启动的异步协程

func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
    var lastSyncTime time.Time
        // 大小为1的通道上阻塞的读取,上一步中执行ADD逻辑的时候会发送一个消息到这个通道里
    for update := range podUpdates {
        err := func() error {
            // 从缓存中获取这个POD的状态
                        // 这是一个阻塞操作,会一直等待直到缓存中存在当前POD的状态
            status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
            ...
                        // 执行主要的syncPod逻辑
            err = p.syncPodFn(syncPodOptions{
                mirrorPod:      update.MirrorPod,
                pod:            update.Pod,
                podStatus:      status,
                killPodOptions: update.KillPodOptions,
                updateType:     update.UpdateType,
            })
            lastSyncTime = time.Now()
            return err
        }()
        ...
        p.wrapUp(update.Pod.UID, err)
    }
}

上面的逻辑里主要包括两步,需要分别看下

第一步是:从缓存中阻塞获取POD的状态,那么POD的状态是何时丢到缓存中去的呢?

这就要回到kubelet的syncLoop中去了,再syncLoop中除了会监听POD的事件之外,还会监听PLEG信号(Pod Lifecycle Event Generate),即POD的生命历程里的各种事件生成信号,那他为什么和唤醒上面的阻塞读有关呢?

// kubelet启动的时候会有这么一步
kl.pleg.Start()

// 这个启动了一个异步协程每秒执行一次relist函数
func (g *GenericPLEG) Start() {
    go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}

// relist的执行逻辑
func (g *GenericPLEG) relist() {
    // 调用CRI接口获取所有k8s容器,包括sandbox容器
        // 因为k8s管理的容器会打一些特殊的标签,因此是可以区分开的
    podList, err := g.runtime.GetPods(true)
    
       // 生成事件
    for pid := range g.podRecords {
        oldPod := g.podRecords.getOld(pid)
        pod := g.podRecords.getCurrent(pid)
        // Get all containers in the old and the new pod.
        allContainers := getContainersFromPods(oldPod, pod)
        for _, container := range allContainers {
            events := computeEvents(oldPod, pod, &container.ID)
            for _, e := range events {
                updateEvents(eventsByPodID, e)
            }
        }
    }

    // 发送事件到PLEG通道,由kubelet捕获
    for pid, events := range eventsByPodID {
        ...
    }

       // 默认就是启用了缓存的
    if g.cacheEnabled() {
        // 更新缓存里的全局时间
        g.cache.UpdateTime(timestamp)
    }
}

初次ADD POD的时候,虽然CRI上什么容器都查不到,但是只要启用了缓存,就会对缓存的全局时间进行更新,这很重要

func (c *cache) UpdateTime(timestamp time.Time) {
    ...
    c.timestamp = &timestamp
    // Notify all the subscribers if the condition is met.
    for id := range c.subscribers {
        c.notify(id, *c.timestamp)
    }
}

func (c *cache) notify(id types.UID, timestamp time.Time) {
    ...
    for i, r := range list {
        ...
        r.ch <- c.get(id)
        close(r.ch)
    }
    if len(newList) == 0 {
        delete(c.subscribers, id)
    } else {
        c.subscribers[id] = newList
    }
}

func (c *cache) get(id types.UID) *data {
    d, ok := c.pods[id]
    if !ok {
        
        return &data{status: &PodStatus{ID: id}, err: nil}
    }
    return d
}

上面更新了缓存的全局时间后,这列会对阻塞再缓存读取上的订阅者进行唤醒,唤醒的时候如果还没有这个POD的status数据,就会返回一个默认的只带UID的状态数据,然后从订阅者中删除这个UID的POD

因此pod worker才能从缓存阻塞获取状态的请求中被唤醒,然后开始执行 syncPod的主要逻辑

第二步 syncPod的逻辑主要就是创建容器相关的了

func (kl *Kubelet) syncPod(o syncPodOptions) error {
    // 为这个POD生成status部分数据
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    
         // 检测这个POD能否再当前节点运行
    runnable := kl.canRunPod(pod)

    // 更新这个POD的status数据
    kl.statusManager.SetPodStatus(pod, apiPodStatus)

        // 维护这个POD的Cgroup路径
    pcm := kl.containerManager.NewPodContainerManager()
    
    // 创建这个POD的数据目录 /var/lib/kubelet/uid/volumes、/var/lib/kubelet/uid/plugins
    if err := kl.makePodDataDirs(pod)

    // Volume manager will not mount volumes for terminated pods
    if !kl.podIsTerminated(pod) {
        // 等待volume挂载完成
        if err := kl.volumeManager.WaitForAttachAndMount(pod); 
    }

    // 获取镜像拉取Secret
    pullSecrets := kl.getPullSecretsForPod(pod)

    // 调用CRI创建容器
    result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)

}

相关文章

网友评论

      本文标题:Kubelet创建POD的流程笔记

      本文链接:https://www.haomeiwen.com/subject/ofblpdtx.html