美文网首页
kubelet 源码调用过程2

kubelet 源码调用过程2

作者: shoyu666 | 来源:发表于2022-03-15 10:17 被阅读0次

https://www.jianshu.com/p/95cd1556966a
kubelet 会监听apiService的Pod变化,事件会发送到 listenChannel

//pkg/util/config/config.go
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
        //遍历事件
    for update := range listenChannel {
        m.merger.Merge(source, update)
    }
}

listen方法遍历事件,调用Merge 持续处理Pod变更事件

//pkg/kubelet/config/config.go
func (s *podStorage) Merge(source string, change interface{}) error {
    //分组
    adds, updates, deletes, removes, reconciles := s.merge(source, change)
    switch s.mode {
            case PodConfigNotificationIncremental:
                       s.updates <- *adds
}

merge 方法根据内存中 Pod信息 将 change 分组为以下几种

//pkg/kubelet/types/pod_update.go
const (
    // SET is the current pod configuration.
    SET PodOperation = iota
    // ADD signifies pods that are new to this source.
    ADD
    // DELETE signifies pods that are gracefully deleted from this source.
    DELETE
    // REMOVE signifies pods that have been removed from this source.
    REMOVE
    // UPDATE signifies pods have been updated in this source.
    UPDATE
    // RECONCILE signifies pods that have unexpected status in this source,
    // kubelet should reconcile status with this source.
    RECONCILE
)

s.mode 目前是PodConfigNotificationIncremental,s.updates <- *adds就是将要添加的Pod事件 发到 updates chan 。

//pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate,handler SyncHandler
{
select {
  case u, open := <-configCh:
   switch u.Op {     case kubetypes.ADD:
        handler.HandlePodAdditions(u.Pods)
     case kubetypes.UPDATE:

configCh就是updates chan,流程到 handler.HandlePodAdditions(u.Pods)

//pkg/kubelet/kubelet.go
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
   kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}

dispatchWork 分发任务

//pkg/kubelet/kubelet.go
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    // Run the sync in an async worker.
       // 转 podWorkers 处理
    kl.podWorkers.UpdatePod(UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        StartTime:  start,
    })
    // Note the number of containers for new pods.
    if syncType == kubetypes.SyncPodCreate {
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }
}

请求 podWorkers.UpdatePod 参数是一个UpdatePodOptions

//pkg/kubelet/pod_workers.go
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
        //获取当前的Pod sync状态
    status, ok := p.podSyncStatuses[uid]
    if !ok {
                //获取失败说明是第一次
        klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KObj(pod), "podUID", pod.UID)
        status = &podSyncStatus{
            syncedAt: now,
            fullname: kubecontainer.GetPodFullName(pod),
        }
        // if this pod is being synced for the first time, we need to make sure it is an active pod
               //如果是第一次 需要确保Pod是活跃的(PodFailed 或者 PodSucceeded 状态 )
        if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) {
            // check to see if the pod is not running and the pod is terminal.
            // If this succeeds then record in the podWorker that it is terminated.
            if statusCache, err := p.podCache.Get(pod.UID); err == nil {
                if isPodStatusCacheTerminal(statusCache) {
                                       //状态
                    status = &podSyncStatus{
                        terminatedAt:       now,
                        terminatingAt:      now,
                        syncedAt:           now,
                        startedTerminating: true,
                        finished:           true,
                        fullname:           kubecontainer.GetPodFullName(pod),
                    }
                }
            }
        }
                //更新状态
        p.podSyncStatuses[uid] = status
    }
    if status.IsTerminationRequested() {
        if options.UpdateType == kubetypes.SyncPodCreate {
                        //Pod请求创建,但是当前是Terminationing状态,重置状态为 restartRequested =true
            status.restartRequested = true
            klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled 
                        later", "pod", klog.KObj(pod), "podUID", pod.UID)
            return
        }
    }

    // once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
      //Pod的状态是Finished的,不再处理
    if status.IsFinished() {
        klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
        return
    }
        //任务类型
        workType = SyncPodWork

    // the desired work we want to be performing
        //任务对象
    work := podWork{
        WorkType: workType,
        Options:  options,
    }
if podUpdates, exists = p.podUpdates[uid]; !exists {
         //任务没启动则启动
        go func() {
            defer runtime.HandleCrash()
            p.managePodLoop(podUpdates)
        }()

    // dispatch a request to the pod worker if none are running
        // 当前Pod没有处于 Sync
    if !status.IsWorking() {
        status.working = true
        podUpdates <- work
        return
    }
}

managePodLoop 持续遍历 podUpdates chan,每个podUpdate事件调用 syncPodFn 处理

//pkg/kubelet/pod_workers.go
func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
    for update := range podUpdates {
        //获取比lastSyncTime 是更新的状态
    status, err = p.podCache.GetNewerThan(pod.UID, lastSyncTime)

       //syncPodFn 函数== /pkg/kubelet/kubelet.go#syncPod函数
       err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
        //重新进入队列
    p.completeWork(pod, err)
}

syncPodFn 其实就是 pkg/kubelet/kubelet.go 的 syncPod 函数

//pkg/kubelet/kubelet.go
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
                //判断Pod是否可以被接纳(基于)
                runnable := kl.canRunPod(pod)
                //更新pod状态  由 pkg/kubelet/status/status_manager.go 负责处理
        kl.statusManager.SetPodStatus(pod, apiPodStatus)
                ......
               //创建Pod目录
               //var/lib/kubelet/pods/{podUID}
              // var/lib/kubelet/pods/{podUID}//volumes
              // var/lib/kubelet/pods/{podUID}//plugins
              if err := kl.makePodDataDirs(pod); err != nil {
                //获得pullImage 的 secrets
                pullSecrets := kl.getPullSecretsForPod(pod)
                result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
}

syncPod 函数通过statusManager 更新 Pod状态, 创建目录,调用 pkg/kubelet/kuberuntime/kuberuntime_manager.go 的 SyncPod,SyncPod 方法,官方代码也给出了提示,共7个步骤

//pkg/kubelet/kuberuntime/kuberuntime_manager.go 
//  1. Compute sandbox and container changes. 计算sandbox和 container的变化
//  2. Kill pod sandbox if necessary.  如果有需要就干掉sandbox
//  3. Kill any containers that should not be running. 干掉所有不需要运行的container
//  4. Create sandbox if necessary.  如果有需要就创建sandbox
//  5. Create ephemeral containers. 创建临时的 container
//  6. Create init containers.   创建init container
//  7. Create normal containers.  创建正常container
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
      //Step1
      .....
}
  1. Compute sandbox and container changes. 计算sandbox和 container的变化
/Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
    if podContainerChanges.CreateSandbox {
        ref, err := ref.GetReference(legacyscheme.Scheme, pod)
        if err != nil {
            klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
        }
        if podContainerChanges.SandboxID != "" {
                       //Pod 变更 需要Kill 和 re-Created
            m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
        } else {
                        //新Pod 需要创建
            klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
        }
    }
  1. Kill pod sandbox if necessary. 如果有需要就干掉sandbox
  2. Kill any containers that should not be running. 干掉所有不需要运行的container

    if podContainerChanges.KillPod {
// Step 2: Kill the pod if the sandbox has changed.
        if podContainerChanges.CreateSandbox {
            klog.V(4).InfoS("Stopping PodSandbox for pod, will start new one", "pod", klog.KObj(pod))
        } else {
            klog.V(4).InfoS("Stopping PodSandbox for pod, because all other containers are dead", "pod", klog.KObj(pod))
        }
                //通过CRI   kill pod和 container  分别对应调用CRI 的  StopPodSandbox 和 StopContainer
        killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        result.AddPodSyncResult(killResult)
        if killResult.Error() != nil {
            klog.ErrorS(killResult.Error(), "killPodWithSyncResult failed")
            return
        }

        if podContainerChanges.CreateSandbox {
            m.purgeInitContainers(pod, podStatus)
        }
    } else {
    // Step 3: kill any running containers in this pod which are not to keep.

Step2和Step3互斥

  1. Create sandbox if necessary. 如果有需要就创建sandbox
//pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
    podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)

Step5,Step6,Step7 分别启动 临时容器,init容器,普通容器

//pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
       // 创建日志目录 /var/log/pods/{podNamespace}_{podName}_{podUID}
    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
       //runtimeHandler 具体的容器运行时Handler
        runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
        //创建PodSandbox
    podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
    
// Step 5: start ephemeral containers
    // These are started "prior" to init containers to allow running ephemeral containers even when there
    // are errors starting an init container. In practice init containers will start first since ephemeral
    // containers cannot be specified on pod creation.
    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
        for _, idx := range podContainerChanges.EphemeralContainersToStart {
                         //启动临时容器
            start("ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
        }
    }

    // Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
      //启动init容器
        if err := start("init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
    }

    // Step 7: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
                //启动容器
        start("container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
    }

start 方法是启动容器分别为: pull 镜像,创建容器,启动容器,执行 生命周期Hook

func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
            // Step 1: pull the image.   (imageManager负责)
    imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
    // Step 2: create the container.     通过调用  CRI 的 CreateContainer
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
    // Step 3: start the container.        通过调用CRI StartContainer
    err = m.runtimeService.StartContainer(containerID)
}

相关文章

网友评论

      本文标题:kubelet 源码调用过程2

      本文链接:https://www.haomeiwen.com/subject/icnjdrtx.html