美文网首页
k8s informers 开发

k8s informers 开发

作者: 郭青耀 | 来源:发表于2021-10-18 23:13 被阅读0次

为什么需要informer

关于K8S的设计理念,耳熟能详的概念有很多,比如

  • 声明式API
  • 最终一致性
  • 水平触发
  • ...

这些概念直白一点理解就是: API只是描述了我期待的样子(声明式API),系统(这里是指controller)保证我最终实现的样子和期待的样子保持一致(最终一致性),整个过程体现的就是水平触发。

虽然不是很准确,但是就是那么回事。

简单拓展一下:
声明式API的调用返回是在写入ETCD之后,就返回了,不负责最终实现。
最终一致性需要监控资源的变化(List&Watch),并将变化通知到(informers)对应的controller,controller接受变化信息后,做出相应的动作。
水平触发=电平触发 = 状态触发 ,通过状态采样,通知到对方实际状态,通常会通知多次;相对应的是边沿触发=边缘触发只是在改变的时候触发只会通知一次,错过了就错过了。


水平触发.png

一个具体实例

这里实现的是一个反向代理statefulset的pod,pod 的状态异常时候能及时调整反向代理的后端服务IP,之所以要识别到系统的pod ip 。
其中的关键函数是initInformer,它注册了系统的通知函数,里面处理想要关注的资源的添加,更新,删除。
分别实现这三个函数,就能动态获取后台状态正常的 pod

package main

import (
    "errors"
    "flag"
    "net/http"
    "net/http/httputil"
    "net/url"
    "strconv"
    "sync"
    "time"

    "github.com/sirupsen/logrus"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

type HandleServer struct {
}

type Cmd struct {
    localPort   int
    serviceName string
    serverPort  int
    kubeConfig  string
}
//这里存放statefulset 状态为正常的Pod,涉及协程直接的同步,所以这里使用了sync.Map
var podNameMap sync.Map
var cmd Cmd

func parseCmd() {
    flag.IntVar(&cmd.localPort, "listenPort", 8888, "listen on local port")
    flag.StringVar(&cmd.serviceName, "svc", "datahandle", "service name")
    flag.IntVar(&cmd.serverPort, "reversePort", 8086, "reverse server proxy port")
    flag.StringVar(&cmd.kubeConfig, "kubeconfig", "~/.kube/config", "kubeconfig path")
    flag.Parse()
}
//反向代理的服务器
func (this *HandleServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    svcName := cmd.serviceName
    podName := r.URL.Query().Get("LocalFileHost")
    if "" != podName {
        logrus.Infoln(podName)
        _, ok := podNameMap.Load(podName)
        if ok {
            svcName = podName + "." + svcName
        }
    }
    svc, err := url.Parse("http://" + svcName + ":" + strconv.Itoa(cmd.serverPort))
    if err != nil {
        logrus.Error(err)
    }
    proxy := httputil.NewSingleHostReverseProxy(svc)
    proxy.ServeHTTP(w, r)
}
//过滤是否为需要处理的pod
func checkPodInfo(labels map[string]string) bool {
    releaseName, ok := labels["release"]
    if !ok {
        return false
    }
    if releaseName != "cgs" {
        return false
    }

    return true
}

//pod添加事件,处理,启动的时候会触发一次
func informerAdd(obj interface{}) {
    pod, ok := obj.(*corev1.Pod)
    if ok != true {
        return
    }

    labels := pod.GetLabels()
    if checkPodInfo(labels) != true {
        return
    }
    logrus.Infof("enter cgs add informer.")

    containerNum := len(pod.Status.ContainerStatuses)
    for i := 0; i < containerNum; i++ {
        if pod.Status.ContainerStatuses[i].Name != "datahandle" {
            continue
        }
        if pod.Status.ContainerStatuses[i].Ready {
            logrus.Infof("add pod: %s to podNameMap ", pod.GetName())
            podNameMap.Store(pod.GetName(), struct{}{})
        } else {
            logrus.Infof("Please wait a minute. pod:%s is not ready")
        }
    }
}
//删除事件的处理
func informerDelete(obj interface{}) {
    pod, ok := obj.(*corev1.Pod)
    if ok != true {
        return
    }

    labels := pod.GetLabels()
    if checkPodInfo(labels) != true {
        return
    }

    logrus.Infof("delete pod:%s from podNameMap", pod.GetName())
    podNameMap.Delete(pod.GetName())
}

func isUpdateCgsDatahandle(oldPod *corev1.Pod, newPod *corev1.Pod) bool {
    oldLabels := oldPod.GetLabels()
    newLabels := newPod.GetLabels()
    return checkPodInfo(oldLabels) && checkPodInfo(newLabels)
}
// 因为是statefulset的pod,所以会有索引
func getDataServiceIndex(pod *corev1.Pod) (int, error) {
    containerNum := len(pod.Status.ContainerStatuses)
    for i := 0; i < containerNum; i++ {
        if pod.Status.ContainerStatuses[i].Name == "datahandle" {
            return i, nil
        }
    }
    return -1, errors.New("can not find datahandle Container")
}
func walkSyncMap(key, _ interface{}) bool {
    logrus.Infoln("Key =", key)
    return true
}
func informerUpdate(oldObj interface{}, newObj interface{}) {
    oldPod, ok := oldObj.(*corev1.Pod)
    if ok != true {
        return
    }

    newPod, ok := newObj.(*corev1.Pod)
    if ok != true {
        return
    }
    if false == isUpdateCgsDatahandle(oldPod, newPod) {
        return
    }

    oldIndex, err := getDataServiceIndex(oldPod)
    if err != nil {
        return
    }
    newIndex, err := getDataServiceIndex(newPod)
    if err != nil {
        return
    }

    oldReady := oldPod.Status.ContainerStatuses[oldIndex].Ready
    newReady := newPod.Status.ContainerStatuses[newIndex].Ready
    if oldReady != newReady {
        if true == newReady {
            logrus.Infof("update pod:%s ,Ready status is %v ,add to podNameMap:", newPod.GetName(), newReady)
            podNameMap.Store(newPod.GetName(), struct{}{})
            podNameMap.Range(walkSyncMap)
        } else {
            logrus.Infof("update pod:%s ,Ready status is %v ,delete from podNameMap:", newPod.GetName(), newReady)
            podNameMap.Delete(newPod.GetName())
            podNameMap.Range(walkSyncMap)
        }
    }
}
//informers 的核心代码
func initInformer(clientset *kubernetes.Clientset) {
    factory := informers.NewSharedInformerFactory(clientset, time.Minute*3)
    podInformer := factory.Core().V1().Pods()
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    informerAdd,
        DeleteFunc: informerDelete,
        UpdateFunc: informerUpdate,
    })
    factory.Start(wait.NeverStop)
    factory.WaitForCacheSync(wait.NeverStop)
}

func init() {
    parseCmd()
    config, err := clientcmd.BuildConfigFromFlags("", cmd.kubeConfig)
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }
    initInformer(clientset)
}

func main() {
    logrus.Infof("cmd:%#v", cmd)
    err := http.ListenAndServe(":"+strconv.Itoa(cmd.localPort), &HandleServer{})
    if err != nil {
        logrus.Fatalln("ListenAndServe: ", err)
    }
}


最后说一下,实际的背景

为了提高高效存储效率,k8s中没有使用共享存储,高效存储作为三级存储的第一级,需要保持存储的;,在高效存储节点上运行一个或者多个特定的podk8s 自定义调度器 实现,同时需要感知pod的状态,根据状态确定是否从当前pod 对应节点中读取数据,还是从后几级存储中读取,哪个pod无所谓。这个时候需要知道实际工作的pod 状态。

后记

上述代码是需要运行在 k8s 里面的,而在k8s里面,直接读取 kubectl的配置文件并不是一个友好的做法;实际上k8s 系统中已经把token和ca证书挂在到了“/var/run/secrets/kubernetes.io/serviceaccount/”目录下 ,lient-go 里面有一个包"k8s.io/client-go/rest"对内部证书和token的读取进行了封装。

    config, err := rest.InClusterConfig()
    if err != nil {
        config, err = clientcmd.BuildConfigFromFlags("", cmd.kubeConfig)
        if err != nil {
            panic(err)
        }
    }

如果在k8s内部就使用k8s内部的CA和Token,否则使用外部的配置文件
InClusterConfig()的原型如下

func InClusterConfig() (*Config, error) {
    const (
        tokenFile  = "/var/run/secrets/kubernetes.io/serviceaccount/token"
        rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
    )
    host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
    if len(host) == 0 || len(port) == 0 {
        return nil, ErrNotInCluster
    }

    token, err := ioutil.ReadFile(tokenFile)
    if err != nil {
        return nil, err
    }

    tlsClientConfig := TLSClientConfig{}

    if _, err := certutil.NewPool(rootCAFile); err != nil {
        klog.Errorf("Expected to load root CA config from %s, but got err: %v", rootCAFile, err)
    } else {
        tlsClientConfig.CAFile = rootCAFile
    }

    return &Config{
        // TODO: switch to using cluster DNS.
        Host:            "https://" + net.JoinHostPort(host, port),
        TLSClientConfig: tlsClientConfig,
        BearerToken:     string(token),
        BearerTokenFile: tokenFile,
    }, nil
}

相关文章

网友评论

      本文标题:k8s informers 开发

      本文链接:https://www.haomeiwen.com/subject/xnoioltx.html