kubelet
入口代码
pkg/kubelet/kubelet.go
创建kubelet
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *Dependencies,
crOptions *config.ContainerRuntimeOptions,
hostname string,
hostnameOverridden bool,
nodeName types.NodeName,
nodeIPs []net.IP,
providerID string,
cloudProvider string,
certDirectory string,
rootDirectory string,
imageCredentialProviderConfigFile string,
imageCredentialProviderBinDir string,
registerNode bool,
registerWithTaints []v1.Taint,
allowedUnsafeSysctls []string,
experimentalMounterPath string,
kernelMemcgNotification bool,
experimentalNodeAllocatableIgnoreEvictionThreshold bool,
minimumGCAge metav1.Duration,
maxPerPodContainerCount int32,
maxContainerCount int32,
registerSchedulable bool,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
nodeStatusMaxImages int32,
seccompDefault bool,
) (*Kubelet, error) {
...
创建pluginManager
klet.pluginManager = pluginmanager.NewPluginManager(
klet.getPluginsRegistrationDir(), /* sockDir */
kubeDeps.Recorder,
)
...
}
pkg/kubelet/kubelet_getters.go
默认/var/lib/kubelet
func (kl *Kubelet) getRootDir() string {
return kl.rootDirectory
}
默认/var/lib/kubelet/plugins_registry
func (kl *Kubelet) getPluginsRegistrationDir() string {
return filepath.Join(kl.getRootDir(), config.DefaultKubeletPluginsRegistrationDirName)
}
pkg/kubelet/kubelet.go中
启动kubelet
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
...
}
func (kl *Kubelet) updateRuntimeUp() {
...
添加csi plugin handler
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
...
初始化runtime依赖
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
...
}
func (kl *Kubelet) initializeRuntimeDependentModules() {
...
启动pluginmanager
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
...
}
pkg/kubelet/pluginmanager/plugin_manager.go中
创建poluginManager
func NewPluginManager(
sockDir string,
recorder record.EventRecorder) PluginManager {
asw := cache.NewActualStateOfWorld()
dsw := cache.NewDesiredStateOfWorld()
reconciler := reconciler.NewReconciler(
operationexecutor.NewOperationExecutor(
operationexecutor.NewOperationGenerator(
recorder,
),
),
loopSleepDuration,
dsw,
asw,
)
pm := &pluginManager{
desiredStateOfWorldPopulator: pluginwatcher.NewWatcher(
sockDir,
dsw,
),
reconciler: reconciler,
desiredStateOfWorld: dsw,
actualStateOfWorld: asw,
}
return pm
}
启动pluginmanager
func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
...
启动watcher
if err := pm.desiredStateOfWorldPopulator.Start(stopCh); err != nil {
klog.ErrorS(err, "The desired_state_of_world populator (plugin watcher) starts failed!")
return
}
...
启动reconciler
go pm.reconciler.Run(stopCh)
...
}
csi plugin处理方法
pkg/volume/csi/csi_plugin.go中
var PluginHandler = &RegistrationHandler{}
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
...
csiDrivers.Set(pluginName, Driver{
endpoint: endpoint,
highestSupportedVersion: highestSupportedVersion,
})
...
}
发现plugin
pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go中
创建watcher
func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
return &Watcher{
path: sockDir,
fs: &utilfs.DefaultFs{},
desiredStateOfWorld: desiredStateOfWorld,
}
}
启动watch
func (w *Watcher) Start(stopCh <-chan struct{}) error {
...
fsWatcher, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("failed to start plugin fsWatcher, err: %v", err)
}
w.fsWatcher = fsWatcher
遍历plugin目录
if err := w.traversePluginDir(w.path); err != nil {
klog.ErrorS(err, "Failed to traverse plugin socket path", "path", w.path)
}
监听plugin目录事件
go func(fsWatcher *fsnotify.Watcher) {
for {
select {
case event := <-fsWatcher.Events:
...
有新增事件
err := w.handleCreateEvent(event)
...
}
}
}(fsWatcher)
...
}
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
...
处理plugin注册
return w.handlePluginRegistration(event.Name)
...
}
func (w *Watcher) handlePluginRegistration(socketPath string) error {
...
添加plugin到dsw
err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
...
}
注册额plugin
pkg/kubelet/pluginmanager/reconciler/reconciler.go中
func NewReconciler(
operationExecutor operationexecutor.OperationExecutor,
loopSleepDuration time.Duration,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld) Reconciler {
return &reconciler{
operationExecutor: operationExecutor,
loopSleepDuration: loopSleepDuration,
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
handlers: make(map[string]cache.PluginHandler),
}
}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
wait.Until(func() {
rc.reconcile()
},
rc.loopSleepDuration,
stopCh)
}
添加handler
func (rc *reconciler) AddHandler(pluginType string, pluginHandler cache.PluginHandler) {
rc.Lock()
defer rc.Unlock()
rc.handlers[pluginType] = pluginHandler
}
获取handler
func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
rc.RLock()
defer rc.RUnlock()
var copyHandlers = make(map[string]cache.PluginHandler)
for pluginType, handler := range rc.handlers {
copyHandlers[pluginType] = handler
}
return copyHandlers
}
func (rc *reconciler) reconcile() {
...
获取期望注解的plugin
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
...
注册plugin
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
...
}
...
}
pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go中
创建operationExecutor
func NewOperationExecutor(
operationGenerator OperationGenerator) OperationExecutor {
return &operationExecutor{
pendingOperations: goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
operationGenerator: operationGenerator,
}
}
注册plugin
func (oe *operationExecutor) RegisterPlugin(
socketPath string,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorld ActualStateOfWorldUpdater) error {
生成注册plugin方法
generatedOperation :=
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld)
执行方法
return oe.pendingOperations.Run(
socketPath, generatedOperation)
}
pkg/util/goroutinemap/goroutinemap.go中
创建GoRoutineMap
func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
g := &goRoutineMap{
operations: make(map[string]operation),
exponentialBackOffOnError: exponentialBackOffOnError,
}
g.cond = sync.NewCond(&g.lock)
return g
}
执行operationFunc
func (grm *goRoutineMap) Run(
operationName string,
operationFunc func() error) error {
grm.lock.Lock()
defer grm.lock.Unlock()
existingOp, exists := grm.operations[operationName]
if exists {
// Operation with name exists
if existingOp.operationPending {
return NewAlreadyExistsError(operationName)
}
if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil {
return err
}
}
grm.operations[operationName] = operation{
operationPending: true,
expBackoff: existingOp.expBackoff,
}
go func() (err error) {
// Handle unhandled panics (very unlikely)
defer k8sRuntime.HandleCrash()
// Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(operationName, &err)
// Handle panic, if any, from operationFunc()
defer k8sRuntime.RecoverFromPanic(&err)
return operationFunc()
}()
return nil
}
pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go中
创建operationGenerator
func NewOperationGenerator(recorder record.EventRecorder) OperationGenerator {
return &operationGenerator{
recorder: recorder,
}
}
生成注册操作
func (og *operationGenerator) GenerateRegisterPluginFunc(
socketPath string,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
registerPluginFunc := func() error {
...
handler, ok := pluginHandlers[infoResp.Type]
...
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
}
...
return nil
}
return registerPluginFunc
}
csi-node-driver-registrer
cmd/csi-node-driver-registrar/main.go中
func main() {
...
节点注册
nodeRegister(csiDriverName, addr)
...
}
func nodeRegister(csiDriverName, httpEndpoint string) {
...
构建socketPath
socketPath := buildSocketPath(csiDriverName)
...
监听socketPath
lis, err := net.Listen("unix", socketPath)
if err != nil {
klog.Errorf("failed to listen on socket: %s with error: %+v", socketPath, err)
os.Exit(1)
}
...
创建grpc server
grpcServer := grpc.NewServer()
registerapi.RegisterRegistrationServer(grpcServer, registrar)
启动http server
go httpServer(socketPath, httpEndpoint, csiDriverName)
退出清楚socketPath
go removeRegSocket(csiDriverName)
...
启动grpc server
if err := grpcServer.Serve(lis); err != nil {
klog.Errorf("Registration Server stopped serving: %v", err)
os.Exit(1)
}
...
}
网友评论