美文网首页
从k8s 源码 看pod

从k8s 源码 看pod

作者: wwq2020 | 来源:发表于2020-07-16 20:42 被阅读0次

pod

创建sandbox(也就是pause container),用于pod内容器通过localhost互通
创建临时容器
创建所需的initcontainer,
创建普通容器

pkg/kubelet/kuberuntime/kuberuntime_manager.go中

// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
    if podContainerChanges.CreateSandbox {
        ref, err := ref.GetReference(legacyscheme.Scheme, pod)
        if err != nil {
            klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
        }
        if podContainerChanges.SandboxID != "" {
            m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
        } else {
            klog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
        }
    }

    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {
        if podContainerChanges.CreateSandbox {
            klog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
        } else {
            klog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
        }

        killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        result.AddPodSyncResult(killResult)
        if killResult.Error() != nil {
            klog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
            return
        }

        if podContainerChanges.CreateSandbox {
            m.purgeInitContainers(pod, podStatus)
        }
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            klog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
            killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
            result.AddSyncResult(killContainerResult)
            if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
                killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
                klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
                return
            }
        }
    }

    // Keep terminated init containers fairly aggressively controlled
    // This is an optimization because container removals are typically handled
    // by container garbage collector.
    m.pruneInitContainersBeforeStart(pod, podStatus)

    // We pass the value of the PRIMARY podIP and list of podIPs down to
    // generatePodSandboxConfig and generateContainerConfig, which in turn
    // passes it to various other functions, in order to facilitate functionality
    // that requires this value (hosts file and downward API) and avoid races determining
    // the pod IP in cases where a container requires restart but the
    // podIP isn't in the status manager yet. The list of podIPs is used to
    // generate the hosts file.
    //
    // We default to the IPs in the passed-in pod status, and overwrite them if the
    // sandbox needs to be (re)started.
    var podIPs []string
    if podStatus != nil {
        podIPs = podStatus.IPs
    }

    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        var msg string
        var err error

        klog.V(4).Infof("Creating PodSandbox for pod %q", format.Pod(pod))
        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
        result.AddSyncResult(createSandboxResult)
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
        if err != nil {
            createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
            klog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
            if referr != nil {
                klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
            }
            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err)
            return
        }
        klog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))

        podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
        if err != nil {
            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
            if referr != nil {
                klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
            }
            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
            klog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
            result.Fail(err)
            return
        }

        // If we ever allow updating a pod from non-host-network to
        // host-network, we may use a stale IP.
        if !kubecontainer.IsHostNetworkPod(pod) {
            // Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
            podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, podSandboxStatus)
            klog.V(4).Infof("Determined the ip %v for pod %q after sandbox changed", podIPs, format.Pod(pod))
        }
    }

    // the start containers routines depend on pod ip(as in primary pod ip)
    // instead of trying to figure out if we have 0 < len(podIPs)
    // everytime, we short circuit it here
    podIP := ""
    if len(podIPs) != 0 {
        podIP = podIPs[0]
    }

    // Get podSandboxConfig for containers to start.
    configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
    result.AddSyncResult(configPodSandboxResult)
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
    if err != nil {
        message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
        klog.Error(message)
        configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
        return
    }

    // Helper containing boilerplate common to starting all types of containers.
    // typeName is a label used to describe this type of container in log messages,
    // currently: "container", "init container" or "ephemeral container"
    start := func(typeName string, spec *startSpec) error {
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
        result.AddSyncResult(startContainerResult)

        isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            klog.V(4).Infof("Backing Off restarting %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
            return err
        }

        klog.V(4).Infof("Creating %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
        // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
            startContainerResult.Fail(err, msg)
            // known errors that are logged in other places are logged at higher levels here to avoid
            // repetitive log spam
            switch {
            case err == images.ErrImagePullBackOff:
                klog.V(3).Infof("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg)
            default:
                utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
            }
            return err
        }

        return nil
    }

    // Step 5: start ephemeral containers
    // These are started "prior" to init containers to allow running ephemeral containers even when there
    // are errors starting an init container. In practice init containers will start first since ephemeral
    // containers cannot be specified on pod creation.
    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
        for _, idx := range podContainerChanges.EphemeralContainersToStart {
            start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
        }
    }

    // Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start("init container", containerStartSpec(container)); err != nil {
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
    }

    // Step 7: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        start("container", containerStartSpec(&pod.Spec.Containers[idx]))
    }

    return
}

pkg/kubelet/kuberuntime/kuberuntime_sandbox.go中

func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
    if err != nil {
        message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
        klog.Error(message)
        return "", message, err
    }

    // Create pod logs directory
    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
    if err != nil {
        message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
        klog.Errorf(message)
        return "", message, err
    }

    runtimeHandler := ""
    if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && m.runtimeClassManager != nil {
        runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
        if err != nil {
            message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
            return "", message, err
        }
        if runtimeHandler != "" {
            klog.V(2).Infof("Running pod %s with RuntimeHandler %q", format.Pod(pod), runtimeHandler)
        }
    }

    podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
    if err != nil {
        message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
        klog.Error(message)
        return "", message, err
    }

    return podSandBoxID, "", nil
}

pkg/kubelet/cri/remote/remote_runtime.go中

func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
    // Use 2 times longer timeout for sandbox operation (4 mins by default)
    // TODO: Make the pod sandbox timeout configurable.
    timeout := r.timeout * 2

    klog.V(10).Infof("[RemoteRuntimeService] RunPodSandbox (config=%v, runtimeHandler=%v, timeout=%v)", config, runtimeHandler, timeout)

    ctx, cancel := getContextWithTimeout(timeout)
    defer cancel()

    resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
        Config:         config,
        RuntimeHandler: runtimeHandler,
    })
    if err != nil {
        klog.Errorf("RunPodSandbox from runtime service failed: %v", err)
        return "", err
    }

    if resp.PodSandboxId == "" {
        errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata())
        klog.Errorf("RunPodSandbox failed: %s", errorMessage)
        return "", errors.New(errorMessage)
    }

    klog.V(10).Infof("[RemoteRuntimeService] RunPodSandbox Response (PodSandboxId=%v)", resp.PodSandboxId)

    return resp.PodSandboxId, nil
}
func NewRuntimeServiceClient(cc *grpc.ClientConn) RuntimeServiceClient {
    return &runtimeServiceClient{cc}
}

func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {
    out := new(RunPodSandboxResponse)
    err := c.cc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/RunPodSandbox", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

相关文章

网友评论

      本文标题:从k8s 源码 看pod

      本文链接:https://www.haomeiwen.com/subject/yzmmhktx.html