Background
Part 1
When a rolling update is triggered, the old Pods are deleted. The deletion goes through the apiserver's delete API, and at that point the DeletionTimestamp is set on the Pod.
The relevant code is shown below.
k8s.io/apiserver/pkg/storage/etcd3/store.go
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
    versioner := APIObjectVersioner{}
    result := &store{
        client:        c,
        codec:         codec,
        versioner:     versioner,
        transformer:   transformer,
        pagingEnabled: pagingEnabled,
        // for compatibility with etcd2 impl.
        // no-op for default prefix of '/registry'.
        // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
        pathPrefix:   path.Join("/", prefix),
        watcher:      newWatcher(c, codec, newFunc, versioner, transformer),
        leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
    }
    return result
}
func (s *store) Delete(
    ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
    validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
    v, err := conversion.EnforcePtr(out)
    if err != nil {
        return fmt.Errorf("unable to convert output object to pointer: %v", err)
    }
    key = path.Join(s.pathPrefix, key)
    return s.conditionalDelete(ctx, key, out, v, preconditions, validateDeletion, cachedExistingObject)
}
func (s *store) conditionalDelete(
    ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions,
    validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
    getCurrentState := func() (*objState, error) {
        startTime := time.Now()
        getResp, err := s.client.KV.Get(ctx, key)
        metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
        if err != nil {
            return nil, err
        }
        return s.getState(getResp, key, v, false)
    }

    var origState *objState
    var err error
    var origStateIsCurrent bool
    if cachedExistingObject != nil {
        origState, err = s.getStateFromObject(cachedExistingObject)
    } else {
        origState, err = getCurrentState()
        origStateIsCurrent = true
    }
    if err != nil {
        return err
    }

    for {
        if preconditions != nil {
            if err := preconditions.Check(key, origState.obj); err != nil {
                if origStateIsCurrent {
                    return err
                }
                // It's possible we're working with stale data.
                // Remember the revision of the potentially stale data and the resulting update error
                cachedRev := origState.rev
                cachedUpdateErr := err
                // Actually fetch
                origState, err = getCurrentState()
                if err != nil {
                    return err
                }
                origStateIsCurrent = true
                // it turns out our cached data was not stale, return the error
                if cachedRev == origState.rev {
                    return cachedUpdateErr
                }
                // Retry
                continue
            }
        }
        if err := validateDeletion(ctx, origState.obj); err != nil {
            if origStateIsCurrent {
                return err
            }
            // It's possible we're working with stale data.
            // Remember the revision of the potentially stale data and the resulting update error
            cachedRev := origState.rev
            cachedUpdateErr := err
            // Actually fetch
            origState, err = getCurrentState()
            if err != nil {
                return err
            }
            origStateIsCurrent = true
            // it turns out our cached data was not stale, return the error
            if cachedRev == origState.rev {
                return cachedUpdateErr
            }
            // Retry
            continue
        }
        startTime := time.Now()
        txnResp, err := s.client.KV.Txn(ctx).If(
            clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
        ).Then(
            clientv3.OpDelete(key),
        ).Else(
            clientv3.OpGet(key),
        ).Commit()
        metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
        if err != nil {
            return err
        }
        if !txnResp.Succeeded {
            getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
            klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
            origState, err = s.getState(getResp, key, v, false)
            if err != nil {
                return err
            }
            origStateIsCurrent = true
            continue
        }
        return decode(s.codec, s.versioner, origState.data, out, origState.rev)
    }
}
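The delete itself is an optimistic-concurrency transaction: etcd deletes the key only if its ModRevision still matches the revision the apiserver last saw; otherwise the Else branch returns the current value and the loop retries. Below is a minimal standalone sketch of that compare-and-delete pattern against a raw etcd client; the endpoint address and the key are placeholders, not values from the source above.

package main

import (
    "context"
    "fmt"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

// compareAndDelete deletes key only if its ModRevision still equals rev,
// mirroring the If/Then/Else transaction used by conditionalDelete above.
func compareAndDelete(ctx context.Context, kv clientv3.KV, key string, rev int64) (bool, error) {
    resp, err := kv.Txn(ctx).If(
        clientv3.Compare(clientv3.ModRevision(key), "=", rev),
    ).Then(
        clientv3.OpDelete(key),
    ).Else(
        clientv3.OpGet(key), // someone changed the key; a caller would fetch this new state and retry
    ).Commit()
    if err != nil {
        return false, err
    }
    return resp.Succeeded, nil
}

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"}, // placeholder endpoint
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        panic(err)
    }
    defer cli.Close()

    ctx := context.Background()
    key := "/registry/pods/default/demo" // placeholder key
    get, err := cli.Get(ctx, key)
    if err != nil || len(get.Kvs) == 0 {
        return
    }
    ok, err := compareAndDelete(ctx, cli.KV, key, get.Kvs[0].ModRevision)
    fmt.Println("deleted:", ok, "err:", err)
}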
k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
    stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
    if err != nil {
        return nil, nil, err
    }
    client, err := newETCD3Client(c.Transport)
    if err != nil {
        stopCompactor()
        return nil, nil, err
    }
    stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
    if err != nil {
        return nil, nil, err
    }
    var once sync.Once
    destroyFunc := func() {
        // we know that storage destroy funcs are called multiple times (due to reuse in subresources).
        // Hence, we only destroy once.
        // TODO: fix duplicated storage destroy calls higher level
        once.Do(func() {
            stopCompactor()
            stopDBSizeMonitor()
            client.Close()
        })
    }
    transformer := c.Transformer
    if transformer == nil {
        transformer = value.IdentityTransformer
    }
    return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
}
k8s.io/apiserver/pkg/storage/cacher/cacher.go
func NewCacherFromConfig(config Config) (*Cacher, error) {
    ...
    cacher := &Cacher{
        ready:          newReady(),
        storage:        config.Storage,
        objectType:     objType,
        versioner:      config.Versioner,
        newFunc:        config.NewFunc,
        indexedTrigger: indexedTrigger,
        watcherIdx:     0,
        watchers: indexedWatchers{
            allWatchers:   make(map[int]*cacheWatcher),
            valueWatchers: make(map[string]watchersMap),
        },
        // TODO: Figure out the correct value for the buffer size.
        incoming:              make(chan watchCacheEvent, 100),
        dispatchTimeoutBudget: newTimeBudget(),
        // We need to (potentially) stop both:
        // - wait.Until go-routine
        // - reflector.ListAndWatch
        // and there are no guarantees on the order that they will stop.
        // So we will be simply closing the channel, and synchronizing on the WaitGroup.
        stopCh:           stopCh,
        clock:            config.Clock,
        timer:            time.NewTimer(time.Duration(0)),
        bookmarkWatchers: newTimeBucketWatchers(config.Clock, defaultBookmarkFrequency),
    }
    ...
    return cacher, nil
}

func (c *Cacher) Delete(
    ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
    validateDeletion storage.ValidateObjectFunc, _ runtime.Object) error {
    // Ignore the suggestion and try to pass down the current version of the object
    // read from cache.
    if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
        klog.Errorf("GetByKey returned error: %v", err)
    } else if exists {
        // DeepCopy the object since we modify resource version when serializing the
        // current object.
        currObj := elem.(*storeElement).Object.DeepCopyObject()
        return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj)
    }
    // If we couldn't get the object, fallback to no-suggestion.
    return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil)
}
k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
func StorageWithCacher() generic.StorageDecorator {
    return func(
        storageConfig *storagebackend.ConfigForResource,
        resourcePrefix string,
        keyFunc func(obj runtime.Object) (string, error),
        newFunc func() runtime.Object,
        newListFunc func() runtime.Object,
        getAttrsFunc storage.AttrFunc,
        triggerFuncs storage.IndexerFuncs,
        indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
        s, d, err := generic.NewRawStorage(storageConfig, newFunc)
        if err != nil {
            return s, d, err
        }
        if klog.V(5).Enabled() {
            klog.InfoS("Storage caching is enabled", objectTypeToArgs(newFunc())...)
        }

        cacherConfig := cacherstorage.Config{
            Storage:        s,
            Versioner:      etcd3.APIObjectVersioner{},
            ResourcePrefix: resourcePrefix,
            KeyFunc:        keyFunc,
            NewFunc:        newFunc,
            NewListFunc:    newListFunc,
            GetAttrsFunc:   getAttrsFunc,
            IndexerFuncs:   triggerFuncs,
            Indexers:       indexers,
            Codec:          storageConfig.Codec,
        }
        cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig)
        if err != nil {
            return nil, func() {}, err
        }
        destroyFunc := func() {
            cacher.Stop()
            d()
        }

        // TODO : Remove RegisterStorageCleanup below when PR
        // https://github.com/kubernetes/kubernetes/pull/50690
        // merges as that shuts down storage properly
        RegisterStorageCleanup(destroyFunc)

        return cacher, destroyFunc, nil
    }
}
k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
func Create(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
    switch c.Type {
    case storagebackend.StorageTypeETCD2:
        return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
    case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
        return newETCD3Storage(c, newFunc)
    default:
        return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
    }
}
k8s.io/apiserver/pkg/registry/generic/storage_decorator.go
func UndecoratedStorage(
    config *storagebackend.ConfigForResource,
    resourcePrefix string,
    keyFunc func(obj runtime.Object) (string, error),
    newFunc func() runtime.Object,
    newListFunc func() runtime.Object,
    getAttrsFunc storage.AttrFunc,
    trigger storage.IndexerFuncs,
    indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
    return NewRawStorage(config, newFunc)
}

func NewRawStorage(config *storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) {
    return factory.Create(*config, newFunc)
}
k8s.io/apiserver/pkg/server/options/etcd.go
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
    if err := s.addEtcdHealthEndpoint(c); err != nil {
        return err
    }

    // use the StorageObjectCountTracker interface instance from server.Config
    s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker

    c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
    return nil
}

func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
    storageConfig, err := f.StorageFactory.NewConfig(resource)
    if err != nil {
        return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
    }

    ret := generic.RESTOptions{
        StorageConfig:             storageConfig,
        Decorator:                 generic.UndecoratedStorage,
        DeleteCollectionWorkers:   f.Options.DeleteCollectionWorkers,
        EnableGarbageCollection:   f.Options.EnableGarbageCollection,
        ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
        CountMetricPollPeriod:     f.Options.StorageConfig.CountMetricPollPeriod,
        StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
    }
    if f.Options.EnableWatchCache {
        sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
        if err != nil {
            return generic.RESTOptions{}, err
        }
        size, ok := sizes[resource]
        if ok && size > 0 {
            klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
        }
        if ok && size <= 0 {
            ret.Decorator = generic.UndecoratedStorage
        } else {
            ret.Decorator = genericregistry.StorageWithCacher()
        }
    }
    return ret, nil
}
cmd/kube-apiserver/app/server.go
func buildGenericConfig(
    s *options.ServerRunOptions,
    proxyTransport *http.Transport,
) (
    genericConfig *genericapiserver.Config,
    versionedInformers clientgoinformers.SharedInformerFactory,
    serviceResolver aggregatorapiserver.ServiceResolver,
    pluginInitializers []admission.PluginInitializer,
    admissionPostStartHook genericapiserver.PostStartHookFunc,
    storageFactory *serverstorage.DefaultStorageFactory,
    lastErr error,
) {
    ...
    genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
    if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
        return
    }
    ...
}

func CreateKubeAPIServerConfig(s completedServerRunOptions) (
    ...
    genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
    if err != nil {
        return nil, nil, nil, err
    }
    ...
    config := &controlplane.Config{
        GenericConfig: genericConfig,
        ...
    }
    ...
}

func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
    kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
    if err != nil {
        return nil, err
    }
    ...
    apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
        serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
    if err != nil {
        return nil, err
    }
    ...
    kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
    if err != nil {
        return nil, err
    }
    ...
}

func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
    kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
    if err != nil {
        return nil, err
    }
    return kubeAPIServer, nil
}
pkg/controlplane/instance.go
func (c *Config) Complete() CompletedConfig {
    cfg := completedConfig{
        c.GenericConfig.Complete(c.ExtraConfig.VersionedInformers),
        &c.ExtraConfig,
    }

    serviceIPRange, apiServerServiceIP, err := ServiceIPRange(cfg.ExtraConfig.ServiceIPRange)
    if err != nil {
        klog.Fatalf("Error determining service IP ranges: %v", err)
    }
    if cfg.ExtraConfig.ServiceIPRange.IP == nil {
        cfg.ExtraConfig.ServiceIPRange = serviceIPRange
    }
    if cfg.ExtraConfig.APIServerServiceIP == nil {
        cfg.ExtraConfig.APIServerServiceIP = apiServerServiceIP
    }

    discoveryAddresses := discovery.DefaultAddresses{DefaultAddress: cfg.GenericConfig.ExternalAddress}
    discoveryAddresses.CIDRRules = append(discoveryAddresses.CIDRRules,
        discovery.CIDRRule{IPRange: cfg.ExtraConfig.ServiceIPRange, Address: net.JoinHostPort(cfg.ExtraConfig.APIServerServiceIP.String(), strconv.Itoa(cfg.ExtraConfig.APIServerServicePort))})
    cfg.GenericConfig.DiscoveryAddresses = discoveryAddresses

    if cfg.ExtraConfig.ServiceNodePortRange.Size == 0 {
        // TODO: Currently no way to specify an empty range (do we need to allow this?)
        // We should probably allow this for clouds that don't require NodePort to do load-balancing (GCE)
        // but then that breaks the strict nestedness of ServiceType.
        // Review post-v1
        cfg.ExtraConfig.ServiceNodePortRange = kubeoptions.DefaultServiceNodePortRange
        klog.Infof("Node port range unspecified. Defaulting to %v.", cfg.ExtraConfig.ServiceNodePortRange)
    }

    if cfg.ExtraConfig.EndpointReconcilerConfig.Interval == 0 {
        cfg.ExtraConfig.EndpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
    }
    if cfg.ExtraConfig.MasterEndpointReconcileTTL == 0 {
        cfg.ExtraConfig.MasterEndpointReconcileTTL = DefaultEndpointReconcilerTTL
    }
    if cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler == nil {
        cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler = c.createEndpointReconciler()
    }

    return CompletedConfig{&cfg}
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
    ...
    if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
        legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
            StorageFactory:              c.ExtraConfig.StorageFactory,
            ProxyTransport:              c.ExtraConfig.ProxyTransport,
            KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
            EventTTL:                    c.ExtraConfig.EventTTL,
            ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
            SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
            ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
            LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
            ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
            ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
            ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
            APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
        }
        if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
            return nil, err
        }
    }

    restStorageProviders := []RESTStorageProvider{
        apiserverinternalrest.StorageProvider{},
        authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
        authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
        autoscalingrest.RESTStorageProvider{},
        batchrest.RESTStorageProvider{},
        certificatesrest.RESTStorageProvider{},
        coordinationrest.RESTStorageProvider{},
        discoveryrest.StorageProvider{},
        networkingrest.RESTStorageProvider{},
        noderest.RESTStorageProvider{},
        policyrest.RESTStorageProvider{},
        rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
        schedulingrest.RESTStorageProvider{},
        storagerest.RESTStorageProvider{},
        flowcontrolrest.RESTStorageProvider{},
        // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
        // See https://github.com/kubernetes/kubernetes/issues/42392
        appsrest.StorageProvider{},
        admissionregistrationrest.RESTStorageProvider{},
        eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
    }
    if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
        return nil, err
    }
    ...
}

func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
    legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
    if err != nil {
        return fmt.Errorf("error building core storage: %v", err)
    }

    controllerName := "bootstrap-controller"
    coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
    bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
    m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
    m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

    if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
        return fmt.Errorf("error in registering group versions: %v", err)
    }
    return nil
}

func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
    apiGroupsInfo := []*genericapiserver.APIGroupInfo{}
    ...
    for _, restStorageBuilder := range restStorageProviders {
        ...
        apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
        if err != nil {
            return fmt.Errorf("problem initializing API group %q : %v", groupName, err)
        }
        ...
        apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)
    }

    if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
        return fmt.Errorf("error in registering group versions: %v", err)
    }
    ...
}
pkg/registry/core/rest/storage_core.go
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
    ...
    podStorage, err := podstore.NewStorage(
        restOptionsGetter,
        nodeStorage.KubeletConnectionInfo,
        c.ProxyTransport,
        podDisruptionClient,
    )
    ...
    restStorageMap := map[string]rest.Storage{
        "pods": podStorage.Pod,
        ...
    }
    ...
}
pkg/registry/core/pod/storage/storage.go
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {
    store := &genericregistry.Store{
        NewFunc:                  func() runtime.Object { return &api.Pod{} },
        NewListFunc:              func() runtime.Object { return &api.PodList{} },
        PredicateFunc:            registrypod.MatchPod,
        DefaultQualifiedResource: api.Resource("pods"),

        CreateStrategy:      registrypod.Strategy,
        UpdateStrategy:      registrypod.Strategy,
        DeleteStrategy:      registrypod.Strategy,
        ResetFieldsStrategy: registrypod.Strategy,
        ReturnDeletedObject: true,

        TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
    }
    options := &generic.StoreOptions{
        RESTOptions: optsGetter,
        AttrFunc:    registrypod.GetAttrs,
        TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": registrypod.NodeNameTriggerFunc},
        Indexers:    registrypod.Indexers(),
    }
    if err := store.CompleteWithOptions(options); err != nil {
        return PodStorage{}, err
    }
    ...
    return PodStorage{
        Pod:                 &REST{store, proxyTransport},
        Binding:             &BindingREST{store: store},
        LegacyBinding:       &LegacyBindingREST{bindingREST},
        Eviction:            newEvictionStorage(store, podDisruptionBudgetClient),
        Status:              &StatusREST{store: &statusStore},
        EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
        Log:                 &podrest.LogREST{Store: store, KubeletConn: k},
        Proxy:               &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
        Exec:                &podrest.ExecREST{Store: store, KubeletConn: k},
        Attach:              &podrest.AttachREST{Store: store, KubeletConn: k},
        PortForward:         &podrest.PortForwardREST{Store: store, KubeletConn: k},
    }, nil
}
k8s.io/apiserver/pkg/registry/generic/registry/store.go
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
    ...
    if e.Storage.Storage == nil {
        e.Storage.Codec = opts.StorageConfig.Codec
        var err error
        e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
            opts.StorageConfig,
            prefix,
            keyFunc,
            e.NewFunc,
            e.NewListFunc,
            attrFunc,
            options.TriggerFunc,
            options.Indexers,
        )
        if err != nil {
            return err
        }
        e.StorageVersioner = opts.StorageConfig.EncodeVersioner

        if opts.CountMetricPollPeriod > 0 {
            stopFunc := e.startObservingCount(opts.CountMetricPollPeriod, opts.StorageObjectCountTracker)
            previousDestroy := e.DestroyFunc
            e.DestroyFunc = func() {
                stopFunc()
                if previousDestroy != nil {
                    previousDestroy()
                }
            }
        }
    }
    ...
}

func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
    ...
    graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, obj, options)
    if err != nil {
        return nil, false, err
    }
    ...
    if err := e.Storage.Delete(ctx, key, out, &preconditions, storage.ValidateObjectFunc(deleteValidation), dryrun.IsDryRun(options.DryRun), nil); err != nil {
        // Please refer to the place where we set ignoreNotFound for the reason
        // why we ignore the NotFound error.
        if storage.IsNotFound(err) && ignoreNotFound && lastExisting != nil {
            // The lastExisting object may not be the last state of the object
            // before its deletion, but it's the best approximation.
            out, err := e.finalizeDelete(ctx, lastExisting, true, options)
            return out, true, err
        }
        return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
    }
    ...
}
k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go
func (s *DryRunnableStorage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, deleteValidation storage.ValidateObjectFunc, dryRun bool, cachedExistingObject runtime.Object) error {
    if dryRun {
        if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err != nil {
            return err
        }
        if err := preconditions.Check(key, out); err != nil {
            return err
        }
        return deleteValidation(ctx, out)
    }
    return s.Storage.Delete(ctx, key, out, preconditions, deleteValidation, cachedExistingObject)
}
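In the dry-run branch the handler checks preconditions and runs the deletion validation, but never touches the underlying storage. From a client, that branch is triggered through DeleteOptions. A minimal client-go sketch, with placeholder namespace and pod name:

package example

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// dryRunDelete asks the apiserver to run the whole delete path (including the
// DryRunnableStorage branch above) without persisting anything.
func dryRunDelete(ctx context.Context, cs kubernetes.Interface) error {
    return cs.CoreV1().Pods("default").Delete(ctx, "demo", metav1.DeleteOptions{
        DryRun: []string{metav1.DryRunAll}, // "All": validate only, do not persist
    })
}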
k8s.io/apiserver/pkg/registry/rest/delete.go
func BeforeDelete(strategy RESTDeleteStrategy, ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) (graceful, gracefulPending bool, err error) {
    ...
    if objectMeta.GetDeletionTimestamp() != nil {
        // if we are already being deleted, we may only shorten the deletion grace period
        // this means the object was gracefully deleted previously but deletionGracePeriodSeconds was not set,
        // so we force deletion immediately
        // IMPORTANT:
        // The deletion operation happens in two phases.
        // 1. Update to set DeletionGracePeriodSeconds and DeletionTimestamp
        // 2. Delete the object from storage.
        // If the update succeeds, but the delete fails (network error, internal storage error, etc.),
        // a resource was previously left in a state that was non-recoverable. We
        // check if the existing stored resource has a grace period as 0 and if so
        // attempt to delete immediately in order to recover from this scenario.
        if objectMeta.GetDeletionGracePeriodSeconds() == nil || *objectMeta.GetDeletionGracePeriodSeconds() == 0 {
            return false, false, nil
        }
        ...
    }
    ...
    objectMeta.SetDeletionTimestamp(&now)
    objectMeta.SetDeletionGracePeriodSeconds(options.GracePeriodSeconds)
    ...
}
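The grace period that BeforeDelete consumes comes in from the client through DeleteOptions. A hedged client-go sketch of a graceful delete, with placeholder namespace and name: the first DELETE only stamps deletionTimestamp and deletionGracePeriodSeconds, and the object stays readable until the kubelet confirms shutdown.

package example

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// gracefulDelete requests a 30s grace period. BeforeDelete turns this into an
// update that sets DeletionTimestamp/DeletionGracePeriodSeconds; the actual
// removal from etcd happens later.
func gracefulDelete(ctx context.Context, cs kubernetes.Interface) error {
    grace := int64(30)
    return cs.CoreV1().Pods("default").Delete(ctx, "demo", metav1.DeleteOptions{
        GracePeriodSeconds: &grace,
    })
}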
k8s.io/apiserver/pkg/server/genericapiserver.go
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
    if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
        return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
    }

    openAPIModels, err := s.getOpenAPIModels(apiPrefix, apiGroupInfo)
    if err != nil {
        return fmt.Errorf("unable to get openapi models: %v", err)
    }

    if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
        return err
    }

    // Install the version handler.
    // Add a handler at /<apiPrefix> to enumerate the supported api versions.
    s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())
    return nil
}

func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
    ...
    for _, apiGroupInfo := range apiGroupInfos {
        if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
            return fmt.Errorf("unable to install api resources: %v", err)
        }
        ...
    }
    ...
}

func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
    ...
    apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
    ...
    r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
    ...
}

func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) *genericapi.APIGroupVersion {
    storage := make(map[string]rest.Storage)
    for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {
        storage[strings.ToLower(k)] = v
    }
    version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
    version.Root = apiPrefix
    version.Storage = storage
    return version
}
k8s.io/apiserver/pkg/endpoints/groupversion.go
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
    ...
    installer := &APIInstaller{
        group:             g,
        prefix:            prefix,
        minRequestTimeout: g.MinRequestTimeout,
    }
    apiResources, resourceInfos, ws, registrationErrors := installer.Install()
    ...
}
k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
    ...
    apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
    ...
}

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
    ...
    case "DELETE": // Delete a resource.
        ...
        handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
        handler = utilwarning.AddWarningsHandler(handler, warnings)
        route := ws.DELETE(action.Path).To(handler).
            Doc(doc).
            Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
            Operation("delete"+namespaced+kind+strings.Title(subresource)+operationSuffix).
            Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
            Writes(deleteReturnType).
            Returns(http.StatusOK, "OK", deleteReturnType).
            Returns(http.StatusAccepted, "Accepted", deleteReturnType)
        ...
    ...
}

func restfulDeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
    return func(req *restful.Request, res *restful.Response) {
        handlers.DeleteResource(r, allowsOptions, &scope, admit)(res.ResponseWriter, req.Request)
    }
}

k8s.io/apiserver/pkg/endpoints/handlers/delete.go
func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestScope, admit admission.Interface) http.HandlerFunc {
    ...
    result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
        obj, deleted, err := r.Delete(ctx, name, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options)
        wasDeleted = deleted
        return obj, err
    })
    ...
}
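End to end, the route registered above means deleting a pod is a plain HTTP DELETE against /api/v1/namespaces/{namespace}/pods/{name}, with DeleteOptions optionally carried in the request body. A sketch using only the standard library; the server address, pod name, and bearer token are placeholders, and certificate verification is skipped for brevity:

package main

import (
    "bytes"
    "crypto/tls"
    "fmt"
    "net/http"
)

func main() {
    // DeleteOptions travels in the DELETE body; 30 is an illustrative grace period.
    body := bytes.NewBufferString(`{"apiVersion":"v1","kind":"DeleteOptions","gracePeriodSeconds":30}`)
    req, err := http.NewRequest(http.MethodDelete,
        "https://127.0.0.1:6443/api/v1/namespaces/default/pods/demo", body) // placeholder address and pod
    if err != nil {
        panic(err)
    }
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("Authorization", "Bearer <token>") // placeholder credential

    // Sketch only: never skip server certificate verification in real code.
    client := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
    resp, err := client.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    fmt.Println("status:", resp.Status) // the first graceful DELETE returns the pod with deletionTimestamp set
}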
Part 2
When the kubelet receives the Pod update event and sees that the Pod now has a DeletionTimestamp set, it runs the deletion logic: it first executes the preStop hooks, then stops the containers, and then calls the apiserver with GracePeriodSeconds set to 0 to signal that the Pod can now be removed, at which point the apiserver performs the actual deletion.
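That final call is, in effect, a force delete with a zero grace period, guarded by a UID precondition so it cannot remove a recreated pod with the same name. A hedged client-go sketch of that shape (not the kubelet's actual code):

package example

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/kubernetes"
)

// forceDelete mirrors the shape of the kubelet's final call: grace period 0
// plus a UID precondition, so the apiserver deletes this exact pod instance.
func forceDelete(ctx context.Context, cs kubernetes.Interface, ns, name string, uid types.UID) error {
    zero := int64(0)
    return cs.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{
        GracePeriodSeconds: &zero,
        Preconditions:      &metav1.Preconditions{UID: &uid},
    })
}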
The problem
The parties that care about a Pod's deletion here are the endpoint controller and the kubelet (which carries out the deletion), and the endpoint update and the Pod deletion run in parallel. So the Pod may already be gone while the Endpoints object still references it, which breaks kube-proxy and ingress controllers. Even once the Endpoints object is updated, kube-proxy and the ingress controller may only receive and finish processing the event after the Pod has been deleted; for example, kube-proxy may remove the Pod's address from its iptables rules only after the containers have already exited. Either way, traffic can be lost.
Solutions
Option 1
Add a preStop hook, e.g. one that sleeps for 5 seconds (see the sketch below). This only makes the problem less likely; it does not fix it at the root.
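In manifest terms this is just a preStop exec hook. Expressed with the corev1 Go types (field names as in current client-go; in older releases the handler type was corev1.Handler rather than corev1.LifecycleHandler):

package example

import corev1 "k8s.io/api/core/v1"

// sleepPreStop returns a container lifecycle that sleeps before SIGTERM is
// sent, giving endpoint propagation a head start. Only a mitigation: if
// kube-proxy or the ingress controller lags longer than the sleep, traffic
// is still lost.
func sleepPreStop() *corev1.Lifecycle {
    return &corev1.Lifecycle{
        PreStop: &corev1.LifecycleHandler{
            Exec: &corev1.ExecAction{
                Command: []string{"sh", "-c", "sleep 5"},
            },
        },
    }
}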
Option 2
Use an admission webhook: when a Pod is deleted, do the cleanup first, e.g. tell the gateway to stop forwarding traffic to this Pod.
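A minimal sketch of such a webhook, assuming a hypothetical drainFromGateway cleanup function: a validating webhook registered for pod DELETE operations that drains the pod from the gateway before admitting the deletion. TLS setup and error handling are largely elided.

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"

    admissionv1 "k8s.io/api/admission/v1"
    corev1 "k8s.io/api/core/v1"
)

// drainFromGateway is a placeholder for your own cleanup, e.g. telling the
// gateway to stop routing to this pod's IP. Hypothetical, not a real API.
func drainFromGateway(pod *corev1.Pod) error {
    fmt.Printf("draining %s/%s (%s)\n", pod.Namespace, pod.Name, pod.Status.PodIP)
    return nil
}

func serveValidate(w http.ResponseWriter, r *http.Request) {
    body, _ := io.ReadAll(r.Body)
    var review admissionv1.AdmissionReview
    if err := json.Unmarshal(body, &review); err != nil || review.Request == nil {
        http.Error(w, "bad AdmissionReview", http.StatusBadRequest)
        return
    }

    // For DELETE operations the object being deleted arrives in OldObject.
    var pod corev1.Pod
    _ = json.Unmarshal(review.Request.OldObject.Raw, &pod)
    _ = drainFromGateway(&pod)

    review.Response = &admissionv1.AdmissionResponse{
        UID:     review.Request.UID,
        Allowed: true, // always admit; we only piggyback cleanup on the delete
    }
    resp, _ := json.Marshal(review)
    w.Header().Set("Content-Type", "application/json")
    w.Write(resp)
}

func main() {
    http.HandleFunc("/validate", serveValidate)
    // Webhooks must be served over HTTPS; cert/key paths are placeholders.
    http.ListenAndServeTLS(":8443", "tls.crt", "tls.key", nil)
}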
Option 3
Use the Downward API to inject the Pod IP as an environment variable, and have the preStop hook notify the gateway that this Pod IP is about to go offline.
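Combining the Downward API with a preStop hook, again expressed with corev1 types (the gateway URL is a placeholder, and the same LifecycleHandler naming caveat as in Option 1 applies):

package example

import corev1 "k8s.io/api/core/v1"

// podIPEnvAndPreStop wires status.podIP into the container via the Downward
// API and uses preStop to tell the gateway (placeholder URL) that this IP is
// going offline before the container is stopped.
func podIPEnvAndPreStop(c *corev1.Container) {
    c.Env = append(c.Env, corev1.EnvVar{
        Name: "POD_IP",
        ValueFrom: &corev1.EnvVarSource{
            FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.podIP"},
        },
    })
    c.Lifecycle = &corev1.Lifecycle{
        PreStop: &corev1.LifecycleHandler{
            Exec: &corev1.ExecAction{
                Command: []string{"sh", "-c",
                    `curl -s -X POST "http://gateway.internal/offline?ip=${POD_IP}"`}, // placeholder endpoint
            },
        },
    }
}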