在 makePodSourceConfig 中,Pod资源以三种方式获得:
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
cfg.Channel 方法会返回一个 chennel,并有一个 goroutine 监听在上面,如果有内容写入到这个 chan 里面,则会触发 Merge 方法写入到 cfg.updates里面。
第一种是通过文件获得,文件一般放在/etc/kubernetes/manifests目录下面,启动kubelet时可以通过指定–config覆盖;
config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
第二种也是通过文件过得,只不过文件是通过URL获取的,URL可以在启动kubelet时通过ManifestURL指定;
config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
第三种是通过watch kube-apiserver获取;
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
1. NewSourceFile 方法在 /kubelet/config/file.go
使用 “golang.org/x/exp/inotify” 来watch文件的改变,然后通过 extractFromFile 读取变化内容,把更新内容返回给 updates
2. NewSourceURL 方法在 /kubelet/config/http.go
TODO
3. NewSourceApiserver 方法在 /kubelet/config/apiserver.go
简单的理解这个方法,就是监控 apiserver 上 pod 的变化,将变化写到 PodConfig.updates 这个 channel 里面。
1. 调用NewListWatchFromClient方法;
2. 调用newSourceApiserverFromLW方法;
看一下 NewListWatchFromClient 在 /client-go/tools/cache/listwatch.go
1. NewListWatchFromClient方法将返回一个ListWatch结构体,定义listFunc和watchFunc;
2. listFunc用于List,watchFunc用于Watch;
* listFunc首先Get()方法,说明发起一个Get请求,再看看Namespace(namespace),其实只是在request设置namespace字段,再看看Resource函数,设置request的resource字段,VersionedParams主要对options进行序列化,options主要包括ResourceVersion和TimeoutSeconds这两个参数,FieldsSelectorParam函数主要将filter函数进行序列化,深入分析这个函数,你可以发现其实他将他序列化到一个嵌套的map里面。Do()函数发起真正的请求,并收到response,然后用r.transformResponse去处理response,包装成Result返回。Get()方法则主要对Result进行反序列化。最后返回结果;
* Watch()方法将返回watch.Interface,这个watch.Interface专门用来传送kubelet想要watch的资源。Watch首先会发起一个request,然后反序列化response,从response中获得watch.Interface;(具体返回是StreamWatcher对象) 获得watcher以后,reflector会调用r.watchHandler(w, &resourceVersion, resyncerrc, stopCh)去处理这个watcher;
3. ListWatch会传入newSourceApiserverFromLW方法;
下面是 newSourceApiserverFromLW, 在 /kubelet/config/apiserver.go
1. 首先创建了一个 send 函数,它将从apiserver获取的pods传送到updates channel中;
2. 通过构建一个reflector,然后run 从apiserver 获得Pods信息;
接下来看一下 Reflector 是怎么获取Pods信息的。k8s.io/client-go/tools/cache/reflector.go
Reflector 对象,主要数据成员:ListerWatcher,ListerWatcher是接口对象,包括方法List()和Watch();在此之前 NewListWatchFromClient 方法返回的 ListWatch 对象将作为 Reflector 的ListerWatcher 数据成员;
Refector 的 Run 方法,启动协程执行执行reflector的 ListAndWatch 方法;
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
在 ListAndWatch 方法中
1. 调用ListFunc和WatchFunc去List和Watch apiserver的资源;
2. 调用reflector的watchHandler方法;
watchHandler 方法
1. 从channel读取event,然后更新到 r.store;
2. r.store 就是在创建的时候,传递的参数 cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc) 生成的 UndeltaStore;
3. r.store 在操作的时候,执行 u.PushFunc(u.Store.List()) 操作,PushFunc 函数是之前注册的 send 方法 (此send方法是在pkg/kubelet/config/apiserver.go的NewSourceApiserverFromLW中定义),它将从apiserver获取的pods传送到updates channel中,kubelet从updates channel获取到Pod信息进行处理;
4. 这里无论是add、delete或者modify, u.PushFunc(u.Store.List()) 他会发送存储的所有pods。因为在这些操作之前,它都会先操作Store里面的pods对象,确保Store里面存储的是分配到该节点的Pod的最新信息;
网友评论