PodConfig 的监控

作者: shinwing | 来源:发表于2018-05-11 09:47 被阅读0次

    在 makePodSourceConfig 中,Pod资源以三种方式获得:

    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

    cfg.Channel 方法会返回一个 chennel,并有一个 goroutine 监听在上面,如果有内容写入到这个 chan 里面,则会触发 Merge 方法写入到 cfg.updates里面。

    第一种是通过文件获得,文件一般放在/etc/kubernetes/manifests目录下面,启动kubelet时可以通过指定–config覆盖;

    config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))

    第二种也是通过文件过得,只不过文件是通过URL获取的,URL可以在启动kubelet时通过ManifestURL指定;

    config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))

    第三种是通过watch kube-apiserver获取;

    config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)

    1. NewSourceFile 方法在 /kubelet/config/file.go

    使用 “golang.org/x/exp/inotify” 来watch文件的改变,然后通过 extractFromFile 读取变化内容,把更新内容返回给 updates

    2. NewSourceURL 方法在 /kubelet/config/http.go

    TODO

    3. NewSourceApiserver 方法在 /kubelet/config/apiserver.go

    简单的理解这个方法,就是监控 apiserver 上 pod 的变化,将变化写到 PodConfig.updates 这个 channel 里面。

    1. 调用NewListWatchFromClient方法;

    2. 调用newSourceApiserverFromLW方法;

    看一下 NewListWatchFromClient 在 /client-go/tools/cache/listwatch.go

    1. NewListWatchFromClient方法将返回一个ListWatch结构体,定义listFunc和watchFunc;

    2. listFunc用于List,watchFunc用于Watch;

          *  listFunc首先Get()方法,说明发起一个Get请求,再看看Namespace(namespace),其实只是在request设置namespace字段,再看看Resource函数,设置request的resource字段,VersionedParams主要对options进行序列化,options主要包括ResourceVersion和TimeoutSeconds这两个参数,FieldsSelectorParam函数主要将filter函数进行序列化,深入分析这个函数,你可以发现其实他将他序列化到一个嵌套的map里面。Do()函数发起真正的请求,并收到response,然后用r.transformResponse去处理response,包装成Result返回。Get()方法则主要对Result进行反序列化。最后返回结果;

          *  Watch()方法将返回watch.Interface,这个watch.Interface专门用来传送kubelet想要watch的资源。Watch首先会发起一个request,然后反序列化response,从response中获得watch.Interface;(具体返回是StreamWatcher对象)   获得watcher以后,reflector会调用r.watchHandler(w, &resourceVersion, resyncerrc, stopCh)去处理这个watcher;

    3. ListWatch会传入newSourceApiserverFromLW方法;

    下面是 newSourceApiserverFromLW, 在 /kubelet/config/apiserver.go

    1. 首先创建了一个 send 函数,它将从apiserver获取的pods传送到updates channel中;

    2. 通过构建一个reflector,然后run 从apiserver 获得Pods信息;

    接下来看一下 Reflector 是怎么获取Pods信息的。k8s.io/client-go/tools/cache/reflector.go

    Reflector 对象,主要数据成员:ListerWatcher,ListerWatcher是接口对象,包括方法List()和Watch();在此之前 NewListWatchFromClient 方法返回的 ListWatch 对象将作为 Reflector 的ListerWatcher 数据成员;

    Refector 的 Run 方法,启动协程执行执行reflector的 ListAndWatch 方法;

    wait.Until(func() {

            if err := r.ListAndWatch(stopCh); err != nil {

                    utilruntime.HandleError(err)

            }

    }, r.period, stopCh)

    在 ListAndWatch  方法中

    1. 调用ListFunc和WatchFunc去List和Watch apiserver的资源;

    2. 调用reflector的watchHandler方法;

    watchHandler 方法

    1. 从channel读取event,然后更新到 r.store;

    2.  r.store 就是在创建的时候,传递的参数 cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc) 生成的 UndeltaStore;

    3.  r.store  在操作的时候,执行 u.PushFunc(u.Store.List()) 操作,PushFunc 函数是之前注册的 send 方法 (此send方法是在pkg/kubelet/config/apiserver.go的NewSourceApiserverFromLW中定义),它将从apiserver获取的pods传送到updates channel中,kubelet从updates channel获取到Pod信息进行处理;

    4. 这里无论是add、delete或者modify, u.PushFunc(u.Store.List()) 他会发送存储的所有pods。因为在这些操作之前,它都会先操作Store里面的pods对象,确保Store里面存储的是分配到该节点的Pod的最新信息;

    相关文章

      网友评论

        本文标题:PodConfig 的监控

        本文链接:https://www.haomeiwen.com/subject/ksnxdftx.html