- [k8s源码分析][controller-manager] Re
- K8s -- Custom-Metrics及Prometheus
- Kubernetes | 生成 kubeconfig 文件
- [k8s源码分析][controller-manager] ku
- [k8s源码分析][controller-manager] Re
- [k8s源码分析][controller-manager] co
- [k8s源码分析][controller-manager] po
- [k8s源码分析][controller-manager] co
- K8s核心原理(二)之Controller-Manager
- k8s-9-controller-manager和schedul
背景
kube-apiserver
的主要职责是完成对etcd的增删改查,至于增删改查操作完成后,要达到什么效果,就依赖控制器controller-manager
来完成
kube-controller-manager
顾名思义是用来管理控制器的,它由一系列的控制器组成,由它统一的管理(启动、停止等)
那怎么自定义一个controller-manager
呢?在这里做个学习笔记。
初始化
以karmada-controller-manager
的初始化流程为例,他在初始化的时候会创建一些runnable
的分组,需要加入到controller-manager
中的控制器都会加入到这个分组的runnable
中去
以LeaderElection
这个分组为例,就是所有需要选主的控制器都应该加入到这个分组里,这样只有在controller-manager
成为leader的时候,这个组内的控制器才会运行;
其它的三个分组就是不成为leader也需要运行的控制器
func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
return &runnables{
Webhooks: newRunnableGroup(baseContext, errChan),
Caches: newRunnableGroup(baseContext, errChan),
LeaderElection: newRunnableGroup(baseContext, errChan),
Others: newRunnableGroup(baseContext, errChan),
}
}
func (r *runnables) Add(fn Runnable) error {
switch runnable := fn.(type) {
case hasCache:
return r.Caches.Add(fn, func(ctx context.Context) bool {
return runnable.GetCache().WaitForCacheSync(ctx)
})
case *webhook.Server:
return r.Webhooks.Add(fn, nil)
case LeaderElectionRunnable:
if !runnable.NeedLeaderElection() {
return r.Others.Add(fn, nil)
}
return r.LeaderElection.Add(fn, nil)
default:
return r.LeaderElection.Add(fn, nil)
}
}
控制器加入到controller-manager
如何将一个控制器加入到controller-manager
中,一般有两种形式,但这两种形式本质上都是同一种方式,只是编码的区别
方式一--直接加入
type Controller struct {
client.Client // used to operate Cluster resources.
EventRecorder record.EventRecorder
...
}
func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
c.clusterHealthMap = newClusterHealthMap()
return utilerrors.NewAggregate([]error{
mgr.Add(c),
})
}
这种方式下,直接通过controller-manager
的Add方法添加一个Controller,我们直接定义一个Controller
,并且实现Runnable
接口,也就是实现它的Start()
方法,这样再controller-manager
启动的时候,就会调用这个Controller
的Start()
方法,进而完成你的控制逻辑
方式二--间接加入
type Controller struct {
client.Client // used to operate Cluster resources.
EventRecorder record.EventRecorder
...
}
func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
c.clusterHealthMap = newClusterHealthMap()
return utilerrors.NewAggregate([]error{
controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).Complete(c)
})
}
这种方式下,也是需要定义一个Controller
,并且实现Reconciler
接口,也就是实现它的Reconcile()
方法,然后主要依赖k8s
提供的依赖包完成控制器的加入,我们再具体看下它是怎么加入的
func (blder *Builder) Complete(r reconcile.Reconciler) error {
_, err := blder.Build(r)
return err
}
// Build builds the Application Controller and returns the Controller it created.
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
...
// Set the ControllerManagedBy
if err := blder.doController(r); err != nil {
return nil, err
}
// Set the Watch
if err := blder.doWatch(); err != nil {
return nil, err
}
return blder.ctrl, nil
}
再Complete()
的时候,doController
也会创建一个内部的Controller
,然后doWatch
完成对我们指定对象&clusterv1alpha1.Cluster{}
的Informer操作,我们先看下doController
方法
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
c, err := NewUnmanaged(name, mgr, options)
...
// Add the controller as a Manager components
return c, mgr.Add(c)
}
可以看到也是创建了一个Controller
,然后调用controller-manager
的Add方法添加这个Controller
;
再看下doWatch
方法的实现逻辑
func (blder *Builder) doWatch() error {
// Reconcile type
if blder.forInput.object != nil {
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
// 调用刚刚创建的内部Controller的Watch方法
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
}
...
return nil
}
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
...
// 内部Controller还没启动的时候,只是将其添加到一个数组里
// 再controller-manager启动的时候,会去启动每一个Controller, 然后在执行真正的Start操作
if !c.Started {
c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
return nil
}
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
...
go func() {
var (
i cache.Informer
lastErr error
)
// 为我们指定的对象&clusterv1alpha1.Cluster{}创建Informer
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
...
return true, nil
});
// 为这个Informer添加事件处理回调,在事件发生时进行回调
_, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
...
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")
}
close(ks.started)
}()
return nil
}
总结一下这种方式的流程:
- 通过
Build
创建一个内部的Controller,并将这个Controller
加入到controller-manager
中 - 由于此时
controller-manager
还没启动,因此会构建一个待watch的对象加入到数组中,等待controller-manager
启动的时候去执行真正的watch -
controller-manager
在后面启动的时候,会创建对象&clusterv1alpha1.Cluster{}
的Informer,并注册事件回调
因为往controller-manager
里加入的是这个内部Controller
,因此controller-manager
启动时还是调用的这个Controller
的Start()
方法,我们再来看一下
func (c *Controller) Start(ctx context.Context) error {
...
wg := &sync.WaitGroup{}
err := func() error {
defer c.mu.Unlock()
// TODO(pwittrock): Reconsider HandleCrash
defer utilruntime.HandleCrash()
// controller-manager还没启动的时候,都是将对象加入到这个数组中
// 现在启动了,因此需要调用每一个对象的Start()方法,也就是上面的Informer的创建和注册事件回调
for _, watch := range c.startWatches {
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
...
wg.Add(c.MaxConcurrentReconciles)
// 启动多个携程来处理任务
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go func() {
defer wg.Done()
// Run a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
for c.processNextWorkItem(ctx) {
}
}()
}
c.Started = true
return nil
}()
...
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get()
...
defer c.Queue.Done(obj)
...
c.reconcileHandler(ctx, obj)
return true
}
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
...
// Do 记录的就是我们自定义的Controller对象,在这里回调了自定义Controller中的Reconcile()方法
return c.Do.Reconcile(ctx, req)
}
启动流程在总结一下:
- 创建我们指定对象
&clusterv1alpha1.Cluster{}
的Informer,并注册事件回调,等待Informer同步完成;下面是我们注册的Informer事件回调,它会在收到事件后往队列中存放
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
...
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}
- 阻塞的从队列中获取元素,这里会一直阻塞,直到Informer那边往队列里写入元素
- 内部Controller从队列中获取到元素后,回调自定义Controller的Reconcile()方法,进而实现调谐的过程
对比
两种加入Controller
的方式对于开发者来说要做的事情都不多,更主要的问题在于我们的Start()
方法 或者 Reconcile()
怎么实现,来完成控制器逻辑
- 第一种方式,由于是
controller-manager
直接调用的,因此入参只有一个context上下文,所以这里比较适合做一些定时任务,比如定时的检测集群的状态,然后给集群添加污点
func (c *Controller) Start(ctx context.Context) error {
klog.Infof("Starting cluster health monitor")
defer klog.Infof("Shutting cluster health monitor")
// Incorporate the results of cluster health signal pushed from cluster-status-controller to master.
go wait.UntilWithContext(ctx, func(ctx context.Context) {
if err := c.monitorClusterHealth(ctx); err != nil {
klog.Errorf("Error monitoring cluster health: %v", err)
}
// 默认值 5s,集群健康状态的检测周期
}, c.ClusterMonitorPeriod)
<-ctx.Done()
return nil
}
- 第二中方式,主要是通过Informer实现,在收到事件后就会回调
Reconcile()
方法,因此入参除了context上下文,还有watch事件里新产生的对象,所以这里比较适合做一些实时的控制,即发生某个事件后就需要进行相关的处理,比如集群创建后,就需要给这个集群创建一个命名空间,并且添加污点、添加Finalizers
func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Reconciling cluster %s", req.NamespacedName.Name)
cluster := &clusterv1alpha1.Cluster{}
if err := c.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
// The resource may no longer exist, in which case we stop processing.
if apierrors.IsNotFound(err) {
return controllerruntime.Result{}, nil
}
return controllerruntime.Result{Requeue: true}, err
}
// cluster对象存在 deletionTimestamp,表明需要删除
if !cluster.DeletionTimestamp.IsZero() {
return c.removeCluster(ctx, cluster)
}
return c.syncCluster(ctx, cluster)
}
func (c *Controller) syncCluster(ctx context.Context, cluster *clusterv1alpha1.Cluster) (controllerruntime.Result, error) {
// create execution space
// 检查 karmada-es-member1 命名空间是否存在,不存在则创建
err := c.createExecutionSpace(cluster)
if err != nil {
c.EventRecorder.Event(cluster, corev1.EventTypeWarning, events.EventReasonCreateExecutionSpaceFailed, err.Error())
return controllerruntime.Result{Requeue: true}, err
}
// taint cluster by condition
// 根据集群的Condition给Cluster添加污点
err = c.taintClusterByCondition(ctx, cluster)
if err != nil {
c.EventRecorder.Event(cluster, corev1.EventTypeWarning, events.EventReasonTaintClusterByConditionFailed, err.Error())
return controllerruntime.Result{Requeue: true}, err
}
// ensure finalizer
// 给集群添加 karmada.io/cluster-controller 这个Finalizer
return c.ensureFinalizer(cluster)
}
网友评论