为什么需要informer
关于K8S的设计理念,耳熟能详的概念有很多,比如
- 声明式API
- 最终一致性
- 水平触发
- ...
这些概念直白一点理解就是: API只是描述了我期待的样子(声明式API),系统(这里是指controller)保证我最终实现的样子和期待的样子保持一致(最终一致性),整个过程体现的就是水平触发。
虽然不是很准确,但是就是那么回事。
简单拓展一下:
声明式API的调用返回是在写入ETCD之后,就返回了,不负责最终实现。
最终一致性需要监控资源的变化(List&Watch),并将变化通知到(informers)对应的controller,controller接受变化信息后,做出相应的动作。
水平触发=电平触发 = 状态触发 ,通过状态采样,通知到对方实际状态,通常会通知多次;相对应的是边沿触发=边缘触发只是在改变的时候触发只会通知一次,错过了就错过了。
水平触发.png
一个具体实例
这里实现的是一个反向代理statefulset的pod,pod 的状态异常时候能及时调整反向代理的后端服务IP,之所以要识别到系统的pod ip 。
其中的关键函数是initInformer,它注册了系统的通知函数,里面处理想要关注的资源的添加,更新,删除。
分别实现这三个函数,就能动态获取后台状态正常的 pod
package main
import (
"errors"
"flag"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"sync"
"time"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
type HandleServer struct {
}
type Cmd struct {
localPort int
serviceName string
serverPort int
kubeConfig string
}
//这里存放statefulset 状态为正常的Pod,涉及协程直接的同步,所以这里使用了sync.Map
var podNameMap sync.Map
var cmd Cmd
func parseCmd() {
flag.IntVar(&cmd.localPort, "listenPort", 8888, "listen on local port")
flag.StringVar(&cmd.serviceName, "svc", "datahandle", "service name")
flag.IntVar(&cmd.serverPort, "reversePort", 8086, "reverse server proxy port")
flag.StringVar(&cmd.kubeConfig, "kubeconfig", "~/.kube/config", "kubeconfig path")
flag.Parse()
}
//反向代理的服务器
func (this *HandleServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
svcName := cmd.serviceName
podName := r.URL.Query().Get("LocalFileHost")
if "" != podName {
logrus.Infoln(podName)
_, ok := podNameMap.Load(podName)
if ok {
svcName = podName + "." + svcName
}
}
svc, err := url.Parse("http://" + svcName + ":" + strconv.Itoa(cmd.serverPort))
if err != nil {
logrus.Error(err)
}
proxy := httputil.NewSingleHostReverseProxy(svc)
proxy.ServeHTTP(w, r)
}
//过滤是否为需要处理的pod
func checkPodInfo(labels map[string]string) bool {
releaseName, ok := labels["release"]
if !ok {
return false
}
if releaseName != "cgs" {
return false
}
return true
}
//pod添加事件,处理,启动的时候会触发一次
func informerAdd(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if ok != true {
return
}
labels := pod.GetLabels()
if checkPodInfo(labels) != true {
return
}
logrus.Infof("enter cgs add informer.")
containerNum := len(pod.Status.ContainerStatuses)
for i := 0; i < containerNum; i++ {
if pod.Status.ContainerStatuses[i].Name != "datahandle" {
continue
}
if pod.Status.ContainerStatuses[i].Ready {
logrus.Infof("add pod: %s to podNameMap ", pod.GetName())
podNameMap.Store(pod.GetName(), struct{}{})
} else {
logrus.Infof("Please wait a minute. pod:%s is not ready")
}
}
}
//删除事件的处理
func informerDelete(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if ok != true {
return
}
labels := pod.GetLabels()
if checkPodInfo(labels) != true {
return
}
logrus.Infof("delete pod:%s from podNameMap", pod.GetName())
podNameMap.Delete(pod.GetName())
}
func isUpdateCgsDatahandle(oldPod *corev1.Pod, newPod *corev1.Pod) bool {
oldLabels := oldPod.GetLabels()
newLabels := newPod.GetLabels()
return checkPodInfo(oldLabels) && checkPodInfo(newLabels)
}
// 因为是statefulset的pod,所以会有索引
func getDataServiceIndex(pod *corev1.Pod) (int, error) {
containerNum := len(pod.Status.ContainerStatuses)
for i := 0; i < containerNum; i++ {
if pod.Status.ContainerStatuses[i].Name == "datahandle" {
return i, nil
}
}
return -1, errors.New("can not find datahandle Container")
}
func walkSyncMap(key, _ interface{}) bool {
logrus.Infoln("Key =", key)
return true
}
func informerUpdate(oldObj interface{}, newObj interface{}) {
oldPod, ok := oldObj.(*corev1.Pod)
if ok != true {
return
}
newPod, ok := newObj.(*corev1.Pod)
if ok != true {
return
}
if false == isUpdateCgsDatahandle(oldPod, newPod) {
return
}
oldIndex, err := getDataServiceIndex(oldPod)
if err != nil {
return
}
newIndex, err := getDataServiceIndex(newPod)
if err != nil {
return
}
oldReady := oldPod.Status.ContainerStatuses[oldIndex].Ready
newReady := newPod.Status.ContainerStatuses[newIndex].Ready
if oldReady != newReady {
if true == newReady {
logrus.Infof("update pod:%s ,Ready status is %v ,add to podNameMap:", newPod.GetName(), newReady)
podNameMap.Store(newPod.GetName(), struct{}{})
podNameMap.Range(walkSyncMap)
} else {
logrus.Infof("update pod:%s ,Ready status is %v ,delete from podNameMap:", newPod.GetName(), newReady)
podNameMap.Delete(newPod.GetName())
podNameMap.Range(walkSyncMap)
}
}
}
//informers 的核心代码
func initInformer(clientset *kubernetes.Clientset) {
factory := informers.NewSharedInformerFactory(clientset, time.Minute*3)
podInformer := factory.Core().V1().Pods()
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: informerAdd,
DeleteFunc: informerDelete,
UpdateFunc: informerUpdate,
})
factory.Start(wait.NeverStop)
factory.WaitForCacheSync(wait.NeverStop)
}
func init() {
parseCmd()
config, err := clientcmd.BuildConfigFromFlags("", cmd.kubeConfig)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
initInformer(clientset)
}
func main() {
logrus.Infof("cmd:%#v", cmd)
err := http.ListenAndServe(":"+strconv.Itoa(cmd.localPort), &HandleServer{})
if err != nil {
logrus.Fatalln("ListenAndServe: ", err)
}
}
最后说一下,实际的背景
为了提高高效存储效率,k8s中没有使用共享存储,高效存储作为三级存储的第一级,需要保持存储的;,在高效存储节点上运行一个或者多个特定的podk8s 自定义调度器 实现,同时需要感知pod的状态,根据状态确定是否从当前pod 对应节点中读取数据,还是从后几级存储中读取,哪个pod无所谓。这个时候需要知道实际工作的pod 状态。
后记
上述代码是需要运行在 k8s 里面的,而在k8s里面,直接读取 kubectl的配置文件并不是一个友好的做法;实际上k8s 系统中已经把token和ca证书挂在到了“/var/run/secrets/kubernetes.io/serviceaccount/”目录下 ,lient-go 里面有一个包"k8s.io/client-go/rest"对内部证书和token的读取进行了封装。
config, err := rest.InClusterConfig()
if err != nil {
config, err = clientcmd.BuildConfigFromFlags("", cmd.kubeConfig)
if err != nil {
panic(err)
}
}
如果在k8s内部就使用k8s内部的CA和Token,否则使用外部的配置文件
InClusterConfig()的原型如下
func InClusterConfig() (*Config, error) {
const (
tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
)
host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
if len(host) == 0 || len(port) == 0 {
return nil, ErrNotInCluster
}
token, err := ioutil.ReadFile(tokenFile)
if err != nil {
return nil, err
}
tlsClientConfig := TLSClientConfig{}
if _, err := certutil.NewPool(rootCAFile); err != nil {
klog.Errorf("Expected to load root CA config from %s, but got err: %v", rootCAFile, err)
} else {
tlsClientConfig.CAFile = rootCAFile
}
return &Config{
// TODO: switch to using cluster DNS.
Host: "https://" + net.JoinHostPort(host, port),
TLSClientConfig: tlsClientConfig,
BearerToken: string(token),
BearerTokenFile: tokenFile,
}, nil
}
网友评论