入口逻辑
启动和初始化入口 startDeploymentController
- cmd/kube-controller-manager/app/apps.go:22
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
return nil, false, nil
}
// DeploymentController实例化
dc, err := deployment.NewDeploymentController(
// Deployment Informer
ctx.InformerFactory.Apps().V1().Deployments(),
// ReplicaSet Informer
ctx.InformerFactory.Apps().V1().ReplicaSets(),
// Pod Informer
ctx.InformerFactory.Core().V1().Pods(),
// k8s client
ctx.ClientBuilder.ClientOrDie("deployment-controller"),
)
if err != nil {
return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
}
go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
return nil, true, nil
}
在startDeploymentController()
函数中通过NewDeploymentController()
方法初始化DeploymentController
实例,入参包含Deployment、ReplicaSet、Pod的Informer接口和K8sClient。
DeploymentController
对象
DeploymentController
类型定义
- pkg/controller/deployment/deployment_controller.go:68
// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
type DeploymentController struct {
// rsControl is used for adopting/releasing replica sets.
rsControl controller.RSControlInterface
client clientset.Interface
eventRecorder record.EventRecorder
// To allow injection of syncDeployment for testing.
syncHandler func(dKey string) error
// used for unit testing
enqueueDeployment func(deployment *apps.Deployment)
// dLister can list/get deployments from the shared informer's store
dLister appslisters.DeploymentLister
// rsLister can list/get replica sets from the shared informer's store
rsLister appslisters.ReplicaSetLister
// podLister can list/get pods from the shared informer's store
podLister corelisters.PodLister
// dListerSynced returns true if the Deployment store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
dListerSynced cache.InformerSynced
// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
rsListerSynced cache.InformerSynced
// podListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podListerSynced cache.InformerSynced
// Deployments that need to be synced
queue workqueue.RateLimitingInterface
}
DeploymentController
初始化
- pkg/controller/deployment/deployment_controller.go:101
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
// k8s事件
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
// 注册prometheus metric `deployment_controller_rate_limiter_use`
if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
dc := &DeploymentController{
client: client,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
}
// k8s client, replicaset操作
dc.rsControl = controller.RealRSControl{
KubeClient: client,
Recorder: dc.eventRecorder,
}
// deployment informer ResourceEventHandler
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addDeployment, // create deployment的事件处理逻辑
UpdateFunc: dc.updateDeployment, // update deployment的事件处理逻辑
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: dc.deleteDeployment, // delete deployment的事件处理逻辑
})
// replicaset informer ResourceEventHandler
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addReplicaSet,
UpdateFunc: dc.updateReplicaSet,
DeleteFunc: dc.deleteReplicaSet,
})
// pod ResourceEventHandler
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: dc.deletePod,
})
// deployment处理逻辑,不支持同一个deployment资源的并发处理
dc.syncHandler = dc.syncDeployment
dc.enqueueDeployment = dc.enqueue
// resource lister
dc.dLister = dInformer.Lister()
dc.rsLister = rsInformer.Lister()
dc.podLister = podInformer.Lister()
// 缓存同步完成标记
dc.dListerSynced = dInformer.Informer().HasSynced
dc.rsListerSynced = rsInformer.Informer().HasSynced
dc.podListerSynced = podInformer.Informer().HasSynced
return dc, nil
}
DeploymentController
启动
DeploymentController
执行逻辑 -- Run()
函数
- pkg/controller/deployment/deployment_controller.go:149
// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dc.queue.ShutDown()
klog.InfoS("Starting controller", "controller", "deployment")
defer klog.InfoS("Shutting down controller", "controller", "deployment")
// 等待资源[deployment,replicaset,pod]缓存同步完成
if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(dc.worker, time.Second, stopCh)
}
<-stopCh
}
- pkg/controller/deployment/deployment_controller.go:460
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker() {
for dc.processNextWorkItem() {
}
}
- pkg/controller/deployment/deployment_controller.go:465
func (dc *DeploymentController) processNextWorkItem() bool {
key, quit := dc.queue.Get()
if quit {
return false
}
defer dc.queue.Done(key)
// syncDeployment,同步当前资源的状态
err := dc.syncHandler(key.(string))
dc.handleErr(err, key)
return true
}
- pkg/controller/deployment/deployment_controller.go:568
// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(key string) error {
// 获取当前资源的namespace,name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
return err
}
startTime := time.Now()
klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
defer func() {
klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
}()
// 根据 namespace 和 name 从 cache 中检索对应 Deployment 对象
deployment, err := dc.dLister.Deployments(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
return nil
}
if err != nil {
return err
}
// Deep-copy otherwise we are mutating our cache.
// TODO: Deep-copy only when needed.
d := deployment.DeepCopy()
everything := metav1.LabelSelector{}
// 空LabelSelector匹配当前namespace下所有pod, 发送一个Warning Event, 更新 .Status.ObservedGeneration 然后返回
if reflect.DeepEqual(d.Spec.Selector, &everything) {
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
if d.Status.ObservedGeneration < d.Generation {
d.Status.ObservedGeneration = d.Generation
dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
}
return nil
}
// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
// through adoption/orphaning.
// 获取当前Deployment拥有的所有ReplicaSet, 同时会更新这些ReplicaSet的ControllerRef
rsList, err := dc.getReplicaSetsForDeployment(d)
if err != nil {
return err
}
// List all Pods owned by this Deployment, grouped by their ReplicaSet.
// Current uses of the podMap are:
//
// * check if a Pod is labeled correctly with the pod-template-hash label.
// * check that no old Pods are running in the middle of Recreate Deployments.
// map[types.UID][]*v1.Pod 类型,key是rs的UID,value是对应rs管理的所有pod列表
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
return err
}
// 已经标记删除的deployment,仅更新状态
if d.DeletionTimestamp != nil {
return dc.syncStatusOnly(d, rsList)
}
// Update deployment conditions with an Unknown condition when pausing/resuming
// a deployment. In this way, we can be sure that we won't timeout when a user
// resumes a Deployment with a set progressDeadlineSeconds.
// 根据 .Spec.Pause配置看是否更新Deployment的conditions
if err = dc.checkPausedConditions(d); err != nil {
return err
}
// .spec.paused=true,更新所有replicas!=0的replicaset
if d.Spec.Paused {
return dc.sync(d, rsList)
}
// rollback is not re-entrant in case the underlying replica sets are updated with a new
// revision so we should ensure that we won't proceed to update replica sets until we
// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
if getRollbackTo(d) != nil {
// 老版本deployment版本回滚逻辑兼容
return dc.rollback(d, rsList)
}
// 判断当前事件是否为副本数目调整事件
scalingEvent, err := dc.isScalingEvent(d, rsList)
if err != nil {
return err
}
if scalingEvent {
return dc.sync(d, rsList)
}
switch d.Spec.Strategy.Type {
case apps.RecreateDeploymentStrategyType:
// pod重建的发布策略
return dc.rolloutRecreate(d, rsList, podMap)
case apps.RollingUpdateDeploymentStrategyType:
// pod滚动更新的发布策略
return dc.rolloutRolling(d, rsList)
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
网友评论