In-Depth Analysis of kubelet (1): Fetching Pod Creation Requests

Author: 陈先生_9e91 | Published: 2018-09-28 15:06

    The kubelet is an extremely complex component, so this analysis is split into several parts.

    Responsibilities (a guess)

    The kubelet mainly does the following:

    1. Creates Pods
    2. Manages Pods
    3. Reports Node information
    4. Garbage collection (GC)

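    Fetching requests

    Creating a Pod starts with receiving the Pod creation request, and this article focuses on that module. Unlike other Kubernetes components, the kubelet has a code style of its own, and its code is considerably more complex. It took me a full day of tracing through it to find the entry point.

    Overall, the kubelet can receive Pods to create from three sources: StaticPodPath, StaticPodURL, and the apiserver. The first two are used to create static pods. These three sources are the starting point for analyzing the kubelet.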

    StaticPod

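    Simply put, any Pod that is not created through the apiserver is a static pod. The typical use case is kubeadm: the control-plane components (kube-apiserver, kube-controller-manager, kube-scheduler, etcd) run as static pods, because the apiserver is not yet up at that point, so static pods are the only way to create them.

    There are two ways to create a static pod: a configuration file and HTTP. See the static pod documentation for details. The two corresponding kubelet configuration fields are described as follows: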

    staticPodPath is the path to the directory containing local (static) pods to run, or the path to a single static pod file.
    staticPodURL is the URL for accessing static pods to run

    code

    k8s.io\kubernetes\pkg\kubelet\config\config.go

    // PodConfig is a configuration mux that merges many sources of pod configuration into a single
    // consistent structure, and then delivers incremental change notifications to listeners
    // in order.
    type PodConfig struct {
        pods *podStorage
        mux  *config.Mux
    
        // the channel of denormalized changes passed to listeners
        updates chan kubetypes.PodUpdate
    
        // contains the list of all configured sources
        sourcesLock       sync.Mutex
        sources           sets.String
        checkpointManager checkpointmanager.CheckpointManager
    }
    
    // PodUpdate defines an operation sent on the channel. You can add or remove single services by
    // sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
    // For setting the state of the system to a given state for this source configuration, set
    // Pods as desired and Op to SET, which will reset the system state to that specified in this
    // operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
    type PodUpdate struct {
        Pods   []*v1.Pod
        Op     PodOperation
        Source string
    }
    

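    In PodConfig, the fields to focus on are pods and updates. updates is a producer-consumer channel, and every change must be dispatched through it. So here is a bold guess: all Pod requests are sent to updates, and workers continuously process whatever arrives on it.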
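    The producer-consumer guess above can be illustrated with a small, self-contained sketch. It is not kubelet code: podUpdate, consume, and runDemo are hypothetical names, and Pods are reduced to plain strings. Only the shape (producers send on a buffered channel, a worker drains it in order) is taken from the PodConfig definition.

```go
package main

import "fmt"

// podUpdate is a hypothetical, simplified stand-in for kubetypes.PodUpdate.
type podUpdate struct {
	Pods   []string // pod names instead of []*v1.Pod
	Op     string   // e.g. "ADD", "REMOVE"
	Source string   // e.g. "file", "http", "api"
}

// consume plays the worker: it drains updates until the channel is
// closed and returns a description of everything it saw, in order.
func consume(updates <-chan podUpdate) []string {
	var seen []string
	for u := range updates {
		seen = append(seen, fmt.Sprintf("%s %s %v", u.Source, u.Op, u.Pods))
	}
	return seen
}

// runDemo wires one producer and one consumer around a buffered channel.
func runDemo() []string {
	// Buffer of 50: the size NewPodConfig uses in the listing later in this article.
	updates := make(chan podUpdate, 50)

	done := make(chan []string)
	go func() { done <- consume(updates) }()

	// Producer side: in the kubelet this role is played by the pod sources.
	updates <- podUpdate{Pods: []string{"nginx"}, Op: "ADD", Source: "api"}
	updates <- podUpdate{Pods: []string{"nginx"}, Op: "REMOVE", Source: "api"}
	close(updates)

	return <-done
}

func main() {
	for _, line := range runDemo() {
		fmt.Println(line)
	}
}
```

    Because a single channel preserves send order, the consumer sees the ADD before the REMOVE, which is the property the "in order" wording in the PodConfig comment relies on.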

    k8s.io\kubernetes\pkg\kubelet\kubelet.go

    // makePodSourceConfig creates a config.PodConfig from the given
    // KubeletConfiguration or returns an error.
    func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
        cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
    
        // define file config source
        if kubeCfg.StaticPodPath != "" {
            config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
        }
    
        // define url config source
        if kubeCfg.StaticPodURL != "" {
            config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
        }
        
        var updatechannel chan<- interface{}
        if kubeDeps.KubeClient != nil {
            if updatechannel == nil {
                updatechannel = cfg.Channel(kubetypes.ApiserverSource)
            }
            config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
        }
        return cfg, nil
    }
    

    From makePodSourceConfig we can clearly see that there are three producers. Only NewSourceApiserver is analyzed here; the other two work analogously.
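    As a rough illustration of how makePodSourceConfig decides which producers exist, the sketch below reduces its three if-blocks to a pure function. kubeletCfg and enabledSources are hypothetical names, the manifest path is only an example value, and the strings "file", "http", and "api" are assumed to be the values of kubetypes.FileSource, HTTPSource, and ApiserverSource.

```go
package main

import "fmt"

// kubeletCfg is a hypothetical, trimmed-down stand-in for KubeletConfiguration.
type kubeletCfg struct {
	StaticPodPath string
	StaticPodURL  string
}

// enabledSources follows the three if-blocks in makePodSourceConfig:
// a source is registered only when its configuration is present.
func enabledSources(cfg kubeletCfg, hasKubeClient bool) []string {
	var sources []string
	if cfg.StaticPodPath != "" {
		sources = append(sources, "file")
	}
	if cfg.StaticPodURL != "" {
		sources = append(sources, "http")
	}
	if hasKubeClient {
		sources = append(sources, "api")
	}
	return sources
}

func main() {
	// Example path only; any non-empty value enables the file source.
	withManifests := kubeletCfg{StaticPodPath: "/etc/kubernetes/manifests"}
	fmt.Println(enabledSources(withManifests, true))
	fmt.Println(enabledSources(kubeletCfg{}, true))
}
```

    In the real function each enabled source additionally receives its own channel from cfg.Channel(...), so the sources never write to the shared updates channel directly.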

    k8s.io\kubernetes\pkg\kubelet\config\apiserver.go

    // NewSourceApiserver creates a config source that watches and pulls from the apiserver.
    func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
        lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
        newSourceApiserverFromLW(lw, updates)
    }
    
    // newSourceApiserverFromLW creates a config source that watches and pulls from the apiserver.
    func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
        send := func(objs []interface{}) {
            var pods []*v1.Pod
            for _, o := range objs {
                pods = append(pods, o.(*v1.Pod))
            }
            updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
        }
        r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
        go r.Run(wait.NeverStop)
    }
    

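    This part is fairly simple: it list-watches Pods through a ListWatch whose filter is nodeName, which ties in with the scheduler covered earlier (the scheduler is what binds a Pod to a node), and it calls send to push the Pods to updates. If ListWatch is unfamiliar, see the earlier ListAndWatch article.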
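    The effect of the nodeName filter plus the send function can be sketched without client-go. In the kubelet the filtering happens on the apiserver side through the field selector; the sketch below does it locally purely for illustration. pod, podUpdate, podsOnNode, and snapshotUpdate are hypothetical names.

```go
package main

import "fmt"

// pod is a hypothetical, minimal stand-in for v1.Pod.
type pod struct {
	Name     string
	NodeName string // plays the role of spec.nodeName; empty until the Pod is scheduled
}

// podUpdate is a simplified stand-in for kubetypes.PodUpdate.
type podUpdate struct {
	Pods   []pod
	Op     string
	Source string
}

// podsOnNode keeps only the Pods bound to nodeName. The real kubelet
// never sees the other Pods, because the apiserver applies the selector.
func podsOnNode(all []pod, nodeName string) []pod {
	var mine []pod
	for _, p := range all {
		if p.NodeName == nodeName {
			mine = append(mine, p)
		}
	}
	return mine
}

// snapshotUpdate wraps a full list in a SET update, as send does.
func snapshotUpdate(pods []pod) podUpdate {
	return podUpdate{Pods: pods, Op: "SET", Source: "api"}
}

func main() {
	cluster := []pod{
		{Name: "web", NodeName: "node-1"},
		{Name: "db", NodeName: "node-2"},
		{Name: "pending"}, // not scheduled yet, so no kubelet picks it up
	}
	u := snapshotUpdate(podsOnNode(cluster, "node-1"))
	fmt.Printf("%s from %s: %d pod(s)\n", u.Op, u.Source, len(u.Pods))
}
```

    Note that an unscheduled Pod has an empty node name and therefore matches no kubelet, which is why the scheduler has to run before any kubelet reacts.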

    k8s.io\client-go\tools\cache\undelta_store.go

    func (u *UndeltaStore) Add(obj interface{}) error {
        if err := u.Store.Add(obj); err != nil {
            return err
        }
        u.PushFunc(u.Store.List())
        return nil
    }
    
    func (u *UndeltaStore) Update(obj interface{}) error {
        if err := u.Store.Update(obj); err != nil {
            return err
        }
        u.PushFunc(u.Store.List())
        return nil
    }
    
    func (u *UndeltaStore) Delete(obj interface{}) error {
        if err := u.Store.Delete(obj); err != nil {
            return err
        }
        u.PushFunc(u.Store.List())
        return nil
    }
    
    

    UndeltaStore is interesting: on every change it pushes the full data set. I am not yet sure why it is used here.
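    The "push everything on every change" behavior can be reproduced in a few lines. This is a sketch of the pattern only, not the client-go implementation: undeltaStore here is a hypothetical type that stores bare keys, and its list is sorted just to make the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// undeltaStore is a hypothetical miniature of the pattern: every
// mutation is followed by a push of the complete current contents.
type undeltaStore struct {
	items map[string]struct{}
	push  func(all []string)
}

func newUndeltaStore(push func([]string)) *undeltaStore {
	return &undeltaStore{items: map[string]struct{}{}, push: push}
}

// list returns a fresh, sorted copy of all keys.
func (u *undeltaStore) list() []string {
	keys := make([]string, 0, len(u.items))
	for k := range u.items {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func (u *undeltaStore) Add(key string) {
	u.items[key] = struct{}{}
	u.push(u.list())
}

func (u *undeltaStore) Delete(key string) {
	delete(u.items, key)
	u.push(u.list())
}

// recordSnapshots applies three changes and collects what was pushed.
func recordSnapshots() [][]string {
	var snapshots [][]string
	s := newUndeltaStore(func(all []string) { snapshots = append(snapshots, all) })
	s.Add("web")
	s.Add("db")
	s.Delete("web")
	return snapshots
}

func main() {
	for _, snap := range recordSnapshots() {
		fmt.Println(snap)
	}
}
```

    Each push is a complete snapshot rather than a delta, which is consistent with the send function above tagging every update with Op: kubetypes.SET.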

    From the source above, all Pods are pushed into updates. This updates, however, is not the one inside PodConfig, so it is worth a closer look.

    // Channel creates or returns a config source channel.  The channel
    // only accepts PodUpdates
    func (c *PodConfig) Channel(source string) chan<- interface{} {
        c.sources.Insert(source)
        return c.mux.Channel(source)
    }
    
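    func (m *Mux) Channel(source string) chan interface{} {
        // Excerpt: the lookup of an already-registered channel for this source is omitted.
        newChannel := make(chan interface{})
        go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
        return newChannel
    }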
    
    func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
        for update := range listenChannel {
            m.merger.Merge(source, update)
        }
    }
    
    // Merge normalizes a set of incoming changes from different sources into a map of all Pods
    // and ensures that redundant changes are filtered out, and then pushes zero or more minimal
    // updates onto the update channel.  Ensures that updates are delivered in order.
    func (s *podStorage) Merge(source string, change interface{}) error {
        adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
        
        // deliver update notifications
        switch s.mode {
        case PodConfigNotificationIncremental:
            if len(removes.Pods) > 0 {
                s.updates <- *removes
            }
            if len(adds.Pods) > 0 {
                s.updates <- *adds
            }
            if len(updates.Pods) > 0 {
                s.updates <- *updates
            }
            if len(deletes.Pods) > 0 {
                s.updates <- *deletes
            }
            if len(restores.Pods) > 0 {
                s.updates <- *restores
            }
        }
    
        return nil
    }
    

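    The seemingly simple Channel method actually does a lot: it compares the full data from the ListWatch against what PodStorage already holds and derives the set of Pods for each operation.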
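    The comparison step can be sketched as a diff between a full snapshot and the previously stored state. This is a simplification under stated assumptions: podStore and its merge method are hypothetical, a Pod is reduced to a name and a spec string, and only adds, updates, and removes are computed (the real merge also produces deletes, reconciles, and restores).

```go
package main

import (
	"fmt"
	"sort"
)

// podStore is a hypothetical, simplified stand-in for podStorage.
type podStore struct {
	pods map[string]string // pod name -> spec (a plain string here)
}

// merge diffs a full snapshot against the stored state, remembers the
// snapshot, and returns the pod names per operation (sorted).
func (s *podStore) merge(snapshot map[string]string) (adds, updates, removes []string) {
	for name, spec := range snapshot {
		old, known := s.pods[name]
		switch {
		case !known:
			adds = append(adds, name)
		case old != spec:
			updates = append(updates, name)
		}
	}
	for name := range s.pods {
		if _, still := snapshot[name]; !still {
			removes = append(removes, name)
		}
	}

	// Store a copy so later changes to the caller's map do not leak in.
	s.pods = make(map[string]string, len(snapshot))
	for name, spec := range snapshot {
		s.pods[name] = spec
	}

	sort.Strings(adds)
	sort.Strings(updates)
	sort.Strings(removes)
	return adds, updates, removes
}

func main() {
	store := &podStore{pods: map[string]string{"web": "v1", "db": "v1"}}
	snapshot := map[string]string{"web": "v2", "cache": "v1"}

	adds, updates, removes := store.merge(snapshot)
	fmt.Println("adds:", adds, "updates:", updates, "removes:", removes)

	// Sending the same snapshot again yields no operations at all.
	adds, updates, removes = store.merge(snapshot)
	fmt.Println("adds:", adds, "updates:", updates, "removes:", removes)
}
```

    The second call shows how redundant snapshots are filtered out: a store that pushes its full contents on every change does not turn into a flood of duplicate operations downstream.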

    // NewPodConfig creates an object that can merge many configuration sources into a stream
    // of normalized updates to a pod configuration.
    func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
        updates := make(chan kubetypes.PodUpdate, 50)
        storage := newPodStorage(updates, mode, recorder)
        podConfig := &PodConfig{
            pods:    storage,
            mux:     config.NewMux(storage),
            updates: updates,
            sources: sets.String{},
        }
        return podConfig
    }
    

    The updates in PodStorage is the same channel as the updates in PodConfig. With that, the whole producer side is finally clear.
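    The sharing of one channel between the two structs can be shown in isolation. The types below are hypothetical miniatures that mirror only the wiring in NewPodConfig, with strings in place of kubetypes.PodUpdate.

```go
package main

import "fmt"

// podStorage is a hypothetical miniature: it only holds the send side.
type podStorage struct {
	updates chan<- string
}

// podConfig is a hypothetical miniature of PodConfig.
type podConfig struct {
	pods    *podStorage
	updates chan string
}

// newPodConfig creates one channel and hands it to both structs,
// following the wiring in NewPodConfig.
func newPodConfig() *podConfig {
	updates := make(chan string, 50)
	return &podConfig{
		pods:    &podStorage{updates: updates},
		updates: updates,
	}
}

func main() {
	cfg := newPodConfig()
	cfg.pods.updates <- "ADD nginx" // what the storage sends after a merge...
	fmt.Println(<-cfg.updates)      // ...is what a reader of the config's channel receives
}
```

    A channel value is a reference to the underlying queue, so copying it into a second struct does not create a second queue.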
