How to Gracefully Roll-Update Pods

Author: wwq2020 | Published 2021-09-26 15:55

    Background

    When a rolling update is triggered, the old Pods are deleted. The delete request goes through the apiserver's API, and at that point the apiserver sets the Pod's DeletionTimestamp rather than removing the object immediately.
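
    This behaviour can be observed from a client. Below is a minimal client-go sketch (not part of the original article) that deletes a Pod and immediately reads it back: right after the delete the object still exists, but metadata.deletionTimestamp is set. The kubeconfig path, namespace "default", and Pod name "demo" are placeholder assumptions.

    package main

    import (
        "context"
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
        // Build a client from the local kubeconfig (assumes a reachable cluster).
        config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
            panic(err)
        }
        client, err := kubernetes.NewForConfig(config)
        if err != nil {
            panic(err)
        }

        ctx := context.Background()
        ns, name := "default", "demo" // placeholders

        // A normal (graceful) delete: the apiserver sets DeletionTimestamp instead of
        // removing the object right away.
        if err := client.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
            panic(err)
        }

        // Reading the Pod back still succeeds while it terminates; deletionTimestamp is now set.
        if pod, err := client.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{}); err == nil {
            fmt.Println("deletionTimestamp:", pod.DeletionTimestamp)
        }
    }
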
    The apiserver-side code path is shown below.

    k8s.io/apiserver/pkg/storage/etcd3/store.go

    func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
        versioner := APIObjectVersioner{}
        result := &store{
            client:        c,
            codec:         codec,
            versioner:     versioner,
            transformer:   transformer,
            pagingEnabled: pagingEnabled,
            // for compatibility with etcd2 impl.
            // no-op for default prefix of '/registry'.
            // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
            pathPrefix:   path.Join("/", prefix),
            watcher:      newWatcher(c, codec, newFunc, versioner, transformer),
            leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
        }
        return result
    }
    
    func (s *store) Delete(
        ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
        validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
        v, err := conversion.EnforcePtr(out)
        if err != nil {
            return fmt.Errorf("unable to convert output object to pointer: %v", err)
        }
        key = path.Join(s.pathPrefix, key)
        return s.conditionalDelete(ctx, key, out, v, preconditions, validateDeletion, cachedExistingObject)
    }
    
    func (s *store) conditionalDelete(
        ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions,
        validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
        getCurrentState := func() (*objState, error) {
            startTime := time.Now()
            getResp, err := s.client.KV.Get(ctx, key)
            metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
            if err != nil {
                return nil, err
            }
            return s.getState(getResp, key, v, false)
        }
    
        var origState *objState
        var err error
        var origStateIsCurrent bool
        if cachedExistingObject != nil {
            origState, err = s.getStateFromObject(cachedExistingObject)
        } else {
            origState, err = getCurrentState()
            origStateIsCurrent = true
        }
        if err != nil {
            return err
        }
    
        for {
            if preconditions != nil {
                if err := preconditions.Check(key, origState.obj); err != nil {
                    if origStateIsCurrent {
                        return err
                    }
    
                    // It's possible we're working with stale data.
                    // Remember the revision of the potentially stale data and the resulting update error
                    cachedRev := origState.rev
                    cachedUpdateErr := err
    
                    // Actually fetch
                    origState, err = getCurrentState()
                    if err != nil {
                        return err
                    }
                    origStateIsCurrent = true
    
                    // it turns out our cached data was not stale, return the error
                    if cachedRev == origState.rev {
                        return cachedUpdateErr
                    }
    
                    // Retry
                    continue
                }
            }
            if err := validateDeletion(ctx, origState.obj); err != nil {
                if origStateIsCurrent {
                    return err
                }
    
                // It's possible we're working with stale data.
                // Remember the revision of the potentially stale data and the resulting update error
                cachedRev := origState.rev
                cachedUpdateErr := err
    
                // Actually fetch
                origState, err = getCurrentState()
                if err != nil {
                    return err
                }
                origStateIsCurrent = true
    
                // it turns out our cached data was not stale, return the error
                if cachedRev == origState.rev {
                    return cachedUpdateErr
                }
    
                // Retry
                continue
            }
    
            startTime := time.Now()
            txnResp, err := s.client.KV.Txn(ctx).If(
                clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
            ).Then(
                clientv3.OpDelete(key),
            ).Else(
                clientv3.OpGet(key),
            ).Commit()
            metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
            if err != nil {
                return err
            }
            if !txnResp.Succeeded {
                getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
                klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
                origState, err = s.getState(getResp, key, v, false)
                if err != nil {
                    return err
                }
                origStateIsCurrent = true
                continue
            }
            return decode(s.codec, s.versioner, origState.data, out, origState.rev)
        }
    }
    
    

    k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go

    func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
        stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
        if err != nil {
            return nil, nil, err
        }
    
        client, err := newETCD3Client(c.Transport)
        if err != nil {
            stopCompactor()
            return nil, nil, err
        }
    
        stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
        if err != nil {
            return nil, nil, err
        }
    
        var once sync.Once
        destroyFunc := func() {
            // we know that storage destroy funcs are called multiple times (due to reuse in subresources).
            // Hence, we only destroy once.
            // TODO: fix duplicated storage destroy calls higher level
            once.Do(func() {
                stopCompactor()
                stopDBSizeMonitor()
                client.Close()
            })
        }
        transformer := c.Transformer
        if transformer == nil {
            transformer = value.IdentityTransformer
        }
        return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
    }
    

    k8s.io/apiserver/pkg/storage/cacher/cacher.go

    func NewCacherFromConfig(config Config) (*Cacher, error) {
        ...
        cacher := &Cacher{
            ready:          newReady(),
            storage:        config.Storage,
            objectType:     objType,
            versioner:      config.Versioner,
            newFunc:        config.NewFunc,
            indexedTrigger: indexedTrigger,
            watcherIdx:     0,
            watchers: indexedWatchers{
                allWatchers:   make(map[int]*cacheWatcher),
                valueWatchers: make(map[string]watchersMap),
            },
            // TODO: Figure out the correct value for the buffer size.
            incoming:              make(chan watchCacheEvent, 100),
            dispatchTimeoutBudget: newTimeBudget(),
            // We need to (potentially) stop both:
            // - wait.Until go-routine
            // - reflector.ListAndWatch
            // and there are no guarantees on the order that they will stop.
            // So we will be simply closing the channel, and synchronizing on the WaitGroup.
            stopCh:           stopCh,
            clock:            config.Clock,
            timer:            time.NewTimer(time.Duration(0)),
            bookmarkWatchers: newTimeBucketWatchers(config.Clock, defaultBookmarkFrequency),
        }
    
        ...
        return cacher, nil
    }
    
    func (c *Cacher) Delete(
        ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
        validateDeletion storage.ValidateObjectFunc, _ runtime.Object) error {
        // Ignore the suggestion and try to pass down the current version of the object
        // read from cache.
        if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
            klog.Errorf("GetByKey returned error: %v", err)
        } else if exists {
            // DeepCopy the object since we modify resource version when serializing the
            // current object.
            currObj := elem.(*storeElement).Object.DeepCopyObject()
            return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj)
        }
        // If we couldn't get the object, fallback to no-suggestion.
        return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil)
    }
    

    k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go

    func StorageWithCacher() generic.StorageDecorator {
        return func(
            storageConfig *storagebackend.ConfigForResource,
            resourcePrefix string,
            keyFunc func(obj runtime.Object) (string, error),
            newFunc func() runtime.Object,
            newListFunc func() runtime.Object,
            getAttrsFunc storage.AttrFunc,
            triggerFuncs storage.IndexerFuncs,
            indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
    
            s, d, err := generic.NewRawStorage(storageConfig, newFunc)
            if err != nil {
                return s, d, err
            }
            if klog.V(5).Enabled() {
                klog.InfoS("Storage caching is enabled", objectTypeToArgs(newFunc())...)
            }
    
            cacherConfig := cacherstorage.Config{
                Storage:        s,
                Versioner:      etcd3.APIObjectVersioner{},
                ResourcePrefix: resourcePrefix,
                KeyFunc:        keyFunc,
                NewFunc:        newFunc,
                NewListFunc:    newListFunc,
                GetAttrsFunc:   getAttrsFunc,
                IndexerFuncs:   triggerFuncs,
                Indexers:       indexers,
                Codec:          storageConfig.Codec,
            }
            cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig)
            if err != nil {
                return nil, func() {}, err
            }
            destroyFunc := func() {
                cacher.Stop()
                d()
            }
    
            // TODO : Remove RegisterStorageCleanup below when PR
            // https://github.com/kubernetes/kubernetes/pull/50690
            // merges as that shuts down storage properly
            RegisterStorageCleanup(destroyFunc)
    
            return cacher, destroyFunc, nil
        }
    }
    

    k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go

    func Create(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
        switch c.Type {
        case storagebackend.StorageTypeETCD2:
            return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
        case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
            return newETCD3Storage(c, newFunc)
        default:
            return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
        }
    }
    

    k8s.io/apiserver/pkg/registry/generic/storage_decorator.go

    func UndecoratedStorage(
        config *storagebackend.ConfigForResource,
        resourcePrefix string,
        keyFunc func(obj runtime.Object) (string, error),
        newFunc func() runtime.Object,
        newListFunc func() runtime.Object,
        getAttrsFunc storage.AttrFunc,
        trigger storage.IndexerFuncs,
        indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
        return NewRawStorage(config, newFunc)
    }
    
    
    func NewRawStorage(config *storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) {
        return factory.Create(*config, newFunc)
    }
    

    k8s.io/apiserver/pkg/server/options/etcd.go

    func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
        if err := s.addEtcdHealthEndpoint(c); err != nil {
            return err
        }
    
        // use the StorageObjectCountTracker interface instance from server.Config
        s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker
    
        c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
        return nil
    }
    
    func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
        storageConfig, err := f.StorageFactory.NewConfig(resource)
        if err != nil {
            return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
        }
    
        ret := generic.RESTOptions{
            StorageConfig:             storageConfig,
            Decorator:                 generic.UndecoratedStorage,
            DeleteCollectionWorkers:   f.Options.DeleteCollectionWorkers,
            EnableGarbageCollection:   f.Options.EnableGarbageCollection,
            ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
            CountMetricPollPeriod:     f.Options.StorageConfig.CountMetricPollPeriod,
            StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
        }
        if f.Options.EnableWatchCache {
            sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
            if err != nil {
                return generic.RESTOptions{}, err
            }
            size, ok := sizes[resource]
            if ok && size > 0 {
                klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
            }
            if ok && size <= 0 {
                ret.Decorator = generic.UndecoratedStorage
            } else {
                ret.Decorator = genericregistry.StorageWithCacher()
            }
        }
    
        return ret, nil
    }
    

    cmd/kube-apiserver/app/server.go

    func buildGenericConfig(
        s *options.ServerRunOptions,
        proxyTransport *http.Transport,
    ) (
        genericConfig *genericapiserver.Config,
        versionedInformers clientgoinformers.SharedInformerFactory,
        serviceResolver aggregatorapiserver.ServiceResolver,
        pluginInitializers []admission.PluginInitializer,
        admissionPostStartHook genericapiserver.PostStartHookFunc,
        storageFactory *serverstorage.DefaultStorageFactory,
        lastErr error,
    ) {
        ...
        genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
        if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
            return
        }
        ...
    }
    
    func CreateKubeAPIServerConfig(s completedServerRunOptions) (
        ...
        genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
        if err != nil {
            return nil, nil, nil, err
        }
        ...
        config := &controlplane.Config{
            GenericConfig: genericConfig,
        ...
    }
    
    func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
        kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
        if err != nil {
            return nil, err
        }
        ...
        apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
            serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
        if err != nil {
            return nil, err
        }
        ...
        kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
        if err != nil {
            return nil, err
        }
        ...
    }
    
    func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
        kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
        if err != nil {
            return nil, err
        }
    
        return kubeAPIServer, nil
    }
    

    pkg/controlplane/instance.go

    func (c *Config) Complete() CompletedConfig {
        cfg := completedConfig{
            c.GenericConfig.Complete(c.ExtraConfig.VersionedInformers),
            &c.ExtraConfig,
        }
    
        serviceIPRange, apiServerServiceIP, err := ServiceIPRange(cfg.ExtraConfig.ServiceIPRange)
        if err != nil {
            klog.Fatalf("Error determining service IP ranges: %v", err)
        }
        if cfg.ExtraConfig.ServiceIPRange.IP == nil {
            cfg.ExtraConfig.ServiceIPRange = serviceIPRange
        }
        if cfg.ExtraConfig.APIServerServiceIP == nil {
            cfg.ExtraConfig.APIServerServiceIP = apiServerServiceIP
        }
    
        discoveryAddresses := discovery.DefaultAddresses{DefaultAddress: cfg.GenericConfig.ExternalAddress}
        discoveryAddresses.CIDRRules = append(discoveryAddresses.CIDRRules,
            discovery.CIDRRule{IPRange: cfg.ExtraConfig.ServiceIPRange, Address: net.JoinHostPort(cfg.ExtraConfig.APIServerServiceIP.String(), strconv.Itoa(cfg.ExtraConfig.APIServerServicePort))})
        cfg.GenericConfig.DiscoveryAddresses = discoveryAddresses
    
        if cfg.ExtraConfig.ServiceNodePortRange.Size == 0 {
            // TODO: Currently no way to specify an empty range (do we need to allow this?)
            // We should probably allow this for clouds that don't require NodePort to do load-balancing (GCE)
            // but then that breaks the strict nestedness of ServiceType.
            // Review post-v1
            cfg.ExtraConfig.ServiceNodePortRange = kubeoptions.DefaultServiceNodePortRange
            klog.Infof("Node port range unspecified. Defaulting to %v.", cfg.ExtraConfig.ServiceNodePortRange)
        }
    
        if cfg.ExtraConfig.EndpointReconcilerConfig.Interval == 0 {
            cfg.ExtraConfig.EndpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
        }
    
        if cfg.ExtraConfig.MasterEndpointReconcileTTL == 0 {
            cfg.ExtraConfig.MasterEndpointReconcileTTL = DefaultEndpointReconcilerTTL
        }
    
        if cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler == nil {
            cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler = c.createEndpointReconciler()
        }
    
        return CompletedConfig{&cfg}
    }
    
    func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
        ...
        if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
            legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
                StorageFactory:              c.ExtraConfig.StorageFactory,
                ProxyTransport:              c.ExtraConfig.ProxyTransport,
                KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
                EventTTL:                    c.ExtraConfig.EventTTL,
                ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
                SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
                ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
                LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
                ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
                ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
                ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
                APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
            }
            if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
                return nil, err
            }
        }
    
        restStorageProviders := []RESTStorageProvider{
            apiserverinternalrest.StorageProvider{},
            authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
            authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
            autoscalingrest.RESTStorageProvider{},
            batchrest.RESTStorageProvider{},
            certificatesrest.RESTStorageProvider{},
            coordinationrest.RESTStorageProvider{},
            discoveryrest.StorageProvider{},
            networkingrest.RESTStorageProvider{},
            noderest.RESTStorageProvider{},
            policyrest.RESTStorageProvider{},
            rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
            schedulingrest.RESTStorageProvider{},
            storagerest.RESTStorageProvider{},
            flowcontrolrest.RESTStorageProvider{},
            // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
            // See https://github.com/kubernetes/kubernetes/issues/42392
            appsrest.StorageProvider{},
            admissionregistrationrest.RESTStorageProvider{},
            eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
        }
        if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
            return nil, err
        }
        ...
    }
    
    func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
        legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
        if err != nil {
            return fmt.Errorf("error building core storage: %v", err)
        }
    
        controllerName := "bootstrap-controller"
        coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
        bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
        m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
        m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
    
        if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
            return fmt.Errorf("error in registering group versions: %v", err)
        }
        return nil
    }
    
    func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
        apiGroupsInfo := []*genericapiserver.APIGroupInfo{}
        ...
        for _, restStorageBuilder := range restStorageProviders {
            ...
            apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
            if err != nil {
                return fmt.Errorf("problem initializing API group %q : %v", groupName, err)
            }
            ...
            apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)
        }
        if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
            return fmt.Errorf("error in registering group versions: %v", err)
        }
        ...
    }
    

    pkg/registry/core/rest/storage_core.go

    func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
        ...
        podStorage, err := podstore.NewStorage(
            restOptionsGetter,
            nodeStorage.KubeletConnectionInfo,
            c.ProxyTransport,
            podDisruptionClient,
        )
        ...
        restStorageMap := map[string]rest.Storage{
            "pods":             podStorage.Pod,
        ...
    }
    

    pkg/registry/core/pod/storage/storage.go

    func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {
        store := &genericregistry.Store{
            NewFunc:                  func() runtime.Object { return &api.Pod{} },
            NewListFunc:              func() runtime.Object { return &api.PodList{} },
            PredicateFunc:            registrypod.MatchPod,
            DefaultQualifiedResource: api.Resource("pods"),
    
            CreateStrategy:      registrypod.Strategy,
            UpdateStrategy:      registrypod.Strategy,
            DeleteStrategy:      registrypod.Strategy,
            ResetFieldsStrategy: registrypod.Strategy,
            ReturnDeletedObject: true,
    
            TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
        }
        options := &generic.StoreOptions{
            RESTOptions: optsGetter,
            AttrFunc:    registrypod.GetAttrs,
            TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": registrypod.NodeNameTriggerFunc},
            Indexers:    registrypod.Indexers(),
        }
        if err := store.CompleteWithOptions(options); err != nil {
            return PodStorage{}, err
        }
        ...
        return PodStorage{
            Pod:                 &REST{store, proxyTransport},
            Binding:             &BindingREST{store: store},
            LegacyBinding:       &LegacyBindingREST{bindingREST},
            Eviction:            newEvictionStorage(store, podDisruptionBudgetClient),
            Status:              &StatusREST{store: &statusStore},
            EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
            Log:                 &podrest.LogREST{Store: store, KubeletConn: k},
            Proxy:               &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
            Exec:                &podrest.ExecREST{Store: store, KubeletConn: k},
            Attach:              &podrest.AttachREST{Store: store, KubeletConn: k},
            PortForward:         &podrest.PortForwardREST{Store: store, KubeletConn: k},
        }, nil
    }
    

    k8s.io/apiserver/pkg/registry/generic/registry/store.go

    func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
        ...
        if e.Storage.Storage == nil {
            e.Storage.Codec = opts.StorageConfig.Codec
            var err error
            e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
                opts.StorageConfig,
                prefix,
                keyFunc,
                e.NewFunc,
                e.NewListFunc,
                attrFunc,
                options.TriggerFunc,
                options.Indexers,
            )
            if err != nil {
                return err
            }
            e.StorageVersioner = opts.StorageConfig.EncodeVersioner
    
            if opts.CountMetricPollPeriod > 0 {
                stopFunc := e.startObservingCount(opts.CountMetricPollPeriod, opts.StorageObjectCountTracker)
                previousDestroy := e.DestroyFunc
                e.DestroyFunc = func() {
                    stopFunc()
                    if previousDestroy != nil {
                        previousDestroy()
                    }
                }
            }
        }
        ...
    }
    func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
        ...
        graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, obj, options)
        if err != nil {
            return nil, false, err
        }
        ...
        if err := e.Storage.Delete(ctx, key, out, &preconditions, storage.ValidateObjectFunc(deleteValidation), dryrun.IsDryRun(options.DryRun), nil); err != nil {
            // Please refer to the place where we set ignoreNotFound for the reason
            // why we ignore the NotFound error .
            if storage.IsNotFound(err) && ignoreNotFound && lastExisting != nil {
                // The lastExisting object may not be the last state of the object
                // before its deletion, but it's the best approximation.
                out, err := e.finalizeDelete(ctx, lastExisting, true, options)
                return out, true, err
            }
            return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
        }
        ...
    }
    

    k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go

    func (s *DryRunnableStorage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, deleteValidation storage.ValidateObjectFunc, dryRun bool, cachedExistingObject runtime.Object) error {
        if dryRun {
            if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err != nil {
                return err
            }
            if err := preconditions.Check(key, out); err != nil {
                return err
            }
            return deleteValidation(ctx, out)
        }
        return s.Storage.Delete(ctx, key, out, preconditions, deleteValidation, cachedExistingObject)
    }
    

    k8s.io/apiserver/pkg/registry/rest/delete.go

    func BeforeDelete(strategy RESTDeleteStrategy, ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) (graceful, gracefulPending bool, err error) {
        ...
        if objectMeta.GetDeletionTimestamp() != nil {
            // if we are already being deleted, we may only shorten the deletion grace period
            // this means the object was gracefully deleted previously but deletionGracePeriodSeconds was not set,
            // so we force deletion immediately
            // IMPORTANT:
            // The deletion operation happens in two phases.
            // 1. Update to set DeletionGracePeriodSeconds and DeletionTimestamp
            // 2. Delete the object from storage.
            // If the update succeeds, but the delete fails (network error, internal storage error, etc.),
            // a resource was previously left in a state that was non-recoverable.  We
            // check if the existing stored resource has a grace period as 0 and if so
            // attempt to delete immediately in order to recover from this scenario.
            if objectMeta.GetDeletionGracePeriodSeconds() == nil || *objectMeta.GetDeletionGracePeriodSeconds() == 0 {
                return false, false, nil
            }
        ...
        objectMeta.SetDeletionTimestamp(&now)
        objectMeta.SetDeletionGracePeriodSeconds(options.GracePeriodSeconds)
        ...
    }
    

    k8s.io/apiserver/pkg/server/genericapiserver.go

    func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
        if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
            return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
        }
    
        openAPIModels, err := s.getOpenAPIModels(apiPrefix, apiGroupInfo)
        if err != nil {
            return fmt.Errorf("unable to get openapi models: %v", err)
        }
    
        if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
            return err
        }
    
        // Install the version handler.
        // Add a handler at /<apiPrefix> to enumerate the supported api versions.
        s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())
    
        return nil
    }
    
    func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
        ...
        for _, apiGroupInfo := range apiGroupInfos {
            if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
                return fmt.Errorf("unable to install api resources: %v", err)
            }
        ...
    }
    
    func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
        ...
        apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
        ...
        r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
        ...
    }
    
    func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) *genericapi.APIGroupVersion {
        storage := make(map[string]rest.Storage)
        for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {
            storage[strings.ToLower(k)] = v
        }
        version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
        version.Root = apiPrefix
        version.Storage = storage
        return version
    }
    

    k8s.io/apiserver/pkg/endpoints/groupversion.go

    func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
        ...
        installer := &APIInstaller{
            group:             g,
            prefix:            prefix,
            minRequestTimeout: g.MinRequestTimeout,
        }
    
        apiResources, resourceInfos, ws, registrationErrors := installer.Install()
        ...
    }
    

    k8s.io/apiserver/pkg/endpoints/installer.go

    func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
    
        ...
        apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
        ...
    }
    
    func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
        ...
        case "DELETE": // Delete a resource.
            ...
            handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
            handler = utilwarning.AddWarningsHandler(handler, warnings)
            route := ws.DELETE(action.Path).To(handler).
                    Doc(doc).
                    Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                    Operation("delete"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                    Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                    Writes(deleteReturnType).
                    Returns(http.StatusOK, "OK", deleteReturnType).
                    Returns(http.StatusAccepted, "Accepted", deleteReturnType)
            ...
        ...
    }
    
    func restfulDeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
        return func(req *restful.Request, res *restful.Response) {
            handlers.DeleteResource(r, allowsOptions, &scope, admit)(res.ResponseWriter, req.Request)
        }
    }
    
    func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestScope, admit admission.Interface) http.HandlerFunc {
        ...
        result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
                obj, deleted, err := r.Delete(ctx, name, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options)
                wasDeleted = deleted
                return obj, err
            })
        ...
    }
    

    When the kubelet receives the Pod update event and sees that DeletionTimestamp has been set, it takes the deletion path: it first runs the preStop hook, then stops the containers, and finally calls the apiserver with GracePeriodSeconds set to 0 to signal that the Pod can now be removed, at which point the apiserver performs the actual deletion.
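
    For illustration only (a sketch, not the kubelet's actual code), the final zero-grace-period delete looks roughly like this with client-go:

    // Illustrative helper, not kubelet source.
    package podutil

    import (
        "context"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
    )

    // ForceDeletePod issues the second delete with GracePeriodSeconds set to 0, which
    // tells the apiserver that graceful termination has finished and the object can be
    // removed from etcd.
    func ForceDeletePod(ctx context.Context, client kubernetes.Interface, ns, name string) error {
        gracePeriod := int64(0)
        return client.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{
            GracePeriodSeconds: &gracePeriod,
        })
    }
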

    The Problem

    The parties that react to a Pod deletion here are the endpoint controller (which removes the Pod from the Endpoints object) and the kubelet (which actually shuts the Pod down), and the two run in parallel. It is therefore possible that the Pod is already gone while the Endpoints object has not yet been updated, which breaks kube-proxy and ingress controllers that still route traffic to it. Even once the Endpoints object is updated, by the time kube-proxy and the ingress controller receive the event and finish processing it, the Pod may already have been deleted, so traffic can still be lost.

    Solutions

    Option 1
    Add a preStop hook, e.g. sleep for 5 seconds. This only reduces the likelihood of the problem; it does not fix it at the root.
    Option 2
    Use an admission webhook to do some cleanup when the Pod is deleted, for example have the gateway stop forwarding traffic to this Pod first.
    Option 3
    Use the Downward API to inject the Pod IP as an environment variable, and have the preStop hook notify the gateway that this Pod IP is going offline. A sketch combining options 1 and 3 follows below.
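
    A minimal sketch of options 1 and 3 combined, written with the k8s.io/api types (k8s.io/api v0.23+ names the handler type corev1.LifecycleHandler; earlier releases call it corev1.Handler). The gateway URL, container image, and sleep length are illustrative assumptions, not part of the original article:

    // Illustrative container definition, not from the article.
    package podutil

    import (
        corev1 "k8s.io/api/core/v1"
    )

    // AppContainer injects the Pod IP via the Downward API and, in preStop, notifies a
    // hypothetical gateway that this IP is going offline, then sleeps to let in-flight
    // traffic drain before the container is stopped.
    var AppContainer = corev1.Container{
        Name:  "app",
        Image: "example/app:latest", // placeholder
        Env: []corev1.EnvVar{{
            Name: "POD_IP",
            ValueFrom: &corev1.EnvVarSource{
                FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.podIP"},
            },
        }},
        Lifecycle: &corev1.Lifecycle{
            PreStop: &corev1.LifecycleHandler{ // corev1.Handler before k8s.io/api v0.23
                Exec: &corev1.ExecAction{
                    Command: []string{"sh", "-c",
                        "curl -s http://gateway.example/offline?ip=$POD_IP; sleep 5"},
                },
            },
        },
    }
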
