Deep Dive into kubelet (1): Obtaining Pod Creation Requests

Author: 陈先生_9e91 | Published: 2018-09-28 15:06

The kubelet is a very complex component, so this analysis is split into several parts.

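Guessing at its responsibilities

The kubelet mainly does the following:

  1. Creates Pods
  2. Manages Pods
  3. Reports Node information
  4. Garbage collection (GC)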

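Obtaining requests

Creating a Pod starts with obtaining the Pod creation request, and this article focuses on that module. Unlike other K8S components, the kubelet has a coding style all its own and its code is considerably more complex; it took me a full day of tracing to find the entry point.

Overall, the kubelet can create Pods from three sources: StaticPodPath, StaticPodURL, and the apiserver. The first two are used to create static pods. These three sources are the starting point for analyzing the kubelet.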

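StaticPod

Simply put, any Pod that is not created through the apiserver is a static pod. The typical use case is kubeadm: the control-plane components it deploys (kube-apiserver, kube-controller-manager, kube-scheduler, and etcd) run as static pods, because the apiserver is not yet running at that point, so static pods are the only way to create them.

There are two ways to create a static pod: a configuration file or HTTP. See the static pod documentation for details.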

staticPodPath is the path to the directory containing local (static) pods to run, or the path to a single static pod file.
staticPodURL is the URL for accessing static pods to run

code

k8s.io\kubernetes\pkg\kubelet\config\config.go

// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
    pods *podStorage
    mux  *config.Mux

    // the channel of denormalized changes passed to listeners
    updates chan kubetypes.PodUpdate

    // contains the list of all configured sources
    sourcesLock       sync.Mutex
    sources           sets.String
    checkpointManager checkpointmanager.CheckpointManager
}

// PodUpdate defines an operation sent on the channel. You can add or remove single services by
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
type PodUpdate struct {
    Pods   []*v1.Pod
    Op     PodOperation
    Source string
}

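In PodConfig, the fields to focus on are pods and updates. updates is a producer-consumer channel, and every change must be dispatched through it. So a bold guess: all Pod requests are sent to updates, and workers continuously process whatever arrives on it.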
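The guess above can be made concrete with a minimal sketch of the consumer side. This is not the kubelet's actual processing loop: the trimmed-down PodUpdate type (pod names instead of *v1.Pod), the handle and process helpers, and the two-operation enum are all hypothetical simplifications used only to show the pattern of a worker draining a buffered updates channel.

```go
package main

import "fmt"

// PodOperation is a simplified stand-in for the kubelet's operation enum.
type PodOperation int

const (
	ADD PodOperation = iota
	REMOVE
)

// PodUpdate is a hypothetical, trimmed-down version of kubetypes.PodUpdate
// that carries pod names instead of full *v1.Pod objects.
type PodUpdate struct {
	Pods   []string
	Op     PodOperation
	Source string
}

// handle applies one update to the in-memory set of known pods.
func handle(running map[string]bool, u PodUpdate) {
	for _, name := range u.Pods {
		switch u.Op {
		case ADD:
			running[name] = true
		case REMOVE:
			delete(running, name)
		}
	}
}

// process consumes updates until the channel is closed and returns
// the resulting set of pods.
func process(updates <-chan PodUpdate) map[string]bool {
	running := make(map[string]bool)
	for u := range updates {
		handle(running, u)
	}
	return running
}

func main() {
	updates := make(chan PodUpdate, 50) // buffered, like the channel in NewPodConfig
	done := make(chan map[string]bool)
	go func() { done <- process(updates) }()

	// Producer side: any source may send on the same channel.
	updates <- PodUpdate{Pods: []string{"nginx", "redis"}, Op: ADD, Source: "api"}
	updates <- PodUpdate{Pods: []string{"redis"}, Op: REMOVE, Source: "api"}
	close(updates)

	fmt.Println(len(<-done), "pod(s) remain")
}
```

The consumer never needs to know which producer sent an update; the Source field travels with the message.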

k8s.io\kubernetes\pkg\kubelet\kubelet.go

// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

    // define file config source
    if kubeCfg.StaticPodPath != "" {
        config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
    }

    // define url config source
    if kubeCfg.StaticPodURL != "" {
        config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
    }
    
    var updatechannel chan<- interface{}
    if kubeDeps.KubeClient != nil {
        if updatechannel == nil {
            updatechannel = cfg.Channel(kubetypes.ApiserverSource)
        }
        config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
    }
    return cfg, nil
}

From makePodSourceConfig we can clearly see that there are three producers. Here we only analyze NewSourceApiserver; the other two work analogously.
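As a rough illustration of the "one channel per source" shape seen in makePodSourceConfig, the sketch below hands each source its own input channel and forwards everything into one output channel. The update type and the fanIn and collect functions are hypothetical names for this example; the real code routes through config.Mux and podStorage rather than a plain forwarder.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// update tags a payload with the source that produced it.
type update struct {
	source string
	pod    string
}

// fanIn creates one input channel per source and forwards everything to a
// single output channel, which is closed once every input has been closed.
func fanIn(sources []string) (map[string]chan<- string, <-chan update) {
	out := make(chan update, 50)
	inputs := make(map[string]chan<- string)
	var wg sync.WaitGroup
	for _, src := range sources {
		ch := make(chan string)
		inputs[src] = ch
		wg.Add(1)
		// One listener goroutine per source, similar in spirit to Mux.listen.
		go func(source string, in <-chan string) {
			defer wg.Done()
			for pod := range in {
				out <- update{source: source, pod: pod}
			}
		}(src, ch)
	}
	go func() { wg.Wait(); close(out) }()
	return inputs, out
}

// collect sends one pod per source and returns the merged result, sorted
// because arrival order across sources is not deterministic.
func collect() []string {
	inputs, out := fanIn([]string{"file", "http", "api"})
	for src, ch := range inputs {
		ch <- src + "-pod"
		close(ch)
	}
	var got []string
	for u := range out {
		got = append(got, u.source+"/"+u.pod)
	}
	sort.Strings(got)
	return got
}

func main() {
	fmt.Println(collect())
}
```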

k8s.io\kubernetes\pkg\kubelet\config\apiserver.go

// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
    newSourceApiserverFromLW(lw, updates)
}

// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
    send := func(objs []interface{}) {
        var pods []*v1.Pod
        for _, o := range objs {
            pods = append(pods, o.(*v1.Pod))
        }
        updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
    }
    r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
    go r.Run(wait.NeverStop)
}

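This part is fairly simple: it list-watches Pods through the apiserver, filtered by nodeName (which ties in with the scheduler covered earlier, since the scheduler is what assigns a Pod to a node), and calls send to push the Pods to updates. If list-watch is unfamiliar, see the earlier ListAndWatch article.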
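To make the nodeName filter concrete, here is a small standard-library sketch of what the field selector expresses. It does not call client-go: the pod struct and the selectorFor and podsOnNode helpers are hypothetical, and the filtering that the apiserver performs server-side is imitated here with a plain loop. The selector string assumes api.PodHostField is "spec.nodeName".

```go
package main

import "fmt"

// pod is a hypothetical stand-in holding only the fields the filter needs.
type pod struct {
	Name     string
	NodeName string // corresponds to spec.nodeName
}

// selectorFor builds the textual form of the field selector, assuming the
// field name behind api.PodHostField is "spec.nodeName".
func selectorFor(nodeName string) string {
	return "spec.nodeName=" + nodeName
}

// podsOnNode imitates the server-side filtering: only pods already bound
// to the given node are returned.
func podsOnNode(all []pod, nodeName string) []pod {
	var matched []pod
	for _, p := range all {
		if p.NodeName == nodeName {
			matched = append(matched, p)
		}
	}
	return matched
}

func main() {
	all := []pod{
		{Name: "nginx", NodeName: "node-1"},
		{Name: "redis", NodeName: "node-2"},
		{Name: "pending", NodeName: ""}, // not yet scheduled, so no kubelet sees it
	}
	fmt.Println(selectorFor("node-1"))
	for _, p := range podsOnNode(all, "node-1") {
		fmt.Println(p.Name)
	}
}
```

A Pod with an empty node name matches no kubelet's selector, which is why a Pod only reaches a kubelet after the scheduler has bound it.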

k8s.io\client-go\tools\cache\undelta_store.go

func (u *UndeltaStore) Add(obj interface{}) error {
    if err := u.Store.Add(obj); err != nil {
        return err
    }
    u.PushFunc(u.Store.List())
    return nil
}

func (u *UndeltaStore) Update(obj interface{}) error {
    if err := u.Store.Update(obj); err != nil {
        return err
    }
    u.PushFunc(u.Store.List())
    return nil
}

func (u *UndeltaStore) Delete(obj interface{}) error {
    if err := u.Store.Delete(obj); err != nil {
        return err
    }
    u.PushFunc(u.Store.List())
    return nil
}

UndeltaStore is interesting: on every change it pushes the full data set. Why it is used here is not yet clear to me.
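The push-everything behavior is easy to see in a toy version. The sketch below is not the client-go implementation: undeltaStore, newUndeltaStore, and snapshots are hypothetical names, keys are plain strings, and only Add and Delete are modeled. It only demonstrates that each mutation hands the complete current contents to the callback, which fits the send function above tagging every message with Op: kubetypes.SET.

```go
package main

import (
	"fmt"
	"sort"
)

// undeltaStore is a toy illustration of the idea behind cache.UndeltaStore:
// after every mutation the full contents are pushed to a callback.
type undeltaStore struct {
	items map[string]string
	push  func(all []string)
}

func newUndeltaStore(push func([]string)) *undeltaStore {
	return &undeltaStore{items: make(map[string]string), push: push}
}

// list returns all keys in sorted order so snapshots are deterministic.
func (u *undeltaStore) list() []string {
	keys := make([]string, 0, len(u.items))
	for k := range u.items {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// Add stores the item and then pushes the complete key set.
func (u *undeltaStore) Add(key, val string) {
	u.items[key] = val
	u.push(u.list())
}

// Delete removes the item and then pushes the complete key set.
func (u *undeltaStore) Delete(key string) {
	delete(u.items, key)
	u.push(u.list())
}

// snapshots performs three mutations and records what the callback received.
func snapshots() [][]string {
	var seen [][]string
	s := newUndeltaStore(func(all []string) { seen = append(seen, all) })
	s.Add("default/nginx", "v1")
	s.Add("default/redis", "v1")
	s.Delete("default/nginx")
	return seen
}

func main() {
	for _, snap := range snapshots() {
		fmt.Println(snap)
	}
}
```

Three mutations produce three full snapshots, not three deltas; working out what actually changed is left to the receiver.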

The source above shows that all Pods are pushed to updates, but this updates is not the one inside PodConfig, so it is worth taking a closer look.

// Channel creates or returns a config source channel.  The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} {
    c.sources.Insert(source)
    return c.mux.Channel(source)
}

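func (m *Mux) Channel(source string) chan interface{} {
    // Excerpt: locking and the lookup of an existing channel for this source are omitted.
    newChannel := make(chan interface{})
    m.sources[source] = newChannel
    go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
    return newChannel
}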

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
    for update := range listenChannel {
        m.merger.Merge(source, update)
    }
}

// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel.  Ensures that updates are delivered in order.
func (s *podStorage) Merge(source string, change interface{}) error {
    adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
    
    // deliver update notifications
    switch s.mode {
    case PodConfigNotificationIncremental:
        if len(removes.Pods) > 0 {
            s.updates <- *removes
        }
        if len(adds.Pods) > 0 {
            s.updates <- *adds
        }
        if len(updates.Pods) > 0 {
            s.updates <- *updates
        }
        if len(deletes.Pods) > 0 {
            s.updates <- *deletes
        }
        if len(restores.Pods) > 0 {
            s.updates <- *restores
        }
    }

    return nil
}

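The seemingly simple Channel method actually does a lot: it compares the full data set from list-watch against what podStorage already holds, and derives the set of Pods for each kind of operation.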
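The comparison step can be sketched as a plain diff between stored state and an incoming full snapshot. This is a simplified illustration, not podStorage.merge itself: the diff function is hypothetical, pods are reduced to a name and a spec string, and only three of the result sets (adds, updates, removes) are modeled.

```go
package main

import (
	"fmt"
	"sort"
)

// diff compares the stored pods (name -> spec) with a full snapshot and
// returns which pods are new, which changed, and which disappeared.
func diff(stored, snapshot map[string]string) (adds, updates, removes []string) {
	for name, spec := range snapshot {
		old, ok := stored[name]
		switch {
		case !ok:
			adds = append(adds, name) // present only in the snapshot
		case old != spec:
			updates = append(updates, name) // present in both, spec differs
		}
	}
	for name := range stored {
		if _, ok := snapshot[name]; !ok {
			removes = append(removes, name) // no longer in the snapshot
		}
	}
	// Map iteration order is random, so sort for stable output.
	sort.Strings(adds)
	sort.Strings(updates)
	sort.Strings(removes)
	return adds, updates, removes
}

func main() {
	stored := map[string]string{"nginx": "v1", "redis": "v1"}
	snapshot := map[string]string{"nginx": "v2", "etcd": "v1"}

	adds, updates, removes := diff(stored, snapshot)
	fmt.Println("adds:", adds)
	fmt.Println("updates:", updates)
	fmt.Println("removes:", removes)
}
```

Turning a full snapshot into per-operation sets is what lets the downstream consumer receive incremental messages even though the source pushes everything each time.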

// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
    updates := make(chan kubetypes.PodUpdate, 50)
    storage := newPodStorage(updates, mode, recorder)
    podConfig := &PodConfig{
        pods:    storage,
        mux:     config.NewMux(storage),
        updates: updates,
        sources: sets.String{},
    }
    return podConfig
}

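The updates channel held by podStorage is the very same channel as PodConfig's updates, since NewPodConfig creates it once and passes it to both. With that, the whole producer side is finally clear.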

Article link: https://www.haomeiwen.com/subject/ferpoftx.html