深入分析kubelet(1)——获取Pod创建请求
kubelet组件巨复杂,所以将分成几个部分分析。
职责猜想
kubelet主要干以下工作:
- 创建Pod
- 管理Pod
- 上报Node信息
- GC
获取请求
创建Pod首先需要获取Pod创建请求,本文主要分析这个模块。与其他K8S组件不同,kubelet代码风格独树一帜,代码复杂度也高了几个档次,我也是经过一天的梳理之后,才找到入口。
总的来说,kubelet可以通过三种方式创建Pod。分别是StaticPodPath
,StaticPodURL
和apiserver
,前两个都是用来创建static pod
,以这三个为切入点,分析kubelet。
StaticPod
简单来说,不是通过apiserver
创建的Pod,都是static pod
。应用场景就是kubeadm
,除了kubelet
以外的组件,都是static pod
,因为当时apiserver
还没起,所以只能用static pod
创建。
创建static pod
有两种方式,配置文件和HTTP。详见static pod
staticPodPath is the path to the directory containing local (static) pods to run, or the path to a single static pod file.
staticPodURL is the URL for accessing static pods to run
code
k8s.io\kubernetes\pkg\kubelet\config\config.go
// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
// pods is the storage the sources are merged into; NewPodConfig builds it with newPodStorage.
pods *podStorage
// mux provides the per-source input channel returned by Channel.
mux *config.Mux
// the channel of denormalized changes passed to listeners
updates chan kubetypes.PodUpdate
// contains the list of all configured sources
// NOTE(review): sourcesLock presumably guards sources -- confirm against the full file.
sourcesLock sync.Mutex
// sources holds the names of the sources registered through Channel.
sources sets.String
// NOTE(review): checkpointManager is not referenced in the code visible here; its role
// cannot be confirmed from this excerpt.
checkpointManager checkpointmanager.CheckpointManager
}
// PodUpdate defines an operation sent on the channel. You can add or remove single pods by
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
type PodUpdate struct {
// Pods is the list of pods the operation applies to.
Pods []*v1.Pod
// Op is the operation to perform (ADD, REMOVE, SET as described above).
Op PodOperation
// Source names the configuration source that produced this update.
Source string
}
在PodConfig
中,我们需要重点关注pods
和updates
。其中updates
是一个生产者消费者channel
,所有的修改都必须通过它派发出去。所以大胆揣测,所有的Pod请求都发给updates
,然后会有workers
一直处理updates
。
k8s.io\kubernetes\pkg\kubelet\kubelet.go
// makePodSourceConfig builds a config.PodConfig in incremental-notification mode
// and attaches every pod source that is enabled:
//   - a file source when kubeCfg.StaticPodPath is set,
//   - a URL source when kubeCfg.StaticPodURL is set,
//   - an apiserver source when kubeDeps.KubeClient is non-nil.
//
// bootstrapCheckpointPath is not used by this body. The returned error is
// always nil.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
	podCfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

	// Static pods read from a local file or directory.
	if path := kubeCfg.StaticPodPath; path != "" {
		config.NewSourceFile(path, nodeName, kubeCfg.FileCheckFrequency.Duration, podCfg.Channel(kubetypes.FileSource))
	}

	// Static pods fetched over HTTP.
	if url := kubeCfg.StaticPodURL; url != "" {
		config.NewSourceURL(url, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, podCfg.Channel(kubetypes.HTTPSource))
	}

	// Pods assigned to this node through the apiserver.
	if client := kubeDeps.KubeClient; client != nil {
		config.NewSourceApiserver(client, nodeName, podCfg.Channel(kubetypes.ApiserverSource))
	}

	return podCfg, nil
}
从makePodSourceConfig
函数我们可以清晰看到,生产者有三个。这里我们只分析NewSourceApiserver
,其他的举一反三就好。
k8s.io\kubernetes\pkg\kubelet\config\apiserver.go
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
// It lists and watches "pods" across all namespaces, restricted by a field selector
// that matches api.PodHostField against nodeName, and feeds the results into updates.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
	restClient := c.CoreV1().RESTClient()
	// Only pods whose host field equals this node's name are selected.
	onThisNode := fields.OneTermEqualSelector(api.PodHostField, string(nodeName))
	podLW := cache.NewListWatchFromClient(restClient, "pods", metav1.NamespaceAll, onThisNode)
	newSourceApiserverFromLW(podLW, updates)
}
// newSourceApiserverFromLW creates a config source that watches and pulls from the
// apiserver through the given ListerWatcher. A reflector fills an UndeltaStore, and
// each push from that store is sent on updates as one SET PodUpdate tagged with
// kubetypes.ApiserverSource. The reflector runs in its own goroutine with
// wait.NeverStop as its stop channel.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	// publish converts the pushed objects to pods and emits them as a single SET.
	publish := func(objs []interface{}) {
		var podList []*v1.Pod
		for i := range objs {
			podList = append(podList, objs[i].(*v1.Pod))
		}
		update := kubetypes.PodUpdate{Pods: podList, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
		updates <- update
	}

	store := cache.NewUndeltaStore(publish, cache.MetaNamespaceKeyFunc)
	reflector := cache.NewReflector(lw, &v1.Pod{}, store, 0)
	go reflector.Run(wait.NeverStop)
}
这块还比较简单,就是通过listWatch
Pod,过滤条件是nodeName
,这里就和之前scheduler结合起来了,并且调用send
函数将Pods发给updates
。如果对listWatch
不太清楚,可以参考之前的ListAndWatch。
k8s.io\client-go\tools\cache\undelta_store.go
// Add inserts obj into the underlying Store and, on success, hands the store's
// complete contents to PushFunc. A Store error is returned unchanged and nothing
// is pushed.
func (u *UndeltaStore) Add(obj interface{}) error {
	err := u.Store.Add(obj)
	if err == nil {
		u.PushFunc(u.Store.List())
	}
	return err
}
// Update replaces obj in the underlying Store and, on success, hands the store's
// complete contents to PushFunc. A Store error is returned unchanged and nothing
// is pushed.
func (u *UndeltaStore) Update(obj interface{}) error {
	err := u.Store.Update(obj)
	if err == nil {
		u.PushFunc(u.Store.List())
	}
	return err
}
// Delete removes obj from the underlying Store and, on success, hands the store's
// complete remaining contents to PushFunc. A Store error is returned unchanged and
// nothing is pushed.
func (u *UndeltaStore) Delete(obj interface{}) error {
	err := u.Store.Delete(obj)
	if err == nil {
		u.PushFunc(u.Store.List())
	}
	return err
}
UndeltaStore
很有意思,每次有变更,都会push全量数据,这里为什么用,还不太清楚。
从上面源代码看到所有的Pods都会push到updates
里面,而这个updates
并非PodConfig里面的,这里有必要看一下。
// Channel creates or returns a config source channel. The channel
// only accepts PodUpdates.
//
// The source name is recorded in c.sources while c.sourcesLock is held, so
// concurrent callers cannot race on the set. The channel itself is obtained
// from c.mux.
func (c *PodConfig) Channel(source string) chan<- interface{} {
	c.sourcesLock.Lock()
	defer c.sourcesLock.Unlock()
	c.sources.Insert(source)
	return c.mux.Channel(source)
}
// Channel returns the input channel for the named source and starts a goroutine
// that feeds everything received on that channel to m.listen.
// NOTE(review): newChannel is not declared anywhere in this excerpt; presumably the
// elided code creates it (and may reuse an existing channel for a known source) --
// confirm against the full source before relying on this snippet.
func (m *Mux) Channel(source string) chan interface{} {
// listen is driven through wait.Until with a zero period and wait.NeverStop.
go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
return newChannel
}
// listen drains listenChannel and passes each received value, together with the
// source name, to m.merger.Merge. It returns once listenChannel is closed.
// Whatever Merge returns is discarded.
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for change := range listenChannel {
		m.merger.Merge(source, change)
	}
}
// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel. Ensures that updates are delivered in order.
//
// In PodConfigNotificationIncremental mode every non-empty update set returned by
// s.merge is sent on s.updates in this fixed order: removes, adds, updates, deletes,
// restores, reconciles. In any other mode nothing is sent. The returned error is
// always nil.
func (s *podStorage) Merge(source string, change interface{}) error {
	adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)

	// deliver update notifications
	switch s.mode {
	case PodConfigNotificationIncremental:
		if len(removes.Pods) > 0 {
			s.updates <- *removes
		}
		if len(adds.Pods) > 0 {
			s.updates <- *adds
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}
		if len(restores.Pods) > 0 {
			s.updates <- *restores
		}
		// Reconcile updates were computed by merge but previously never delivered
		// (and the unused variable does not compile); forward them last.
		if len(reconciles.Pods) > 0 {
			s.updates <- *reconciles
		}
	}

	return nil
}
看似简单的Channel
方法,其实干了很多事情,它将listWatch
的全量数据与PodStorage
做比对,得到对应操作的Pod集合。
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
//
// A single updates channel (buffer size 50) is handed to the pod storage and also kept
// on the returned PodConfig. The storage serves as the merger behind the mux.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
	updateCh := make(chan kubetypes.PodUpdate, 50)
	store := newPodStorage(updateCh, mode, recorder)

	return &PodConfig{
		pods:    store,
		mux:     config.NewMux(store),
		updates: updateCh,
		sources: sets.String{},
	}
}
而PodStorage
的updates
就是PodConfig
的updates
,至此整个生产者过程终于理清楚了。
网友评论