美文网首页
[k8s源码分析][controller-manager] po

[k8s源码分析][controller-manager] po

作者: nicktming | 来源:发表于2019-10-23 22:17 被阅读0次

1. 前言

转载请说明原文出处, 尊重他人劳动成果!

源码位置: https://github.com/nicktming/kubernetes/tree/tming-v1.13/pkg/controller/podgc
分支: tming-v1.13 (基于v1.13版本)

关于各类controller都会用到informers, 所以关于informers, 可以参考 [k8s源码分析][client-go] informer之SharedInformerFactory.

本文分析的是gc_controller, 就是处理pod的垃圾回收的控制器, 是各种controller中属于较简单的其中一个了, 因为它的逻辑比较简单.

2. PodGCController

2.1 启动

关于kube-controller-manager组件整体的运行会有专门博客介绍, 这里直接看一下podgc这个controller是如何启动的.

// cmd/kube-controller-manager/app/controllermanager.go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    controllers := map[string]InitFunc{}
    ...
    controllers["podgc"] = startPodGCController
    ...
    return controllers
}
// cmd/kube-controller-manager/app/core.go
unc startPodGCController(ctx ControllerContext) (http.Handler, bool, error) {
    go podgc.NewPodGC(
        ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
        ctx.InformerFactory.Core().V1().Pods(),
        int(ctx.ComponentConfig.PodGCController.TerminatedPodGCThreshold),
    ).Run(ctx.Stop)
    return nil, true, nil
}

可以看到生成一个PodGCController对象, 然后以goroutine的方式启动它的Run方法.

2.2 PodGCController

// pkg/controller/podgc/gc_controller.go
const (
    gcCheckPeriod = 20 * time.Second
)
type PodGCController struct {
    kubeClient clientset.Interface
    // 获得本地缓存的Lister
    podLister       corelisters.PodLister
    // 同步函数
    podListerSynced cache.InformerSynced
    // 删除pod的方法
    deletePod              func(namespace, name string) error
    // 一个阈值
    terminatedPodThreshold int
}

func NewPodGC(kubeClient clientset.Interface, podInformer coreinformers.PodInformer, terminatedPodThreshold int) *PodGCController {
    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        metrics.RegisterMetricAndTrackRateLimiterUsage("gc_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
    }
    gcc := &PodGCController{
        kubeClient:             kubeClient,
        terminatedPodThreshold: terminatedPodThreshold,
        deletePod: func(namespace, name string) error {
            klog.Infof("PodGC is force deleting Pod: %v/%v", namespace, name)
            return kubeClient.CoreV1().Pods(namespace).Delete(name, metav1.NewDeleteOptions(0))
        },
    }
    // 获得本地缓存
    gcc.podLister = podInformer.Lister()
    // 等待同步函数
    gcc.podListerSynced = podInformer.Informer().HasSynced
    return gcc
}
func (gcc *PodGCController) Run(stop <-chan struct{}) {
    defer utilruntime.HandleCrash()

    klog.Infof("Starting GC controller")
    defer klog.Infof("Shutting down GC controller")
    // 等待同步
    if !controller.WaitForCacheSync("GC", stop, gcc.podListerSynced) {
        return
    }
    // 执行gc方法
    go wait.Until(gcc.gc, gcCheckPeriod, stop)
    <-stop
}

主要是每隔20秒执行gc方法.

gc

func (gcc *PodGCController) gc() {
    // 获得本地缓存的所有的pods
    pods, err := gcc.podLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("Error while listing all Pods: %v", err)
        return
    }
    // 代表集群中最多留terminatedPodThreshold个terminating的pod
    if gcc.terminatedPodThreshold > 0 {
        gcc.gcTerminated(pods)
    }
    // 删除那些在失联的节点上的pods (某些节点被删除了或者失联了需要删除在这些节点上的pods)
    gcc.gcOrphaned(pods)
    // 删除那些还没有被分配到任何节点但是已经在terminating的pods
    gcc.gcUnscheduledTerminating(pods)
}
gc.png

关于terminating的定义如下:

func isPodTerminated(pod *v1.Pod) bool {
    if phase := pod.Status.Phase; phase != v1.PodPending && phase != v1.PodRunning && phase != v1.PodUnknown {
        return true
    }
    return false
}

gc 主要做三件事情:
1. 删除那些terminatingpods. (集群中最多留terminatedPodThresholdterminating pod.)
2. 删除那些在失联的节点上的pods (某些节点被删除了或者失联了需要删除在这些节点上的pods)
3. 删除那些还没有被分配到任何节点但是已经在terminatingpods.

gcTerminated
func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) {
    terminatedPods := []*v1.Pod{}
    // 过滤出所有的terminating的pods
    for _, pod := range pods {
        if isPodTerminated(pod) {
            terminatedPods = append(terminatedPods, pod)
        }
    }

    terminatedPodCount := len(terminatedPods)
    sort.Sort(byCreationTimestamp(terminatedPods))

    deleteCount := terminatedPodCount - gcc.terminatedPodThreshold
    // 就是最多留terminatedPodThreshold个terminated的pods 其余的都要删除
    if deleteCount > terminatedPodCount {
        deleteCount = terminatedPodCount
    }
    if deleteCount > 0 {
        klog.Infof("garbage collecting %v pods", deleteCount)
    }
    // 删除pods
    var wait sync.WaitGroup
    for i := 0; i < deleteCount; i++ {
        wait.Add(1)
        go func(namespace string, name string) {
            defer wait.Done()
            if err := gcc.deletePod(namespace, name); err != nil {
                // ignore not founds
                defer utilruntime.HandleError(err)
            }
        }(terminatedPods[i].Namespace, terminatedPods[i].Name)
    }
    wait.Wait()
}

作用是计算出要删除的terminatingpods个数. 然后做删除操作.

gcOrphaned
func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod) {
    klog.V(4).Infof("GC'ing orphaned")
    // We want to get list of Nodes from the etcd, to make sure that it's as fresh as possible.
    // 取出本地缓存的节点
    nodes, err := gcc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
    if err != nil {
        return
    }
    nodeNames := sets.NewString()
    for i := range nodes.Items {
        nodeNames.Insert(nodes.Items[i].Name)
    }

    for _, pod := range pods {
        if pod.Spec.NodeName == "" {
            continue
        }
        if nodeNames.Has(pod.Spec.NodeName) {
            continue
        }
        // 删除那些在失联的节点上的pods (某些节点被删除了或者失联了需要删除在这些节点上的pods)
        klog.V(2).Infof("Found orphaned Pod %v/%v assigned to the Node %v. Deleting.", pod.Namespace, pod.Name, pod.Spec.NodeName)
        if err := gcc.deletePod(pod.Namespace, pod.Name); err != nil {
            utilruntime.HandleError(err)
        } else {
            klog.V(0).Infof("Forced deletion of orphaned Pod %v/%v succeeded", pod.Namespace, pod.Name)
        }
    }
}

找到那些绑定在已经不存在的节点上的pods并删除.

gcUnscheduledTerminating
func (gcc *PodGCController) gcUnscheduledTerminating(pods []*v1.Pod) {
    klog.V(4).Infof("GC'ing unscheduled pods which are terminating.")

    for _, pod := range pods {
        if pod.DeletionTimestamp == nil || len(pod.Spec.NodeName) > 0 {
            continue
        }
        // 表明 pod.DeletionTimestamp != nil && len(pod.Spec.NodeName) <= 0
        // 删除那些还没有被分配到任何节点但是已经在terminating的pods
        klog.V(2).Infof("Found unscheduled terminating Pod %v/%v not assigned to any Node. Deleting.", pod.Namespace, pod.Name)
        if err := gcc.deletePod(pod.Namespace, pod.Name); err != nil {
            utilruntime.HandleError(err)
        } else {
            klog.V(0).Infof("Forced deletion of unscheduled terminating Pod %v/%v succeeded", pod.Namespace, pod.Name)
        }
    }
}

删除那些还没有分配并且已经在terminatingpods.

3. 总结

可以看到该controller就是定期去删除一些集群中认为没有用的pods.

相关文章

网友评论

      本文标题:[k8s源码分析][controller-manager] po

      本文链接:https://www.haomeiwen.com/subject/qethvctx.html