美文网首页
k8s 之 statefulset controller 源码简

k8s 之 statefulset controller 源码简

作者: wwq2020 | 来源:发表于2020-07-20 21:20 被阅读0次

简介

statefulset controller 通过 apiserver pod 或者 statefulset 的变化,然后更新
一致单调更新策略(默认):
按顺序扩容,如果 Pod 不健康,不会有新的 pod 创建,先是逆序停止 pod.
burst 策略:
限制宽松,pods 会尽快创建和删除,不按照特定顺序

创建处

func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, error) {
    store := &genericregistry.Store{
        NewFunc:                  func() runtime.Object { return &apps.StatefulSet{} },
        NewListFunc:              func() runtime.Object { return &apps.StatefulSetList{} },
        DefaultQualifiedResource: apps.Resource("statefulsets"),

        CreateStrategy: statefulset.Strategy,
        UpdateStrategy: statefulset.Strategy,
        DeleteStrategy: statefulset.Strategy,

        TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
    }
    options := &generic.StoreOptions{RESTOptions: optsGetter}
    if err := store.CompleteWithOptions(options); err != nil {
        return nil, nil, err
    }

    statusStore := *store
    statusStore.UpdateStrategy = statefulset.StatusStrategy
    return &REST{store}, &StatusREST{store: &statusStore}, nil
}

controller

cmd/kube-controller-manager/app/apps.go 中

func startStatefulSetController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}] {
        return nil, false, nil
    }
    go statefulset.NewStatefulSetController(
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.InformerFactory.Apps().V1().StatefulSets(),
        ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
        ctx.InformerFactory.Apps().V1().ControllerRevisions(),
        ctx.ClientBuilder.ClientOrDie("statefulset-controller"),
    ).Run(int(ctx.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Stop)
    return nil, true, nil
}

pkg/controller/statefulset/stateful_set.go 中

func NewStatefulSetController(
    podInformer coreinformers.PodInformer,
    setInformer appsinformers.StatefulSetInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    revInformer appsinformers.ControllerRevisionInformer,
    kubeClient clientset.Interface,
) *StatefulSetController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartStructuredLogging(0)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})

    ssc := &StatefulSetController{
        kubeClient: kubeClient,
        control: NewDefaultStatefulSetControl(
            NewRealStatefulPodControl(
                kubeClient,
                setInformer.Lister(),
                podInformer.Lister(),
                pvcInformer.Lister(),
                recorder),
            NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
            history.NewHistory(kubeClient, revInformer.Lister()),
            recorder,
        ),
        pvcListerSynced: pvcInformer.Informer().HasSynced,
        queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
        podControl:      controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},

        revListerSynced: revInformer.Informer().HasSynced,
    }

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        // lookup the statefulset and enqueue
        AddFunc: ssc.addPod,
        // lookup current and old statefulset if labels changed
        UpdateFunc: ssc.updatePod,
        // lookup statefulset accounting for deletion tombstones
        DeleteFunc: ssc.deletePod,
    })
    ssc.podLister = podInformer.Lister()
    ssc.podListerSynced = podInformer.Informer().HasSynced

    setInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: ssc.enqueueStatefulSet,
            UpdateFunc: func(old, cur interface{}) {
                oldPS := old.(*apps.StatefulSet)
                curPS := cur.(*apps.StatefulSet)
                if oldPS.Status.Replicas != curPS.Status.Replicas {
                    klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
                }
                ssc.enqueueStatefulSet(cur)
            },
            DeleteFunc: ssc.enqueueStatefulSet,
        },
    )
    ssc.setLister = setInformer.Lister()
    ssc.setListerSynced = setInformer.Informer().HasSynced

    // TODO: Watch volumes
    return ssc
}

func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer ssc.queue.ShutDown()

    klog.Infof("Starting stateful set controller")
    defer klog.Infof("Shutting down statefulset controller")

    if !cache.WaitForNamedCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(ssc.worker, time.Second, stopCh)
    }

    <-stopCh
}

func (ssc *StatefulSetController) sync(key string) error {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    set, err := ssc.setLister.StatefulSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.Infof("StatefulSet has been deleted %v", key)
        return nil
    }
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
        return err
    }

    selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
        // This is a non-transient error, so don't retry.
        return nil
    }

    if err := ssc.adoptOrphanRevisions(set); err != nil {
        return err
    }

    pods, err := ssc.getPodsForStatefulSet(set, selector)
    if err != nil {
        return err
    }

    return ssc.syncStatefulSet(set, pods)
}

pkg/controller/statefulset/stateful_set_control.go

func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {

    // list all revisions and sort them
    revisions, err := ssc.ListRevisions(set)
    if err != nil {
        return err
    }
    history.SortControllerRevisions(revisions)

    currentRevision, updateRevision, err := ssc.performUpdate(set, pods, revisions)
    if err != nil {
        return utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)})
    }

    // maintain the set's revision history limit
    return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
}

相关文章

网友评论

      本文标题:k8s 之 statefulset controller 源码简

      本文链接:https://www.haomeiwen.com/subject/yxhukktx.html