美文网首页
kubelet 原理解析四:probeManager

kubelet 原理解析四:probeManager

作者: 徐亚松_v | 来源:发表于2020-03-27 00:11 被阅读0次

    概述

    在Kubernetes 中,系统和应用程序的健康检查任务是由 kubelet 来完成的,本文主要讨论kubelet中 probemanager 相关的实现原理。

    如果你对k8s的各种probe如何使用还不了解,可以看下我之前写的这篇K8S 中的健康检查机制,是从实践的角度介绍的。

    statusManager

    在 kubelet 初始化的时候,会创建 statusManager 和 probeManager,这两个都是和 pod 状态相关的逻辑,在kubelet 原理解析一:pod管理文章中有提到,statusManager 负责维护状态信息,并把Pod状态及时更新到Api-Server,

    但是它并不负责监控 pod 状态的变化,而是提供对应的接口供其他组件调用,比如 probeManager。probeManager 会定时去监控 pod 中容器的健康状况,一旦发现状态发生变化,就调用 statusManager 提供的方法更新 pod 的状态。

    klet.statusManager = status.NewManager(kubeClient, klet.podManager)
    klet.probeManager = prober.NewManager(
            klet.statusManager,
            klet.livenessManager,
            klet.runner,
            containerRefManager,
            kubeDeps.Recorder)
    

    statusManager代码位于:pkg/kubelet/status/status_manager.go

    type PodStatusProvider interface {
        GetPodStatus(uid types.UID) (api.PodStatus, bool)
    }
    
    type Manager interface {
        PodStatusProvider
        Start()
        SetPodStatus(pod *api.Pod, status api.PodStatus)
        SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
        TerminatePod(pod *api.Pod)
        RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
    }
    
    SetPodStatus:如果 pod 的状态发生了变化,会调用这个方法,把新状态更新到 apiserver,一般在 kubelet 维护 pod 生命周期的时候会调用
    
    SetContainerReadiness:如果健康检查发现 pod 中容器的健康状态发生变化,会调用这个方法,修改 pod 的健康状态
    
    TerminatePod:kubelet 在删除 pod 的时候,会调用这个方法,把 pod 中所有的容器设置为 terminated 状态
    
    RemoveOrphanedStatuses:删除孤儿 pod,直接把对应的状态数据从缓存中删除即可
    

    Start() 方法是在 kubelet 运行的时候调用的,它会启动一个 goroutine 执行更新操作:

    const syncPeriod = 10 * time.Second
    
    func (m *manager) Start() {
        ......
        glog.Info("Starting to sync pod status with apiserver")
        syncTicker := time.Tick(syncPeriod)
        // syncPod and syncBatch share the same go routine to avoid sync races.
        go wait.Forever(func() {
            select {
            case syncRequest := <-m.podStatusChannel:
                m.syncPod(syncRequest.podUID, syncRequest.status)
            case <-syncTicker:
                m.syncBatch()
            }
        }, 0)
    }
    

    这个 goroutine 就能不断地从两个 channel 监听数据进行处理:syncTicker 是个定时器,也就是说它会定时保证 apiserver 和自己缓存的最新 pod 状态保持一致;podStatusChannel 是所有 pod 状态更新发送到的地方,调用方不会直接操作这个 channel,而是通过调用上面提到的修改状态的各种方法,这些方法内部会往这个 channel 写数据。

    m.syncPod 根据参数中的 pod 和它的状态信息对 apiserver 中的数据进行更新,如果发现 pod 已经被删除也会把它从内部数据结构中删除。

    probeManager

    probeManager负责 检测 pod 中容器的健康状态,目前有三种 probe:

    • liveness: 让Kubernetes知道你的应用程序是否健康,如果你的应用程序不健康,Kubernetes将删除Pod并启动一个新的替换它(与RestartPolicy有关)。Liveness 探测可以告诉 Kubernetes 什么时候通过重启容器实现自愈。
    • readiness: readiness与liveness原理相同,不过Readiness探针是告诉 Kubernetes 什么时候可以将容器加入到 Service 负载均衡中,对外提供服务。
    • startupProbe:1.16开始支持的新特性,检测慢启动容器的状态,具体参考startup-probes

    并不是所有的 pod 中的容器都有健康检查的探针,如果没有,则不对容器进行检测,默认认为容器是正常的。在每次创建新 pod 的时候,kubelet 都会调用 probeManager.AddPod(pod) 方法,它对应的实现在 pkg/kubelet/prober/prober_manager.go 文件中:

    func (m *manager) AddPod(pod *v1.Pod) {
        m.workerLock.Lock()
        defer m.workerLock.Unlock()
    
        key := probeKey{podUID: pod.UID}
        for _, c := range pod.Spec.Containers {
            key.containerName = c.Name
    
            if c.ReadinessProbe != nil {
                key.probeType = readiness
                if _, ok := m.workers[key]; ok {
                    klog.Errorf("Readiness probe already exists! %v - %v",
                        format.Pod(pod), c.Name)
                    return
                }
                w := newWorker(m, readiness, pod, c)
                m.workers[key] = w
                go w.run()
            }
    
            if c.LivenessProbe != nil {
                key.probeType = liveness
                if _, ok := m.workers[key]; ok {
                    klog.Errorf("Liveness probe already exists! %v - %v",
                        format.Pod(pod), c.Name)
                    return
                }
                w := newWorker(m, liveness, pod, c)
                m.workers[key] = w
                go w.run()
            }
        }
    }
    

    在这个方法里,kubelet 会遍历pod 中所有的 container,如果配置了 probe,就创建一个 worker,并异步处理这次探测

    // Creates and starts a new probe worker.
    func newWorker(
        m *manager,
        probeType probeType,
        pod *v1.Pod,
        container v1.Container) *worker {
    
        w := &worker{
            stopCh:       make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
            pod:          pod,
            container:    container,
            probeType:    probeType,
            probeManager: m,
        }
    
        switch probeType {
        case readiness:
            w.spec = container.ReadinessProbe
            w.resultsManager = m.readinessManager
            w.initialValue = results.Failure
        case liveness:
            w.spec = container.LivenessProbe
            w.resultsManager = m.livenessManager
            w.initialValue = results.Success
        }
    
        w.proberResultsMetricLabels = prometheus.Labels{
            "probe_type":     w.probeType.String(),
            "container_name": w.container.Name,
            "pod_name":       w.pod.Name,
            "namespace":      w.pod.Namespace,
            "pod_uid":        string(w.pod.UID),
        }
    
        return w
    }
    
    

    worker 开始run之后,会调用doProbe方法

    func (w *worker) doProbe() (keepGoing bool) {
        defer func() { recover() }() 
        defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })
    
        // pod 没有被创建,或者已经被删除了,直接跳过检测,但是会继续检测
        status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
        if !ok {
            glog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
            return true
        }
    
        // pod 已经退出(不管是成功还是失败),直接返回,并终止 worker
        if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
            glog.V(3).Infof("Pod %v %v, exiting probe worker",
                format.Pod(w.pod), status.Phase)
            return false
        }
    
        // 容器没有创建,或者已经删除了,直接返回,并继续检测,等待更多的信息
        c, ok := api.GetContainerStatus(status.ContainerStatuses, w.container.Name)
        if !ok || len(c.ContainerID) == 0 {
            glog.V(3).Infof("Probe target container not found: %v - %v",
                format.Pod(w.pod), w.container.Name)
            return true 
        }
    
        // pod 更新了容器,使用最新的容器信息
        if w.containerID.String() != c.ContainerID {
            if !w.containerID.IsEmpty() {
                w.resultsManager.Remove(w.containerID)
            }
            w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
            w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
            w.onHold = false
        }
    
        if w.onHold {
            return true
        }
    
        if c.State.Running == nil {
            glog.V(3).Infof("Non-running container probed: %v - %v",
                format.Pod(w.pod), w.container.Name)
            if !w.containerID.IsEmpty() {
                w.resultsManager.Set(w.containerID, results.Failure, w.pod)
            }
            // 容器失败退出,并且不会再重启,终止 worker
            return c.State.Terminated == nil ||
                w.pod.Spec.RestartPolicy != api.RestartPolicyNever
        }
    
        // 容器启动时间太短,没有超过配置的初始化等待时间 InitialDelaySeconds
        if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
            return true
        }
    
        // 调用 prober 进行检测容器的状态
        result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
        if err != nil {
            return true
        }
    
        if w.lastResult == result {
            w.resultRun++
        } else {
            w.lastResult = result
            w.resultRun = 1
        }
    
        // 如果容器退出,并且没有超过最大的失败次数,则继续检测
        if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
            (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
            return true
        }
    
        // 保存最新的检测结果
        w.resultsManager.Set(w.containerID, result, w.pod)
    
        if w.probeType == liveness && result == results.Failure {
            // 容器 liveness 检测失败,需要删除容器并重新创建,在新容器成功创建出来之前,暂停检测
            w.onHold = true
        }
    
        return true
    }
    

    liveness检测结果会存放在resultsManager,它把结果保存在缓存中,并发送到 m.updates 管道。而管道消费者是 kubelet 中的主循环syncLoopIteration。

    case update := <-kl.livenessManager.Updates():
            if update.Result == proberesults.Failure {
                // The liveness manager detected a failure; sync the pod.
                pod, ok := kl.podManager.GetPodByUID(update.PodUID)
                if !ok {
                    // If the pod no longer exists, ignore the update.
                    glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
                    break
                }
                glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
                handler.HandlePodSyncs([]*api.Pod{pod})
            }
    

    liveness检测如果不通过,pod就会重启,由 kubelet 的 sync 循环处理即可。但 readness检测失败不能重启 pod,因此readness的逻辑是:

    func (m *manager) updateReadiness() {
        update := <-m.readinessManager.Updates()
    
        ready := update.Result == results.Success
        m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
    }
    

    proberManager 启动的时候,会运行一个 goroutine 定时读取 readinessManager 管道中的数据,并根据数据调用 statusManager 去更新 apiserver 中 pod 的状态信息。

    负责 Service 逻辑的组件获取到了这个状态,就能根据不同的值来决定是否需要更新 endpoints 的内容,也就是 service 的请求是否发送到这个 pod。

    Probe 方法

    上面是 probemanager 的主要逻辑,我们接下来看下真正执行探测任务的 probe方法

    // probe probes the container.
    func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
        var probeSpec *v1.Probe
        switch probeType {
        case readiness:
            probeSpec = container.ReadinessProbe
        case liveness:
            probeSpec = container.LivenessProbe
        default:
            return results.Failure, fmt.Errorf("Unknown probe type: %q", probeType)
        }
        ...
        result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
        ...
    

    probe主方法调用pb.runProbeWithRetries 方法,传入containerid、类型、重试次数等。

    exec 方法

    调用runtimeService的ExecSync方法进入容器执行命令,回收结果,如果退出码为 0 ,就认为探测成功。

    command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
            return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
        
    ....
        
    func (pb *prober) newExecInContainer(container v1.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd {
        return execInContainer{func() ([]byte, error) {
            return pb.runner.RunInContainer(containerID, cmd, timeout)
        }}
    }
    
    ...
    
    func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
        stdout, stderr, err := m.runtimeService.ExecSync(id.ID, cmd, timeout)
        return append(stdout, stderr...), err
    }
    
    func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) {
        data, err := e.CombinedOutput()
        klog.V(4).Infof("Exec probe response: %q", string(data))
        if err != nil {
            exit, ok := err.(exec.ExitError)
            if ok {
                if exit.ExitStatus() == 0 {
                    return probe.Success, string(data), nil
                }
                return probe.Failure, string(data), nil
            }
            return probe.Unknown, "", err
        }
        return probe.Success, string(data), nil
    }
    
    

    HTTP 方法

    标准的 http 探测模板,如果400 > code >= 200,则认为成功。不支持 https

    func DoHTTPProbe(url *url.URL, headers http.Header, client GetHTTPInterface) (probe.Result, string, error) {
        req, err := http.NewRequest("GET", url.String(), nil)
        if err != nil {
            // Convert errors into failures to catch timeouts.
            return probe.Failure, err.Error(), nil
        }
        if _, ok := headers["User-Agent"]; !ok {
            if headers == nil {
                headers = http.Header{}
            }
            // explicitly set User-Agent so it's not set to default Go value
            v := version.Get()
            headers.Set("User-Agent", fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor))
        }
        req.Header = headers
        if headers.Get("Host") != "" {
            req.Host = headers.Get("Host")
        }
        res, err := client.Do(req)
        if err != nil {
            // Convert errors into failures to catch timeouts.
            return probe.Failure, err.Error(), nil
        }
        defer res.Body.Close()
        b, err := ioutil.ReadAll(res.Body)
        if err != nil {
            return probe.Failure, "", err
        }
        body := string(b)
        if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
            klog.V(4).Infof("Probe succeeded for %s, Response: %v", url.String(), *res)
            return probe.Success, body, nil
        }
        klog.V(4).Infof("Probe failed for %s with request headers %v, response body: %v", url.String(), headers, body)
        return probe.Failure, fmt.Sprintf("HTTP probe failed with statuscode: %d", res.StatusCode), nil
    }
    

    TCP 方法

    gRPC或FTP服务一般会使用 TCP 探测,尝试在指定端口上建立TCP连接。

    如果socket连接能成功,则返回成功。

    func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
        conn, err := net.DialTimeout("tcp", addr, timeout)
        if err != nil {
            // Convert errors to failures to handle timeouts.
            return probe.Failure, err.Error(), nil
        }
        err = conn.Close()
        if err != nil {
            klog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err)
        }
        return probe.Success, "", nil
    }
    
    

    参考

    相关文章

      网友评论

          本文标题:kubelet 原理解析四:probeManager

          本文链接:https://www.haomeiwen.com/subject/pzkkuhtx.html