Deep Dive into kubelet (2): Creating a Pod

Author: 陈先生_9e91 | Published 2018-09-29 11:44


    Picking up where the previous article left off: that post covered the producer side; this one introduces the consumer.

    background

    PodManager

    k8s.io\kubernetes\pkg\kubelet\pod\pod_manager.go

    // Manager stores and manages access to pods, maintaining the mappings
    // between static pods and mirror pods.
    //
    // The kubelet discovers pod updates from 3 sources: file, http, and
    // apiserver. Pods from non-apiserver sources are called static pods, and API
    // server is not aware of the existence of static pods. In order to monitor
    // the status of such pods, the kubelet creates a mirror pod for each static
    // pod via the API server.
    //
    // A mirror pod has the same pod full name (name and namespace) as its static
    // counterpart (albeit different metadata such as UID, etc). By leveraging the
    // fact that the kubelet reports the pod status using the pod full name, the
    // status of the mirror pod always reflects the actual status of the static
    // pod. When a static pod gets deleted, the associated orphaned mirror pod
    // will also be removed.
    type Manager interface {
        // GetPods returns the regular pods bound to the kubelet and their spec.
        GetPods() []*v1.Pod
        // GetPodByFullName returns the (non-mirror) pod that matches full name, as well as
        // whether the pod was found.
        GetPodByFullName(podFullName string) (*v1.Pod, bool)
        // GetPodByName provides the (non-mirror) pod that matches namespace and
        // name, as well as whether the pod was found.
        GetPodByName(namespace, name string) (*v1.Pod, bool)
        // GetPodByUID provides the (non-mirror) pod that matches pod UID, as well as
        // whether the pod is found.
        GetPodByUID(types.UID) (*v1.Pod, bool)
        // GetPodByMirrorPod returns the static pod for the given mirror pod and
        // whether it was known to the pod manager.
        GetPodByMirrorPod(*v1.Pod) (*v1.Pod, bool)
        // GetMirrorPodByPod returns the mirror pod for the given static pod and
        // whether it was known to the pod manager.
        GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool)
        // GetPodsAndMirrorPods returns both the regular and mirror pods.
        GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod)
        // SetPods replaces the internal pods with the new pods.
        // It is currently only used for testing.
        SetPods(pods []*v1.Pod)
        // AddPod adds the given pod to the manager.
        AddPod(pod *v1.Pod)
        // UpdatePod updates the given pod in the manager.
        UpdatePod(pod *v1.Pod)
        // DeletePod deletes the given pod from the manager.  For mirror pods,
        // this means deleting the mappings related to mirror pods.  For non-
        // mirror pods, this means deleting from indexes for all non-mirror pods.
        DeletePod(pod *v1.Pod)
        // DeleteOrphanedMirrorPods deletes all mirror pods which do not have
        // associated static pods. This method sends deletion requests to the API
        // server, but does NOT modify the internal pod storage in basicManager.
        DeleteOrphanedMirrorPods()
        // TranslatePodUID returns the actual UID of a pod. If the UID belongs to
        // a mirror pod, returns the UID of its static pod. Otherwise, returns the
        // original UID.
        //
        // All public-facing functions should perform this translation for UIDs
        // because user may provide a mirror pod UID, which is not recognized by
        // internal Kubelet functions.
        TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID
        // GetUIDTranslations returns the mappings of static pod UIDs to mirror pod
        // UIDs and mirror pod UIDs to static pod UIDs.
        GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
        // IsMirrorPodOf returns true if mirrorPod is a correct representation of
        // pod; false otherwise.
        IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool
    
        MirrorClient
    }
    

    The previous article introduced static pods; their counterpart is the mirror pod.
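    A mirror pod is matched to its static pod by the "pod full name" described in the comment above. A minimal sketch of that convention (the real helper is kubecontainer.GetPodFullName; the example values here are made up):

```go
package main

import "fmt"

// podFullName mimics the kubelet convention: a pod is identified on a
// node by "<name>_<namespace>". A mirror pod shares name and namespace
// (but not UID) with its static pod, so their full names collide on
// purpose, which is how the status reporting lines up.
func podFullName(name, namespace string) string {
	return fmt.Sprintf("%s_%s", name, namespace)
}

func main() {
	fmt.Println(podFullName("etcd-node1", "kube-system"))
}
```

    Because the kubelet reports status keyed by this full name, writing the mirror pod's status through the apiserver automatically reflects the static pod's actual state.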

    Pod status

    See k8s.io\api\core\v1\types.go

    • Pending: the apiserver has accepted the pod, but one or more of its containers has not been started. This covers the time before the pod is bound to a node as well as the time spent pulling images onto the host.
    • Running: the pod has been bound to a node and all of its containers have been started. At least one container is still running or is in the process of being restarted.
    • Succeeded: all containers in the pod have voluntarily terminated with exit code 0, and the system will not restart any of them.
    • Failed: all containers in the pod have terminated, and at least one terminated in failure (non-zero exit code, or stopped by the system).
    • Unknown: the state of the pod could not be obtained for some reason, typically an error communicating with the pod's host (for example, the node has gone unreachable).
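    To make the phase semantics concrete, here is a small sketch (the PodPhase values mirror v1.PodPhase; the isTerminal helper is my own, not kubelet code): only Succeeded and Failed are terminal, since every other phase can still transition.

```go
package main

import "fmt"

// PodPhase mirrors v1.PodPhase for this sketch.
type PodPhase string

const (
	PodPending   PodPhase = "Pending"
	PodRunning   PodPhase = "Running"
	PodSucceeded PodPhase = "Succeeded"
	PodFailed    PodPhase = "Failed"
	PodUnknown   PodPhase = "Unknown"
)

// isTerminal reports whether the system will never restart the pod's
// containers: only Succeeded and Failed are terminal phases.
func isTerminal(p PodPhase) bool {
	return p == PodSucceeded || p == PodFailed
}

func main() {
	for _, p := range []PodPhase{PodPending, PodRunning, PodSucceeded, PodFailed, PodUnknown} {
		fmt.Printf("%s terminal=%v\n", p, isTerminal(p))
	}
}
```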

    PodWorkers

    k8s.io\kubernetes\pkg\kubelet\pod_workers.go

    // PodWorkers is an abstract interface for testability.
    type PodWorkers interface {
        UpdatePod(options *UpdatePodOptions)
        ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
        ForgetWorker(uid types.UID)
    }
    
    type podWorkers struct {
        // Protects all per worker fields.
        podLock sync.Mutex
    
        // Tracks all running per-pod goroutines - per-pod goroutine will be
        // processing updates received through its corresponding channel.
        podUpdates map[types.UID]chan UpdatePodOptions
        // Track the current state of per-pod goroutines.
        // Currently all update request for a given pod coming when another
        // update of this pod is being processed are ignored.
        isWorking map[types.UID]bool
        // Tracks the last undelivered work item for this pod - a work item is
        // undelivered if it comes in while the worker is working.
        lastUndeliveredWorkUpdate map[types.UID]UpdatePodOptions
    
        workQueue queue.WorkQueue
    
        // This function is run to sync the desired stated of pod.
        // NOTE: This function has to be thread-safe - it can be called for
        // different pods at the same time.
        syncPodFn syncPodFnType
    
        // The EventRecorder to use
        recorder record.EventRecorder
    
        // backOffPeriod is the duration to back off when there is a sync error.
        backOffPeriod time.Duration
    
        // resyncInterval is the duration to wait until the next sync.
        resyncInterval time.Duration
    
        // podCache stores kubecontainer.PodStatus for all pods.
        podCache kubecontainer.Cache
    }
    
    

    code

    k8s.io\kubernetes\pkg\kubelet\kubelet.go

    func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
        kl.syncLoop(updates, kl)
    }
    
    // syncLoop is the main loop for processing changes. It watches for changes from
    // three channels (file, apiserver, and http) and creates a union of them. For
    // any new change seen, will run a sync against desired state and running state. If
    // no changes are seen to the configuration, will synchronize the last known desired
    // state every sync-frequency seconds. Never returns.
    func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
        // The resyncTicker wakes up kubelet to checks if there are any pod workers
        // that need to be sync'd. A one-second period is sufficient because the
        // sync interval is defaulted to 10s.
        syncTicker := time.NewTicker(time.Second)
        defer syncTicker.Stop()
        // 2s
        housekeepingTicker := time.NewTicker(housekeepingPeriod)
        defer housekeepingTicker.Stop()
        plegCh := kl.pleg.Watch()
        for {
            kl.syncLoopMonitor.Store(kl.clock.Now())
            if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
                break
            }
            kl.syncLoopMonitor.Store(kl.clock.Now())
        }
    }
    

    Notice that updates runs through the entire flow; it is such a central concept that the whole previous article was spent tracing where updates comes from. Here we focus on syncLoopIteration.

    // syncLoopIteration reads from various channels and dispatches pods to the
    // given handler.
    //
    // Arguments:
    // 1.  configCh:       a channel to read config events from
    // 2.  handler:        the SyncHandler to dispatch pods to
    // 3.  syncCh:         a channel to read periodic sync events from
    // 4.  houseKeepingCh: a channel to read housekeeping events from
    // 5.  plegCh:         a channel to read PLEG updates from
    //
    // Events are also read from the kubelet liveness manager's update channel.
    //
    // The workflow is to read from one of the channels, handle that event, and
    // update the timestamp in the sync loop monitor.
    //
    // With that in mind, in truly no particular order, the different channels
    // are handled as follows:
    //
    // * configCh: dispatch the pods for the config change to the appropriate
    //             handler callback for the event type
    // * plegCh: update the runtime cache; sync pod
    // * syncCh: sync all pods waiting for sync
    // * houseKeepingCh: trigger cleanup of pods
    // * liveness manager: sync pods that have failed or in which one or more
    //                     containers have failed liveness checks
    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
        syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
        select {
        case u, open := <-configCh:
            // Update from a config source; dispatch it to the right handler
            // callback.
            if !open {
                glog.Errorf("Update channel is closed. Exiting the sync loop.")
                return false
            }
    
            switch u.Op {
            case kubetypes.ADD:
                // After restarting, kubelet will get all existing pods through
                // ADD as if they are new pods. These pods will then go through the
                // admission process and *may* be rejected. This can be resolved
                // once we have checkpointing.
                handler.HandlePodAdditions(u.Pods)
            case kubetypes.UPDATE:
                handler.HandlePodUpdates(u.Pods)
            case kubetypes.REMOVE:
                handler.HandlePodRemoves(u.Pods)
            case kubetypes.RECONCILE:
                handler.HandlePodReconcile(u.Pods)
            case kubetypes.DELETE:
                // DELETE is treated as a UPDATE because of graceful deletion.
                handler.HandlePodUpdates(u.Pods)
            case kubetypes.RESTORE:
                // These are pods restored from the checkpoint. Treat them as new
                // pods.
                handler.HandlePodAdditions(u.Pods)
            }
        // other cases (plegCh, syncCh, housekeepingCh, liveness manager) abridged
        }
        return true
    }
    

    This article covers only the configCh branch of syncLoopIteration; the other branches will be analyzed in future posts.

    This branch simply hands the pods to the matching handler.

    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
        start := kl.clock.Now()
        for _, pod := range pods {
            // Always add the pod to the pod manager. Kubelet relies on the pod
            // manager as the source of truth for the desired state. If a pod does
            // not exist in the pod manager, it means that it has been deleted in
            // the apiserver and no action (other than cleanup) is required.
            kl.podManager.AddPod(pod)
    
            mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
            kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        }
    }
    
    // dispatchWork starts the asynchronous sync of the pod in a pod worker.
    // If the pod is terminated, dispatchWork will perform no action.
    func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
        // Run the sync in an async worker.
        kl.podWorkers.UpdatePod(&UpdatePodOptions{
            Pod:        pod,
            MirrorPod:  mirrorPod,
            UpdateType: syncType,
            OnCompleteFunc: func(err error) {},
        })
    }
    

    It calls podWorkers.UpdatePod to perform the operation.

    k8s.io\kubernetes\pkg\kubelet\pod_workers.go

    // Apply the new setting to the specified pod.
    func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
        pod := options.Pod
        uid := pod.UID
    
        podUpdates, exists := p.podUpdates[uid]
        if !exists {
            podUpdates = make(chan UpdatePodOptions, 1)
            p.podUpdates[uid] = podUpdates
    
            go func() {
                defer runtime.HandleCrash()
                p.managePodLoop(podUpdates)
            }()
        }
    
        if !p.isWorking[pod.UID] {
            p.isWorking[pod.UID] = true
            podUpdates <- *options
        }
    }
    

    UpdatePod creates a dedicated goroutine for the pod, plus a channel to feed it.
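    Stripped of kubelet specifics, this is a goroutine-per-key worker fed by a buffered channel. A self-contained sketch (all names here are mine, and unlike the real podWorkers it queues every update instead of coalescing them via lastUndeliveredWorkUpdate):

```go
package main

import (
	"fmt"
	"sync"
)

type update struct{ uid, op string }

type workers struct {
	mu      sync.Mutex
	updates map[string]chan update
	done    *sync.WaitGroup
	results chan string
}

func newWorkers(buf int) *workers {
	return &workers{
		updates: map[string]chan update{},
		done:    &sync.WaitGroup{},
		results: make(chan string, buf),
	}
}

// dispatch lazily starts one goroutine per UID; that goroutine
// serializes all updates for its pod, like managePodLoop does.
func (w *workers) dispatch(u update) {
	w.mu.Lock()
	ch, ok := w.updates[u.uid]
	if !ok {
		ch = make(chan update, 1)
		w.updates[u.uid] = ch
		w.done.Add(1)
		go func() {
			defer w.done.Done()
			for u := range ch {
				w.results <- u.uid + ":" + u.op // stand-in for syncPodFn
			}
		}()
	}
	w.mu.Unlock()
	ch <- u
}

// shutdown closes all per-pod channels and waits for the workers.
func (w *workers) shutdown() {
	w.mu.Lock()
	for _, ch := range w.updates {
		close(ch)
	}
	w.mu.Unlock()
	w.done.Wait()
	close(w.results)
}

func main() {
	w := newWorkers(4)
	w.dispatch(update{"uid-1", "create"})
	w.dispatch(update{"uid-1", "update"})
	w.shutdown()
	for r := range w.results {
		fmt.Println(r)
	}
}
```

    The per-UID channel guarantees that syncs for one pod never run concurrently, while different pods proceed in parallel.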

    func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
        var lastSyncTime time.Time
        for update := range podUpdates {
            err := func() error {
                // The runtime status for the pod comes from the pod cache.
                status, err := p.podCache.GetNewerThan(update.Pod.UID, lastSyncTime)
                if err != nil {
                    return err
                }
                return p.syncPodFn(syncPodOptions{
                    mirrorPod:      update.MirrorPod,
                    pod:            update.Pod,
                    podStatus:      status,
                    killPodOptions: update.KillPodOptions,
                    updateType:     update.UpdateType,
                })
            }()
            _ = err // error recording and work-queue requeue abridged
        }
    }
    

    In effect this is a callback into podWorkers.syncPodFn. So where is syncPodFn set?

    k8s.io\kubernetes\pkg\kubelet\kubelet.go

    func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration /* ... */) (*Kubelet, error) {
        // ...
        klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
        // ...
    }
    

    This 500-plus-line function contains the answer we need: klet.syncPod. It is a complex function that covers the handling of every pod state, so we will unpack it piece by piece.
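    The wiring itself is plain dependency injection of a function value: podWorkers stores syncPodFn, and NewMainKubelet hands it the klet.syncPod method value. A toy version of that pattern (all names here are mine):

```go
package main

import "fmt"

// syncFn is the shape of the callback, like syncPodFnType.
type syncFn func(pod string) error

// worker plays the role of podWorkers: it only knows the callback.
type worker struct{ sync syncFn }

func newWorker(f syncFn) *worker { return &worker{sync: f} }

// kubelet owns the real sync logic, like Kubelet.syncPod.
type kubelet struct{}

func (k *kubelet) syncPod(pod string) error {
	fmt.Println("syncing", pod)
	return nil
}

func main() {
	k := &kubelet{}
	// Passing the method value k.syncPod, just as NewMainKubelet
	// passes klet.syncPod into newPodWorkers.
	w := newWorker(k.syncPod)
	_ = w.sync("nginx")
}
```

    This keeps podWorkers testable: tests can inject a fake syncPodFn without constructing a full Kubelet.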

    // syncPod is the transaction script for the sync of a single pod.
    //
    // Arguments:
    //
    // o - the SyncPodOptions for this invocation
    //
    // The workflow is:
    // * If the pod is being created, record pod worker start latency
    // * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
    // * If the pod is being seen as running for the first time, record pod
    //   start latency
    // * Update the status of the pod in the status manager
    // * Kill the pod if it should not be running
    // * Create a mirror pod if the pod is a static pod, and does not
    //   already have a mirror pod
    // * Create the data directories for the pod if they do not exist
    // * Wait for volumes to attach/mount
    // * Fetch the pull secrets for the pod
    // * Call the container runtime's SyncPod callback
    // * Update the traffic shaping for the pod's ingress and egress limits
    //
    // If any step of this workflow errors, the error is returned, and is repeated
    // on the next syncPod call.
    func (kl *Kubelet) syncPod(o syncPodOptions) error {
        pod := o.pod
        mirrorPod := o.mirrorPod
        podStatus := o.podStatus
        updateType := o.updateType
        
        // if we want to kill a pod, do it now!
        if updateType == kubetypes.SyncPodKill {
            return kl.killPod(pod, nil, podStatus, o.killPodOptions.PodTerminationGracePeriodSecondsOverride)
        }
        
        apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
        
        // Create Cgroups for the pod and apply resource parameters
        // to them if cgroups-per-qos flag is enabled.
        pcm := kl.containerManager.NewPodContainerManager()
        // If pod has already been terminated then we need not create
        // or update the pod's cgroup
        if !kl.podIsTerminated(pod) {
            // Create and Update pod's Cgroups
            pcm.EnsureExists(pod)
        }
        
        // Create Mirror Pod for Static Pod if it doesn't already exist
        if kubepod.IsStaticPod(pod) {
            if mirrorPod == nil || mirrorPod.DeletionTimestamp != nil {
                kl.podManager.CreateMirrorPod(pod)
            }
        }
        
        // Make data directories for the pod
        kl.makePodDataDirs(pod)
        
        // Volume manager will not mount volumes for terminated pods
        if !kl.podIsTerminated(pod) {
            // Wait for volumes to attach/mount
            kl.volumeManager.WaitForAttachAndMount(pod)
        }
        
        // Fetch the pull secrets for the pod
        pullSecrets := kl.getPullSecretsForPod(pod)
        
        // Call the container runtime's SyncPod callback
        result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
        return result.Error()
    }
    

    The pod creation flow:

    1. If the pod should be killed, kill it immediately
    2. Generate a v1.PodStatus for the pod
    3. Create the pod's cgroups
    4. If it is a static pod, create a mirror pod so the static pod can be inspected through the apiserver (read-only; no other operation applies to it)
    5. Create the pod's data directories
    6. Wait for volumes to attach/mount
    7. Fetch the ImagePullSecrets
    8. Call the container runtime's (CRI) SyncPod

    k8s.io\kubernetes\pkg\kubelet\kuberuntime\kuberuntime_manager.go

    // SyncPod syncs the running pod into the desired pod by executing following steps:
    //
    //  1. Compute sandbox and container changes.
    //  2. Kill pod sandbox if necessary.
    //  3. Kill any containers that should not be running.
    //  4. Create sandbox if necessary.
    //  5. Create init containers.
    //  6. Create normal containers.
    func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
        // Step 1: Compute sandbox and container changes.
        podContainerChanges := m.computePodActions(pod, podStatus)
        
        // Step 2: Kill the pod if the sandbox has changed.
        if podContainerChanges.KillPod {
            m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        } else {
            // Step 3: kill any running containers in this pod which are not to keep.
            for containerID, containerInfo := range podContainerChanges.ContainersToKill {
                m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil)
            }
        }
        
        // Step 4: Create a sandbox for the pod if necessary.
        podSandboxID, msg, err := m.createPodSandbox(pod, podContainerChanges.Attempt)
        
        // Step 5: start the init container.
        if container := podContainerChanges.NextInitContainerToStart; container != nil {
            m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit)
        }
        
        // Step 6: start containers in podContainerChanges.ContainersToStart.
        for _, idx := range podContainerChanges.ContainersToStart {
            container := &pod.Spec.Containers[idx]
            m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular)
        }
        return
    }
    

    The comment on this function is excellent. The flow is:

    1. Compute sandbox and container changes
    2. If the sandbox has changed, kill the pod's existing sandbox and containers; otherwise kill only the containers that should no longer run
    3. Create the sandbox (and with it the pod's network) if necessary
    4. Start the next init container
    5. Start the regular containers

    Starting init containers follows its own rules, and all of the logic lives in computePodActions: init containers must be started one at a time, in order, and the regular containers start only after every init container has completed. Roughly:

    1. On the first pass only pod.Spec.InitContainers[0] is started, and changes.ContainersToStart is empty
    2. Each subsequent sync starts the next init container
    3. Once all init containers have finished, the regular containers are started
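    The sequencing above can be sketched as a pure function over the init-container list (a heavy simplification of computePodActions; the finished-state slice is my own stand-in for the real container statuses):

```go
package main

import "fmt"

// nextInitContainer returns the index of the next init container to
// start, or -1 once all have finished and the regular containers may
// run. finished[i] reports whether init container i exited successfully.
func nextInitContainer(finished []bool) int {
	for i, done := range finished {
		if !done {
			return i // init containers start strictly one at a time, in order
		}
	}
	return -1
}

func main() {
	// Three init containers; the first two have completed.
	fmt.Println(nextInitContainer([]bool{true, true, false}))
	fmt.Println(nextInitContainer([]bool{true, true, true}))
}
```

    Each sync cycle therefore advances the pod by at most one init container, which is why a pod with several init containers takes several syncs to reach its regular containers.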

    k8s.io\kubernetes\pkg\kubelet\kuberuntime\kuberuntime_container.go

    // startContainer starts a container and, on error, returns a message indicating why it failed.
    // It starts the container through the following steps:
    // * pull the image
    // * create the container
    // * start the container
    // * run the post start lifecycle hooks (if applicable)
    func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
        // Step 1: pull the image.
        imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)   
        
        // Step 2: create the container.
        m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
        
        // Step 3: start the container.
        m.runtimeService.StartContainer(containerID)
        
        // Symlink the container log to the legacy path (abridged: containerLog,
        // containerMeta and sandboxMeta come from earlier steps).
        legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name, sandboxMeta.Namespace)
        m.osInterface.Symlink(containerLog, legacySymlink)
    }
    

    After peeling back the layers one by one, we finally hit bottom:

    1. Pull the image
    2. Create the container (runtimeService talks to the CRI over gRPC)
    3. Start the container
    4. Create a soft link for the container log that embeds K8S metadata; this is very handy for log collection (see K8S Fluentd Mongo log collection)
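    Step 4 deserves a note: the symlink name encodes pod metadata so a collector can recover it from the file path alone. A sketch of that naming scheme (the exact kubelet format can vary by version, so treat this as illustrative):

```go
package main

import (
	"fmt"
	"path/filepath"
)

// legacyLogPath builds a /var/log/containers-style symlink name that
// embeds pod name, namespace, container name and container ID, so a
// collector like Fluentd can attach K8S metadata by parsing the path.
func legacyLogPath(dir, containerID, containerName, podName, namespace string) string {
	return filepath.Join(dir, fmt.Sprintf("%s_%s_%s-%s.log", podName, namespace, containerName, containerID))
}

func main() {
	fmt.Println(legacyLogPath("/var/log/containers", "abc123", "nginx", "web-0", "default"))
}
```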

    Permalink: https://www.haomeiwen.com/subject/onmvoftx.html