深入分析kubelet(1)——获取Pod创建请求
kubelet组件巨复杂,所以将分成几个部分分析。
职责猜想
kubelet主要干以下工作:
- 创建Pod
- 管理Pod
- 上报Node信息
- GC
获取请求
创建Pod首先需要获取Pod创建请求,本文主要分析这个模块。与其他K8S组件不同,kubelet代码风格独树一帜,代码复杂度也高了几个档次,我也是经过一天的梳理之后,才找到入口。
总的来说,kubelet可以通过三种方式创建Pod。分别是StaticPodPath,StaticPodURL和apiserver,前两个都是用来创建static pod,已这三个为切入点,分析kubelet。
StaticPod
简单来说,不是通过apiserver创建的Pod,都是static pod。应用场景就是kubeadm,除了kubelet以外的组件,都是static pod,因为当时apiserver还没起,所以只能用static pod创建。
创建static pod有两种方式,配置文件和HTTP。详见static pod
staticPodPath is the path to the directory containing local (static) pods to run, or the path to a single static pod file.
staticPodURL is the URL for accessing static pods to run
code
k8s.io\kubernetes\pkg\kubelet\config\config.go
// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
pods *podStorage
mux *config.Mux
// the channel of denormalized changes passed to listeners
updates chan kubetypes.PodUpdate
// contains the list of all configured sources
sourcesLock sync.Mutex
sources sets.String
checkpointManager checkpointmanager.CheckpointManager
}
// PodUpdate defines an operation sent on the channel. You can add or remove single services by
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
type PodUpdate struct {
Pods []*v1.Pod
Op PodOperation
Source string
}
在PodConfig中,我们需要重点关注pods和updates。其中updates是一个生产者消费者channel,所有的修改都必须通过它派发出去。所以大胆揣测,所有的Pod请求都发给updates,然后会有workers一直处理updates。
k8s.io\kubernetes\pkg\kubelet\kubelet.go
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
// define file config source
if kubeCfg.StaticPodPath != "" {
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}
// define url config source
if kubeCfg.StaticPodURL != "" {
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
}
var updatechannel chan<- interface{}
if kubeDeps.KubeClient != nil {
if updatechannel == nil {
updatechannel = cfg.Channel(kubetypes.ApiserverSource)
}
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
}
return cfg, nil
}
从makePodSourceConfig函数我们可以清晰看到,生产者有三个。这里我们只分析NewSourceApiserver,其他的举一反三就好。
k8s.io\kubernetes\pkg\kubelet\config\apiserver.go
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)
}
// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
}
r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
go r.Run(wait.NeverStop)
}
这块还比较简单,就是通过listWatchPod,过滤条件是nodeName,这里就和之前scheduler结合起来了,并且调用send函数将Pods发给updates。如果对listWatch不太清楚,可以参考之前的ListAndWatch。
k8s.io\client-go\tools\cache\undelta_store.go
func (u *UndeltaStore) Add(obj interface{}) error {
if err := u.Store.Add(obj); err != nil {
return err
}
u.PushFunc(u.Store.List())
return nil
}
func (u *UndeltaStore) Update(obj interface{}) error {
if err := u.Store.Update(obj); err != nil {
return err
}
u.PushFunc(u.Store.List())
return nil
}
func (u *UndeltaStore) Delete(obj interface{}) error {
if err := u.Store.Delete(obj); err != nil {
return err
}
u.PushFunc(u.Store.List())
return nil
}
UndeltaStore很有意思,每次有变更,都会push全量数据,这里为什么用,还不太清楚。
从上面源代码看到所有的Pods都会push到updates里面,而这个updates并非PodConfg里面的,这里有必要看一下。
// Channel creates or returns a config source channel. The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} {
c.sources.Insert(source)
return c.mux.Channel(source)
}
func (m *Mux) Channel(source string) chan interface{} {
go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
return newChannel
}
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
for update := range listenChannel {
m.merger.Merge(source, update)
}
}
// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel. Ensures that updates are delivered in order.
func (s *podStorage) Merge(source string, change interface{}) error {
adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
// deliver update notifications
switch s.mode {
case PodConfigNotificationIncremental:
if len(removes.Pods) > 0 {
s.updates <- *removes
}
if len(adds.Pods) > 0 {
s.updates <- *adds
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if len(restores.Pods) > 0 {
s.updates <- *restores
}
}
return nil
}
看似简单的Channel方法,其实干了很多事情,它将listWatch的全量数据与PodStorage做比对,得到对应操作的Pod集合。
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
updates := make(chan kubetypes.PodUpdate, 50)
storage := newPodStorage(updates, mode, recorder)
podConfig := &PodConfig{
pods: storage,
mux: config.NewMux(storage),
updates: updates,
sources: sets.String{},
}
return podConfig
}
而PodStorage的updates就是PodConfig的updates,至此整个生产者过程终于理清楚了。








网友评论