美文网首页
k8s informers 开发

k8s informers 开发

作者: 郭青耀 | 来源:发表于2021-10-18 23:13 被阅读0次

    为什么需要informer

    关于K8S的设计理念,耳熟能详的概念有很多,比如

    • 声明式API
    • 最终一致性
    • 水平触发
    • ...

    这些概念直白一点理解就是: API只是描述了我期待的样子(声明式API),系统(这里是指controller)保证我最终实现的样子和期待的样子保持一致(最终一致性),整个过程体现的就是水平触发。

    虽然不是很准确,但是就是那么回事。

    简单拓展一下:
    声明式API的调用返回是在写入ETCD之后,就返回了,不负责最终实现。
    最终一致性需要监控资源的变化(List&Watch),并将变化通知到(informers)对应的controller,controller接受变化信息后,做出相应的动作。
    水平触发=电平触发 = 状态触发 ,通过状态采样,通知到对方实际状态,通常会通知多次;相对应的是边沿触发=边缘触发只是在改变的时候触发只会通知一次,错过了就错过了。


    水平触发.png

    一个具体实例

    这里实现的是一个反向代理statefulset的pod,pod 的状态异常时候能及时调整反向代理的后端服务IP,之所以要识别到系统的pod ip 。
    其中的关键函数是initInformer,它注册了系统的通知函数,里面处理想要关注的资源的添加,更新,删除。
    分别实现这三个函数,就能动态获取后台状态正常的 pod

    package main
    
    import (
        "errors"
        "flag"
        "net/http"
        "net/http/httputil"
        "net/url"
        "strconv"
        "sync"
        "time"
    
        "github.com/sirupsen/logrus"
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
    )
    
    type HandleServer struct {
    }
    
    type Cmd struct {
        localPort   int
        serviceName string
        serverPort  int
        kubeConfig  string
    }
    //这里存放statefulset 状态为正常的Pod,涉及协程直接的同步,所以这里使用了sync.Map
    var podNameMap sync.Map
    var cmd Cmd
    
    func parseCmd() {
        flag.IntVar(&cmd.localPort, "listenPort", 8888, "listen on local port")
        flag.StringVar(&cmd.serviceName, "svc", "datahandle", "service name")
        flag.IntVar(&cmd.serverPort, "reversePort", 8086, "reverse server proxy port")
        flag.StringVar(&cmd.kubeConfig, "kubeconfig", "~/.kube/config", "kubeconfig path")
        flag.Parse()
    }
    //反向代理的服务器
    func (this *HandleServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        svcName := cmd.serviceName
        podName := r.URL.Query().Get("LocalFileHost")
        if "" != podName {
            logrus.Infoln(podName)
            _, ok := podNameMap.Load(podName)
            if ok {
                svcName = podName + "." + svcName
            }
        }
        svc, err := url.Parse("http://" + svcName + ":" + strconv.Itoa(cmd.serverPort))
        if err != nil {
            logrus.Error(err)
        }
        proxy := httputil.NewSingleHostReverseProxy(svc)
        proxy.ServeHTTP(w, r)
    }
    //过滤是否为需要处理的pod
    func checkPodInfo(labels map[string]string) bool {
        releaseName, ok := labels["release"]
        if !ok {
            return false
        }
        if releaseName != "cgs" {
            return false
        }
    
        return true
    }
    
    //pod添加事件,处理,启动的时候会触发一次
    func informerAdd(obj interface{}) {
        pod, ok := obj.(*corev1.Pod)
        if ok != true {
            return
        }
    
        labels := pod.GetLabels()
        if checkPodInfo(labels) != true {
            return
        }
        logrus.Infof("enter cgs add informer.")
    
        containerNum := len(pod.Status.ContainerStatuses)
        for i := 0; i < containerNum; i++ {
            if pod.Status.ContainerStatuses[i].Name != "datahandle" {
                continue
            }
            if pod.Status.ContainerStatuses[i].Ready {
                logrus.Infof("add pod: %s to podNameMap ", pod.GetName())
                podNameMap.Store(pod.GetName(), struct{}{})
            } else {
                logrus.Infof("Please wait a minute. pod:%s is not ready")
            }
        }
    }
    //删除事件的处理
    func informerDelete(obj interface{}) {
        pod, ok := obj.(*corev1.Pod)
        if ok != true {
            return
        }
    
        labels := pod.GetLabels()
        if checkPodInfo(labels) != true {
            return
        }
    
        logrus.Infof("delete pod:%s from podNameMap", pod.GetName())
        podNameMap.Delete(pod.GetName())
    }
    
    func isUpdateCgsDatahandle(oldPod *corev1.Pod, newPod *corev1.Pod) bool {
        oldLabels := oldPod.GetLabels()
        newLabels := newPod.GetLabels()
        return checkPodInfo(oldLabels) && checkPodInfo(newLabels)
    }
    // 因为是statefulset的pod,所以会有索引
    func getDataServiceIndex(pod *corev1.Pod) (int, error) {
        containerNum := len(pod.Status.ContainerStatuses)
        for i := 0; i < containerNum; i++ {
            if pod.Status.ContainerStatuses[i].Name == "datahandle" {
                return i, nil
            }
        }
        return -1, errors.New("can not find datahandle Container")
    }
    func walkSyncMap(key, _ interface{}) bool {
        logrus.Infoln("Key =", key)
        return true
    }
    func informerUpdate(oldObj interface{}, newObj interface{}) {
        oldPod, ok := oldObj.(*corev1.Pod)
        if ok != true {
            return
        }
    
        newPod, ok := newObj.(*corev1.Pod)
        if ok != true {
            return
        }
        if false == isUpdateCgsDatahandle(oldPod, newPod) {
            return
        }
    
        oldIndex, err := getDataServiceIndex(oldPod)
        if err != nil {
            return
        }
        newIndex, err := getDataServiceIndex(newPod)
        if err != nil {
            return
        }
    
        oldReady := oldPod.Status.ContainerStatuses[oldIndex].Ready
        newReady := newPod.Status.ContainerStatuses[newIndex].Ready
        if oldReady != newReady {
            if true == newReady {
                logrus.Infof("update pod:%s ,Ready status is %v ,add to podNameMap:", newPod.GetName(), newReady)
                podNameMap.Store(newPod.GetName(), struct{}{})
                podNameMap.Range(walkSyncMap)
            } else {
                logrus.Infof("update pod:%s ,Ready status is %v ,delete from podNameMap:", newPod.GetName(), newReady)
                podNameMap.Delete(newPod.GetName())
                podNameMap.Range(walkSyncMap)
            }
        }
    }
    //informers 的核心代码
    func initInformer(clientset *kubernetes.Clientset) {
        factory := informers.NewSharedInformerFactory(clientset, time.Minute*3)
        podInformer := factory.Core().V1().Pods()
        podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    informerAdd,
            DeleteFunc: informerDelete,
            UpdateFunc: informerUpdate,
        })
        factory.Start(wait.NeverStop)
        factory.WaitForCacheSync(wait.NeverStop)
    }
    
    func init() {
        parseCmd()
        config, err := clientcmd.BuildConfigFromFlags("", cmd.kubeConfig)
        if err != nil {
            panic(err)
        }
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
            panic(err)
        }
        initInformer(clientset)
    }
    
    func main() {
        logrus.Infof("cmd:%#v", cmd)
        err := http.ListenAndServe(":"+strconv.Itoa(cmd.localPort), &HandleServer{})
        if err != nil {
            logrus.Fatalln("ListenAndServe: ", err)
        }
    }
    
    
    

    最后说一下,实际的背景

    为了提高高效存储效率,k8s中没有使用共享存储,高效存储作为三级存储的第一级,需要保持存储的;,在高效存储节点上运行一个或者多个特定的podk8s 自定义调度器 实现,同时需要感知pod的状态,根据状态确定是否从当前pod 对应节点中读取数据,还是从后几级存储中读取,哪个pod无所谓。这个时候需要知道实际工作的pod 状态。

    后记

    上述代码是需要运行在 k8s 里面的,而在k8s里面,直接读取 kubectl的配置文件并不是一个友好的做法;实际上k8s 系统中已经把token和ca证书挂在到了“/var/run/secrets/kubernetes.io/serviceaccount/”目录下 ,lient-go 里面有一个包"k8s.io/client-go/rest"对内部证书和token的读取进行了封装。

        config, err := rest.InClusterConfig()
        if err != nil {
            config, err = clientcmd.BuildConfigFromFlags("", cmd.kubeConfig)
            if err != nil {
                panic(err)
            }
        }
    

    如果在k8s内部就使用k8s内部的CA和Token,否则使用外部的配置文件
    InClusterConfig()的原型如下

    func InClusterConfig() (*Config, error) {
        const (
            tokenFile  = "/var/run/secrets/kubernetes.io/serviceaccount/token"
            rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
        )
        host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
        if len(host) == 0 || len(port) == 0 {
            return nil, ErrNotInCluster
        }
    
        token, err := ioutil.ReadFile(tokenFile)
        if err != nil {
            return nil, err
        }
    
        tlsClientConfig := TLSClientConfig{}
    
        if _, err := certutil.NewPool(rootCAFile); err != nil {
            klog.Errorf("Expected to load root CA config from %s, but got err: %v", rootCAFile, err)
        } else {
            tlsClientConfig.CAFile = rootCAFile
        }
    
        return &Config{
            // TODO: switch to using cluster DNS.
            Host:            "https://" + net.JoinHostPort(host, port),
            TLSClientConfig: tlsClientConfig,
            BearerToken:     string(token),
            BearerTokenFile: tokenFile,
        }, nil
    }
    

    相关文章

      网友评论

          本文标题:k8s informers 开发

          本文链接:https://www.haomeiwen.com/subject/xnoioltx.html