美文网首页
从源码看csi driver如何注册

从源码看csi driver如何注册

作者: wwq2020 | 来源:发表于2024-02-26 14:56 被阅读0次

    kubelet

    入口代码

    pkg/kubelet/kubelet.go

    创建kubelet
    func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
        kubeDeps *Dependencies,
        crOptions *config.ContainerRuntimeOptions,
        hostname string,
        hostnameOverridden bool,
        nodeName types.NodeName,
        nodeIPs []net.IP,
        providerID string,
        cloudProvider string,
        certDirectory string,
        rootDirectory string,
        imageCredentialProviderConfigFile string,
        imageCredentialProviderBinDir string,
        registerNode bool,
        registerWithTaints []v1.Taint,
        allowedUnsafeSysctls []string,
        experimentalMounterPath string,
        kernelMemcgNotification bool,
        experimentalNodeAllocatableIgnoreEvictionThreshold bool,
        minimumGCAge metav1.Duration,
        maxPerPodContainerCount int32,
        maxContainerCount int32,
        registerSchedulable bool,
        keepTerminatedPodVolumes bool,
        nodeLabels map[string]string,
        nodeStatusMaxImages int32,
        seccompDefault bool,
    ) (*Kubelet, error) {
        ...
        创建pluginManager
        klet.pluginManager = pluginmanager.NewPluginManager(
            klet.getPluginsRegistrationDir(), /* sockDir */
            kubeDeps.Recorder,
        )
        ...
    }
    

    pkg/kubelet/kubelet_getters.go

    默认/var/lib/kubelet
    func (kl *Kubelet) getRootDir() string {
        return kl.rootDirectory
    }
    
    默认/var/lib/kubelet/plugins_registry
    func (kl *Kubelet) getPluginsRegistrationDir() string {
        return filepath.Join(kl.getRootDir(), config.DefaultKubeletPluginsRegistrationDirName)
    }
    

    pkg/kubelet/kubelet.go中

    启动kubelet
    func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
        ...
        go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
        ...
    }
    
    func (kl *Kubelet) updateRuntimeUp() {
    ...
        添加csi plugin handler
        kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
    ...
        初始化runtime依赖
        kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
    ...
    }
    
    func (kl *Kubelet) initializeRuntimeDependentModules() {
    ...
    启动pluginmanager
        go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
    ...
    }
    

    pkg/kubelet/pluginmanager/plugin_manager.go中

    创建poluginManager
    func NewPluginManager(
        sockDir string,
        recorder record.EventRecorder) PluginManager {
        asw := cache.NewActualStateOfWorld()
        dsw := cache.NewDesiredStateOfWorld()
        reconciler := reconciler.NewReconciler(
            operationexecutor.NewOperationExecutor(
                operationexecutor.NewOperationGenerator(
                    recorder,
                ),
            ),
            loopSleepDuration,
            dsw,
            asw,
        )
    
        pm := &pluginManager{
            desiredStateOfWorldPopulator: pluginwatcher.NewWatcher(
                sockDir,
                dsw,
            ),
            reconciler:          reconciler,
            desiredStateOfWorld: dsw,
            actualStateOfWorld:  asw,
        }
        return pm
    }
    
    启动pluginmanager
    func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
    ...
    启动watcher
        if err := pm.desiredStateOfWorldPopulator.Start(stopCh); err != nil {
            klog.ErrorS(err, "The desired_state_of_world populator (plugin watcher) starts failed!")
            return
        }
    ...
    启动reconciler
        go pm.reconciler.Run(stopCh)
    ...
    }
    

    csi plugin处理方法

    pkg/volume/csi/csi_plugin.go中

    var PluginHandler = &RegistrationHandler{}
    
    func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
    ...
        csiDrivers.Set(pluginName, Driver{
            endpoint:                endpoint,
            highestSupportedVersion: highestSupportedVersion,
        })
    ...
    }
    

    发现plugin

    pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go中

    创建watcher
    func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
        return &Watcher{
            path:                sockDir,
            fs:                  &utilfs.DefaultFs{},
            desiredStateOfWorld: desiredStateOfWorld,
        }
    }
    
    启动watch
    func (w *Watcher) Start(stopCh <-chan struct{}) error {
    ...
        fsWatcher, err := fsnotify.NewWatcher()
        if err != nil {
            return fmt.Errorf("failed to start plugin fsWatcher, err: %v", err)
        }
        w.fsWatcher = fsWatcher
        遍历plugin目录
        if err := w.traversePluginDir(w.path); err != nil {
            klog.ErrorS(err, "Failed to traverse plugin socket path", "path", w.path)
        }
        监听plugin目录事件
        go func(fsWatcher *fsnotify.Watcher) {
            for {
                select {
                case event := <-fsWatcher.Events:
                    ...
                    有新增事件
                        err := w.handleCreateEvent(event)
                    ...
                }
            }
        }(fsWatcher)
    ...
    }
    
    func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
    ...
    处理plugin注册
            return w.handlePluginRegistration(event.Name)
    ...
    }
    
    func (w *Watcher) handlePluginRegistration(socketPath string) error {
        ...
        添加plugin到dsw
        err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
        ...
    }
    

    注册额plugin

    pkg/kubelet/pluginmanager/reconciler/reconciler.go中

    func NewReconciler(
        operationExecutor operationexecutor.OperationExecutor,
        loopSleepDuration time.Duration,
        desiredStateOfWorld cache.DesiredStateOfWorld,
        actualStateOfWorld cache.ActualStateOfWorld) Reconciler {
        return &reconciler{
            operationExecutor:   operationExecutor,
            loopSleepDuration:   loopSleepDuration,
            desiredStateOfWorld: desiredStateOfWorld,
            actualStateOfWorld:  actualStateOfWorld,
            handlers:            make(map[string]cache.PluginHandler),
        }
    }
    
    func (rc *reconciler) Run(stopCh <-chan struct{}) {
        wait.Until(func() {
            rc.reconcile()
        },
            rc.loopSleepDuration,
            stopCh)
    }
    
    添加handler
    func (rc *reconciler) AddHandler(pluginType string, pluginHandler cache.PluginHandler) {
        rc.Lock()
        defer rc.Unlock()
    
        rc.handlers[pluginType] = pluginHandler
    }
    
    获取handler
    func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
        rc.RLock()
        defer rc.RUnlock()
    
        var copyHandlers = make(map[string]cache.PluginHandler)
        for pluginType, handler := range rc.handlers {
            copyHandlers[pluginType] = handler
        }
        return copyHandlers
    }
    
    
    func (rc *reconciler) reconcile() {
    ...
    获取期望注解的plugin
        for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
    ...
    注册plugin
                err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
    ...
        }
    ...
    }
    

    pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go中

    创建operationExecutor
    func NewOperationExecutor(
        operationGenerator OperationGenerator) OperationExecutor {
    
        return &operationExecutor{
            pendingOperations:  goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
            operationGenerator: operationGenerator,
        }
    }
    
    注册plugin
    func (oe *operationExecutor) RegisterPlugin(
        socketPath string,
        timestamp time.Time,
        pluginHandlers map[string]cache.PluginHandler,
        actualStateOfWorld ActualStateOfWorldUpdater) error {
        生成注册plugin方法
        generatedOperation :=
            oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld)
        执行方法
        return oe.pendingOperations.Run(
            socketPath, generatedOperation)
    }
    

    pkg/util/goroutinemap/goroutinemap.go中

    创建GoRoutineMap
    func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
        g := &goRoutineMap{
            operations:                make(map[string]operation),
            exponentialBackOffOnError: exponentialBackOffOnError,
        }
    
        g.cond = sync.NewCond(&g.lock)
        return g
    }
    
    执行operationFunc
    func (grm *goRoutineMap) Run(
        operationName string,
        operationFunc func() error) error {
        grm.lock.Lock()
        defer grm.lock.Unlock()
    
        existingOp, exists := grm.operations[operationName]
        if exists {
            // Operation with name exists
            if existingOp.operationPending {
                return NewAlreadyExistsError(operationName)
            }
    
            if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil {
                return err
            }
        }
    
        grm.operations[operationName] = operation{
            operationPending: true,
            expBackoff:       existingOp.expBackoff,
        }
        go func() (err error) {
            // Handle unhandled panics (very unlikely)
            defer k8sRuntime.HandleCrash()
            // Handle completion of and error, if any, from operationFunc()
            defer grm.operationComplete(operationName, &err)
            // Handle panic, if any, from operationFunc()
            defer k8sRuntime.RecoverFromPanic(&err)
            return operationFunc()
        }()
    
        return nil
    }
    
    

    pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go中

    创建operationGenerator
    func NewOperationGenerator(recorder record.EventRecorder) OperationGenerator {
    
        return &operationGenerator{
            recorder: recorder,
        }
    }
    
    
    生成注册操作
    func (og *operationGenerator) GenerateRegisterPluginFunc(
        socketPath string,
        timestamp time.Time,
        pluginHandlers map[string]cache.PluginHandler,
        actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
    
        registerPluginFunc := func() error {
    ...
    
            handler, ok := pluginHandlers[infoResp.Type]
    ...
    
            if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
                return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
            }
    ...
            return nil
        }
        return registerPluginFunc
    }
    

    csi-node-driver-registrer

    cmd/csi-node-driver-registrar/main.go中

    func main() {
        ...
        节点注册
        nodeRegister(csiDriverName, addr)
        ...
    }
    
    func nodeRegister(csiDriverName, httpEndpoint string) {
        ...
        构建socketPath
        socketPath := buildSocketPath(csiDriverName)
        ...
        监听socketPath
        lis, err := net.Listen("unix", socketPath)
        if err != nil {
            klog.Errorf("failed to listen on socket: %s with error: %+v", socketPath, err)
            os.Exit(1)
        }
        ...
        创建grpc server
        grpcServer := grpc.NewServer()
        registerapi.RegisterRegistrationServer(grpcServer, registrar)
    
        启动http server
        go httpServer(socketPath, httpEndpoint, csiDriverName)
        退出清楚socketPath
        go removeRegSocket(csiDriverName)
        ...
        启动grpc server
        if err := grpcServer.Serve(lis); err != nil {
            klog.Errorf("Registration Server stopped serving: %v", err)
            os.Exit(1)
        }
        ...
    }
    

    相关文章

      网友评论

          本文标题:从源码看csi driver如何注册

          本文链接:https://www.haomeiwen.com/subject/ybmzadtx.html