美文网首页
k8s 之 apiserver 源码简单分析

k8s 之 apiserver 源码简单分析

作者: wwq2020 | 来源:发表于2020-07-16 11:40 被阅读0次

    cmd/kube-apiserver/apiserver.go 中

    func main() {
        ...
        command := app.NewAPIServerCommand()
        ...
        if err := command.Execute(); err != nil {
            os.Exit(1)
        }
    

    cmd/kube-apiserver/app/server.go 中

    // NewAPIServerCommand builds the cobra command for kube-apiserver from a
    // fresh set of server run options.
    //
    // The command's hooks do the following:
    //   - PersistentPreRunE installs a no-op client-go warning handler.
    //   - RunE completes the options, validates them, and calls Run with the
    //     stop channel returned by genericapiserver.SetupSignalHandler.
    //   - Args rejects any non-empty positional argument.
    //
    // The tail of the function is elided in this excerpt (the `...` line), so
    // flag registration and the return statement are not shown here.
    func NewAPIServerCommand() *cobra.Command {
        s := options.NewServerRunOptions()
        cmd := &cobra.Command{
            Use: "kube-apiserver",
            Long: `The Kubernetes API server validates and configures data
    for the api objects which include pods, services, replicationcontrollers, and
    others. The API Server services REST operations and provides the frontend to the
    cluster's shared state through which all other components interact.`,
    
            // stop printing usage when the command errors
            SilenceUsage: true,
            PersistentPreRunE: func(*cobra.Command, []string) error {
                // silence client-go warnings.
                // kube-apiserver loopback clients should not log self-issued warnings.
                rest.SetDefaultWarningHandler(rest.NoWarnings{})
                return nil
            },
            RunE: func(cmd *cobra.Command, args []string) error {
                verflag.PrintAndExitIfRequested()
                cliflag.PrintFlags(cmd.Flags())
    
                // set default options
                completedOptions, err := Complete(s)
                if err != nil {
                    return err
                }
    
                // validate options
                if errs := completedOptions.Validate(); len(errs) != 0 {
                    return utilerrors.NewAggregate(errs)
                }
    
                // Hand the validated options to Run together with the signal-driven stop channel.
                return Run(completedOptions, genericapiserver.SetupSignalHandler())
            },
            // Any non-empty positional argument is an error; empty strings are tolerated.
            Args: func(cmd *cobra.Command, args []string) error {
                for _, arg := range args {
                    if len(arg) > 0 {
                        return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
                    }
                }
                return nil
            },
        }
        ...
    }
    
    // Run logs the build version, assembles the API server chain from the
    // completed options, prepares it, and then runs the prepared server with
    // stopCh. The first error from any of those steps is returned unchanged.
    func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
        // Emit the version up front so it is available when debugging startup.
        klog.Infof("Version: %+v", version.Get())
    
        chain, err := CreateServerChain(completeOptions, stopCh)
        if err != nil {
            return err
        }
    
        runnable, err := chain.PrepareRun()
        if err != nil {
            return err
        }
        return runnable.Run(stopCh)
    }
    
    // CreateServerChain builds the API extensions server, the kube API server and
    // the aggregator server, in that order, and returns the aggregator.
    //
    // The extensions server is created with an empty delegate, the kube API
    // server is handed the extensions server's GenericAPIServer, and the
    // aggregator is handed the kube API server's GenericAPIServer. When insecure
    // serving info is configured, an insecure handler chain built from the
    // aggregator's unprotected handler is served as well, using stopCh.
    func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
        nodeDialer, dialTransport, err := CreateNodeDialer(completedOptions)
        if err != nil {
            return nil, err
        }
    
        kubeConfig, insecureInfo, svcResolver, admissionInit, err := CreateKubeAPIServerConfig(completedOptions, nodeDialer, dialTransport)
        if err != nil {
            return nil, err
        }
    
        // If additional API servers are added, they should be gated.
        authResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(
            dialTransport,
            kubeConfig.GenericConfig.EgressSelector,
            kubeConfig.GenericConfig.LoopbackClientConfig,
        )
        extConfig, err := createAPIExtensionsConfig(
            *kubeConfig.GenericConfig,
            kubeConfig.ExtraConfig.VersionedInformers,
            admissionInit,
            completedOptions.ServerRunOptions,
            completedOptions.MasterCount,
            svcResolver,
            authResolverWrapper,
        )
        if err != nil {
            return nil, err
        }
        extServer, err := createAPIExtensionsServer(extConfig, genericapiserver.NewEmptyDelegate())
        if err != nil {
            return nil, err
        }
    
        kubeServer, err := CreateKubeAPIServer(kubeConfig, extServer.GenericAPIServer)
        if err != nil {
            return nil, err
        }
    
        // aggregator comes last in the chain
        aggConfig, err := createAggregatorConfig(
            *kubeConfig.GenericConfig,
            completedOptions.ServerRunOptions,
            kubeConfig.ExtraConfig.VersionedInformers,
            svcResolver,
            dialTransport,
            admissionInit,
        )
        if err != nil {
            return nil, err
        }
        aggServer, err := createAggregatorServer(aggConfig, kubeServer.GenericAPIServer, extServer.Informers)
        if err != nil {
            // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
            return nil, err
        }
    
        // Without insecure serving info there is nothing more to start.
        if insecureInfo == nil {
            return aggServer, nil
        }
    
        insecureChain := kubeserver.BuildInsecureHandlerChain(aggServer.GenericAPIServer.UnprotectedHandler(), kubeConfig.GenericConfig)
        err = insecureInfo.Serve(insecureChain, kubeConfig.GenericConfig.RequestTimeout, stopCh)
        if err != nil {
            return nil, err
        }
        return aggServer, nil
    }
    
    // CreateKubeAPIServer completes kubeAPIServerConfig and builds the master
    // from it, passing delegateAPIServer as the delegation target. On failure it
    // returns a nil master together with the error.
    func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget) (*master.Master, error) {
        completed := kubeAPIServerConfig.Complete()
    
        apiServer, err := completed.New(delegateAPIServer)
        if err != nil {
            return nil, err
        }
        return apiServer, nil
    }
    

    pkg/master/master.go 中

    // Complete fills in defaults for every field that is still unset and returns
    // the result wrapped as a CompletedConfig.
    //
    // The extra config is referenced by pointer (&c.ExtraConfig), so the
    // defaults written here land in c.ExtraConfig itself. The process exits via
    // klog.Fatalf if the service IP range cannot be determined.
    func (c *Config) Complete() CompletedConfig {
        completed := completedConfig{
            c.GenericConfig.Complete(c.ExtraConfig.VersionedInformers),
            &c.ExtraConfig,
        }
        // extra aliases the same ExtraConfig that completed points at.
        extra := completed.ExtraConfig
    
        // Service IP range and API server service IP: keep explicit values, default the rest.
        defaultRange, defaultServiceIP, err := ServiceIPRange(extra.ServiceIPRange)
        if err != nil {
            klog.Fatalf("Error determining service IP ranges: %v", err)
        }
        if extra.ServiceIPRange.IP == nil {
            extra.ServiceIPRange = defaultRange
        }
        if extra.APIServerServiceIP == nil {
            extra.APIServerServiceIP = defaultServiceIP
        }
    
        // Discovery: the external address by default, plus a CIDR rule mapping the
        // service IP range to the API server's service IP and port.
        apiServerServiceAddress := net.JoinHostPort(extra.APIServerServiceIP.String(), strconv.Itoa(extra.APIServerServicePort))
        addresses := discovery.DefaultAddresses{DefaultAddress: completed.GenericConfig.ExternalAddress}
        addresses.CIDRRules = append(addresses.CIDRRules, discovery.CIDRRule{
            IPRange: extra.ServiceIPRange,
            Address: apiServerServiceAddress,
        })
        completed.GenericConfig.DiscoveryAddresses = addresses
    
        if extra.ServiceNodePortRange.Size == 0 {
            // TODO: Currently no way to specify an empty range (do we need to allow this?)
            // We should probably allow this for clouds that don't require NodePort to do load-balancing (GCE)
            // but then that breaks the strict nestedness of ServiceType.
            // Review post-v1
            extra.ServiceNodePortRange = kubeoptions.DefaultServiceNodePortRange
            klog.Infof("Node port range unspecified. Defaulting to %v.", extra.ServiceNodePortRange)
        }
    
        // Endpoint reconciler: interval, TTL, then the reconciler itself.
        if extra.EndpointReconcilerConfig.Interval == 0 {
            extra.EndpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
        }
        if extra.MasterEndpointReconcileTTL == 0 {
            extra.MasterEndpointReconcileTTL = DefaultEndpointReconcilerTTL
        }
        if extra.EndpointReconcilerConfig.Reconciler == nil {
            extra.EndpointReconcilerConfig.Reconciler = c.createEndpointReconciler()
        }
    
        return CompletedConfig{&completed}
    }
    
    // New builds a Master on top of a generic API server named "kube-apiserver"
    // that is created with delegationTarget.
    //
    // Steps, in order:
    //  1. Reject an empty KubeletClientConfig.
    //  2. Create the generic API server.
    //  3. Install the logs route when EnableLogsSupport is set.
    //  4. When the ServiceAccountIssuerDiscovery feature gate is on, install the
    //     OpenID metadata routes; a failure to build the metadata is only logged.
    //  5. Install the legacy API when the apiv1 group version is enabled.
    //  6. Install the remaining API groups from the ordered provider list.
    //  7. Install the tunneler when one is configured.
    //  8. Register the post-start hook that starts the cluster authentication
    //     trust controller.
    //
    // It returns the Master, or nil and the first error from steps 1, 2, 5 or 6.
    func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
        // A zero-valued kubelet client config is treated as a caller error.
        if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
            return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
        }
    
        s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
        if err != nil {
            return nil, err
        }
    
        if c.ExtraConfig.EnableLogsSupport {
            routes.Logs{}.Install(s.Handler.GoRestfulContainer)
        }
    
        if utilfeature.DefaultFeatureGate.Enabled(features.ServiceAccountIssuerDiscovery) {
            // Metadata and keys are expected to only change across restarts at present,
            // so we just marshal immediately and serve the cached JSON bytes.
            md, err := serviceaccount.NewOpenIDMetadata(
                c.ExtraConfig.ServiceAccountIssuerURL,
                c.ExtraConfig.ServiceAccountJWKSURI,
                c.GenericConfig.ExternalAddress,
                c.ExtraConfig.ServiceAccountPublicKeys,
            )
            if err != nil {
                // If there was an error, skip installing the endpoints and log the
                // error, but continue on. We don't return the error because the
                // metadata responses require additional, backwards incompatible
                // validation of command-line options.
                msg := fmt.Sprintf("Could not construct pre-rendered responses for"+
                    " ServiceAccountIssuerDiscovery endpoints. Endpoints will not be"+
                    " enabled. Error: %v", err)
                if c.ExtraConfig.ServiceAccountIssuerURL != "" {
                    // The user likely expects this feature to be enabled if issuer URL is
                    // set and the feature gate is enabled. In the future, if there is no
                    // longer a feature gate and issuer URL is not set, the user may not
                    // expect this feature to be enabled. We log the former case as an Error
                    // and the latter case as an Info.
                    klog.Error(msg)
                } else {
                    klog.Info(msg)
                }
            } else {
                routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).
                    Install(s.Handler.GoRestfulContainer)
            }
        }
    
        m := &Master{
            GenericAPIServer:          s,
            ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
        }
    
        // install legacy rest storage
        if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
            legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
                StorageFactory:              c.ExtraConfig.StorageFactory,
                ProxyTransport:              c.ExtraConfig.ProxyTransport,
                KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
                EventTTL:                    c.ExtraConfig.EventTTL,
                ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
                SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
                ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
                LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
                ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
                ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
                ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
                APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
            }
            if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
                return nil, err
            }
        }
    
        // The order here is preserved in discovery.
        // If resources with identical names exist in more than one of these groups (e.g. "deployments.apps" and "deployments.extensions"),
        // the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
        // This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go`
        // with specific priorities.
        // TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
        // handlers that we have.
        restStorageProviders := []RESTStorageProvider{
            authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
            authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
            autoscalingrest.RESTStorageProvider{},
            batchrest.RESTStorageProvider{},
            certificatesrest.RESTStorageProvider{},
            coordinationrest.RESTStorageProvider{},
            discoveryrest.StorageProvider{},
            extensionsrest.RESTStorageProvider{},
            networkingrest.RESTStorageProvider{},
            noderest.RESTStorageProvider{},
            policyrest.RESTStorageProvider{},
            rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
            schedulingrest.RESTStorageProvider{},
            settingsrest.RESTStorageProvider{},
            storagerest.RESTStorageProvider{},
            flowcontrolrest.RESTStorageProvider{},
            // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
            // See https://github.com/kubernetes/kubernetes/issues/42392
            appsrest.StorageProvider{},
            admissionregistrationrest.RESTStorageProvider{},
            eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
        }
        if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
            return nil, err
        }
    
        if c.ExtraConfig.Tunneler != nil {
            m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
        }
    
        // The hook builds a client from the loopback config, creates the trust
        // controller, subscribes it to both CA providers, and starts everything.
        m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
            kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
            if err != nil {
                return err
            }
            controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)
    
            // prime values and start listeners
            if m.ClusterAuthenticationInfo.ClientCA != nil {
                // Here `controller` is still the trust controller declared above.
                if notifier, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.Notifier); ok {
                    notifier.AddListener(controller)
                }
                // NOTE: this `controller` shadows the trust controller; inside this
                // block it is the ClientCA value viewed as a ControllerRunner.
                if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
                    // runonce to be sure that we have a value.
                    if err := controller.RunOnce(); err != nil {
                        runtime.HandleError(err)
                    }
                    go controller.Run(1, hookContext.StopCh)
                }
            }
            if m.ClusterAuthenticationInfo.RequestHeaderCA != nil {
                if notifier, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.Notifier); ok {
                    notifier.AddListener(controller)
                }
                // NOTE: same shadowing as above, this time for the RequestHeaderCA value.
                if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
                    // runonce to be sure that we have a value.
                    if err := controller.RunOnce(); err != nil {
                        runtime.HandleError(err)
                    }
                    go controller.Run(1, hookContext.StopCh)
                }
            }
    
            // Back in the outer scope: start the trust controller itself.
            go controller.Run(1, hookContext.StopCh)
            return nil
        })
    
        return m, nil
    }
    
    // InstallAPIs builds REST storage for each provider whose API group is
    // enabled and installs the resulting API groups on the generic API server.
    //
    // A provider is skipped when no version of its group is enabled in
    // apiResourceConfigSource, or when NewRESTStorage reports the group as not
    // enabled. Providers that also implement genericapiserver.PostStartHookProvider
    // have their hook registered.
    //
    // It returns an error when storage construction fails, when a post-start hook
    // cannot be built, or when the groups cannot be installed. The underlying
    // error is wrapped with %w so callers can inspect it with errors.Is/As.
    func (m *Master) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
        // At most one group per provider, so size the slice once.
        apiGroupsInfo := make([]*genericapiserver.APIGroupInfo, 0, len(restStorageProviders))
    
        for _, restStorageBuilder := range restStorageProviders {
            groupName := restStorageBuilder.GroupName()
            if !apiResourceConfigSource.AnyVersionForGroupEnabled(groupName) {
                klog.V(1).Infof("Skipping disabled API group %q.", groupName)
                continue
            }
            apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
            if err != nil {
                return fmt.Errorf("problem initializing API group %q : %w", groupName, err)
            }
            if !enabled {
                klog.Warningf("API group %q is not enabled, skipping.", groupName)
                continue
            }
            klog.V(1).Infof("Enabling API group %q.", groupName)
    
            if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
                name, hook, err := postHookProvider.PostStartHook()
                if err != nil {
                    // Report the failure through the error return instead of exiting
                    // the process from inside this function.
                    return fmt.Errorf("error building PostStartHook for API group %q: %w", groupName, err)
                }
                m.GenericAPIServer.AddPostStartHookOrDie(name, hook)
            }
    
            // apiGroupInfo is declared inside the loop body, so each pointer is distinct.
            apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)
        }
    
        if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
            return fmt.Errorf("error in registering group versions: %w", err)
        }
        return nil
    }
    
    
    // InstallLegacyAPI builds the legacy REST storage, registers the bootstrap
    // controller's hooks, and installs the legacy API group under
    // genericapiserver.DefaultLegacyAPIPrefix.
    //
    // The bootstrap controller is created from the legacy storage and a core
    // client built from the loopback client config. Its PostStartHook and
    // PreShutdownHook are both registered under the name "bootstrap-controller".
    //
    // It returns an error when the storage cannot be built or the group cannot be
    // installed. The underlying error is wrapped with %w so callers can inspect
    // it with errors.Is/As.
    func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
        legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
        if err != nil {
            return fmt.Errorf("error building core storage: %w", err)
        }
    
        controllerName := "bootstrap-controller"
        coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
        bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
        m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
        m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
    
        if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
            return fmt.Errorf("error in registering group versions: %w", err)
        }
        return nil
    }
    

    k8s.io/apiserver/pkg/server/genericapiserver.go 中

    // InstallLegacyAPIGroup installs apiGroupInfo's resources under apiPrefix and
    // adds a root handler at /<apiPrefix> that enumerates the supported versions.
    //
    // apiPrefix must be one of the server's allowed legacy API prefixes,
    // otherwise an error is returned before anything is installed. A failure to
    // obtain the OpenAPI models is wrapped with %w so callers can inspect it with
    // errors.Is/As.
    func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
        if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
            return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
        }
    
        openAPIModels, err := s.getOpenAPIModels(apiPrefix, apiGroupInfo)
        if err != nil {
            return fmt.Errorf("unable to get openapi models: %w", err)
        }
    
        if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
            return err
        }
    
        // Install the version handler.
        // Add a handler at /<apiPrefix> to enumerate the supported api versions.
        s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())
    
        return nil
    }
    
    // installAPIResources installs the REST handlers for every prioritized
    // version of apiGroupInfo that has at least one resource.
    //
    // Versions with no resources in VersionedResourcesStorageMap are skipped with
    // a warning. Each installed version receives the given OpenAPI models and the
    // server's maximum request body size. The first InstallREST failure stops the
    // loop and is returned wrapped with %w, so callers can inspect it with
    // errors.Is/As.
    func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
        for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
            if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
                klog.Warningf("Skipping API %v because it has no resources.", groupVersion)
                continue
            }
    
            apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
            // Only override the options version when the group specifies one.
            if apiGroupInfo.OptionsExternalVersion != nil {
                apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
            }
            apiGroupVersion.OpenAPIModels = openAPIModels
            apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes
    
            if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
                return fmt.Errorf("unable to setup API %v: %w", apiGroupInfo, err)
            }
        }
    
        return nil
    }
    

    k8s.io/apiserver/pkg/endpoints/groupversion.go 中

    // InstallREST installs this group version's handlers into container.
    //
    // An APIInstaller is run with the prefix <Root>/<Group>/<Version>. The web
    // service it produces also gets a version discovery handler listing the
    // installed resources, and is then added to container. This happens even when
    // some registrations failed; those errors are returned as one aggregate.
    func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
        installer := &APIInstaller{
            group:             g,
            prefix:            path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version),
            minRequestTimeout: g.MinRequestTimeout,
        }
        resources, webService, installErrs := installer.Install()
    
        // Expose the installed resources through the version discovery endpoint.
        lister := staticLister{resources}
        discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, lister).AddToWebService(webService)
    
        container.Add(webService)
        return utilerrors.NewAggregate(installErrs)
    }
    

    k8s.io/apiserver/pkg/endpoints/installer.go 中

    // Install registers handlers for every storage path of the installer's group
    // on a new web service.
    //
    // It returns the API resources that were registered, the web service, and one
    // error per path whose registration failed. A failed path does not stop the
    // remaining paths from being registered. Each error wraps the underlying
    // cause with %w so callers can inspect it with errors.Is/As.
    func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
        var apiResources []metav1.APIResource
        // Named errs rather than errors so the standard library package name is not shadowed.
        var errs []error
        ws := a.newWebService()
    
        // Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
        paths := make([]string, 0, len(a.group.Storage))
        for resourcePath := range a.group.Storage {
            paths = append(paths, resourcePath)
        }
        sort.Strings(paths)
    
        for _, resourcePath := range paths {
            apiResource, err := a.registerResourceHandlers(resourcePath, a.group.Storage[resourcePath], ws)
            if err != nil {
                errs = append(errs, fmt.Errorf("error in registering resource: %s, %w", resourcePath, err))
            }
            // A resource may be returned even when err is non-nil, so check it separately.
            if apiResource != nil {
                apiResources = append(apiResources, *apiResource)
            }
        }
        return apiResources, ws, errs
    }
    
    // registerResourceHandlers registers the routes for one storage path on ws.
    //
    // This excerpt is heavily elided (the `...` lines). Only the creater type
    // assertion, the "POST" case and the final route registration loop are shown;
    // the enclosing switch/loop and the declarations of most variables used below
    // (action, reqScope, admit, routes, ...) are not visible here.
    func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
        ...
        // Check whether the storage implements rest.Creater.
        creater, isCreater := storage.(rest.Creater)
        ...
            case "POST": // Create a resource.
                // Pick the named or plain create handler, then layer metrics and
                // (optionally) warning headers on top of it.
                var handler restful.RouteFunction
                if isNamedCreater {
                    handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
                } else {
                    handler = restfulCreateResource(creater, reqScope, admit)
                }
                handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
                if enableWarningHeaders {
                    handler = utilwarning.AddWarningsHandler(handler, warnings)
                }
                // Build the human-readable route description.
                article := GetArticleForNoun(kind, " ")
                doc := "create" + article + kind
                if isSubresource {
                    doc = "create " + subresource + " of" + article + kind
                }
                // Declare the POST route: handler, docs, operation name, media types,
                // documented response codes, and request/response object types.
                route := ws.POST(action.Path).To(handler).
                    Doc(doc).
                    Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                    Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                    Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                    Returns(http.StatusOK, "OK", producedObject).
                    // TODO: in some cases, the API may return a v1.Status instead of the versioned object
                    // but currently go-restful can't handle multiple different objects being returned.
                    Returns(http.StatusCreated, "Created", producedObject).
                    Returns(http.StatusAccepted, "Accepted", producedObject).
                    Reads(defaultVersionedObject).
                    Writes(producedObject)
                if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
                    return nil, err
                }
                addParams(route, action.Params)
                routes = append(routes, route)
            // Tag every collected route with its group/version/kind and lower-cased
            // verb, then register it on the web service.
            for _, route := range routes {
                route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
                    Group:   reqScope.Kind.Group,
                    Version: reqScope.Kind.Version,
                    Kind:    reqScope.Kind.Kind,
                })
                route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
                ws.Route(route)
            }
        ...
    }
    
    // restfulCreateResource adapts handlers.CreateResource to a go-restful route
    // function. The create handler is built on every request from r, a pointer to
    // this function's copy of scope, and admit, and is then invoked with the
    // request's underlying response writer and HTTP request.
    func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
        return func(req *restful.Request, res *restful.Response) {
            serve := handlers.CreateResource(r, &scope, admit)
            serve(res.ResponseWriter, req.Request)
        }
    }
    

    pkg/registry/core/rest/storage_core.go 中

    func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
        apiGroupInfo := genericapiserver.APIGroupInfo{
            PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
            VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
            Scheme:                       legacyscheme.Scheme,
            ParameterCodec:               legacyscheme.ParameterCodec,
            NegotiatedSerializer:         legacyscheme.Codecs,
        }
    
        var podDisruptionClient policyclient.PodDisruptionBudgetsGetter
        if policyGroupVersion := (schema.GroupVersion{Group: "policy", Version: "v1beta1"}); legacyscheme.Scheme.IsVersionRegistered(policyGroupVersion) {
            var err error
            podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig)
            if err != nil {
                return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
            }
        }
        restStorage := LegacyRESTStorage{}
    
        podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    
        eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
        limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    
        resourceQuotaStorage, resourceQuotaStatusStorage, err := resourcequotastore.NewREST(restOptionsGetter)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
        secretStorage, err := secretstore.NewREST(restOptionsGetter)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
        persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
        persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
        configMapStorage, err := configmapstore.NewREST(restOptionsGetter)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    
        namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage, err := namespacestore.NewREST(restOptionsGetter)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    
        endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    
        nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    
        podStorage, err := podstore.NewStorage(
            restOptionsGetter,
            nodeStorage.KubeletConnectionInfo,
            c.ProxyTransport,
            podDisruptionClient,
        )
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    
        var serviceAccountStorage *serviceaccountstore.REST
        if c.ServiceAccountIssuer != nil && utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) {
            serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, c.ServiceAccountIssuer, c.APIAudiences, c.ServiceAccountMaxExpiration, podStorage.Pod.Store, secretStorage.Store, c.ExtendExpiration)
        } else {
            serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, nil, nil, 0, nil, nil, false)
        }
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    
        var serviceClusterIPRegistry rangeallocation.RangeRegistry
        serviceClusterIPRange := c.ServiceIPRange
        if serviceClusterIPRange.IP == nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("service clusterIPRange is missing")
        }
    
        serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    
        serviceClusterIPAllocator, err := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) (allocator.Interface, error) {
            mem := allocator.NewAllocationMap(max, rangeSpec)
            // TODO etcdallocator package to return a storage interface via the storageFactory
            etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
            if err != nil {
                return nil, err
            }
            serviceClusterIPRegistry = etcd
            return etcd, nil
        })
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
        }
        restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry
    
        // allocator for secondary service ip range
        var secondaryServiceClusterIPAllocator ipallocator.Interface
        if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && c.SecondaryServiceIPRange.IP != nil {
            var secondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
            secondaryServiceClusterIPAllocator, err = ipallocator.NewAllocatorCIDRRange(&c.SecondaryServiceIPRange, func(max int, rangeSpec string) (allocator.Interface, error) {
                mem := allocator.NewAllocationMap(max, rangeSpec)
                // TODO etcdallocator package to return a storage interface via the storageFactory
                etcd, err := serviceallocator.NewEtcd(mem, "/ranges/secondaryserviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
                if err != nil {
                    return nil, err
                }
                secondaryServiceClusterIPRegistry = etcd
                return etcd, nil
            })
            if err != nil {
                return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
            }
            restStorage.SecondaryServiceClusterIPAllocator = secondaryServiceClusterIPRegistry
        }
    
        var serviceNodePortRegistry rangeallocation.RangeRegistry
        serviceNodePortAllocator, err := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) {
            mem := allocator.NewAllocationMap(max, rangeSpec)
            // TODO etcdallocator package to return a storage interface via the storageFactory
            etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
            if err != nil {
                return nil, err
            }
            serviceNodePortRegistry = etcd
            return etcd, nil
        })
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err)
        }
        restStorage.ServiceNodePortAllocator = serviceNodePortRegistry
    
        controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    
        serviceRESTStorage, serviceStatusStorage, err := servicestore.NewGenericREST(restOptionsGetter, serviceClusterIPRange, secondaryServiceClusterIPAllocator != nil)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    
        serviceRest, serviceRestProxy := servicestore.NewREST(serviceRESTStorage,
            endpointsStorage,
            podStorage.Pod,
            serviceClusterIPAllocator,
            secondaryServiceClusterIPAllocator,
            serviceNodePortAllocator,
            c.ProxyTransport)
    
        restStorageMap := map[string]rest.Storage{
            "pods":             podStorage.Pod,
            "pods/attach":      podStorage.Attach,
            "pods/status":      podStorage.Status,
            "pods/log":         podStorage.Log,
            "pods/exec":        podStorage.Exec,
            "pods/portforward": podStorage.PortForward,
            "pods/proxy":       podStorage.Proxy,
            "pods/binding":     podStorage.Binding,
            "bindings":         podStorage.LegacyBinding,
    
            "podTemplates": podTemplateStorage,
    
            "replicationControllers":        controllerStorage.Controller,
            "replicationControllers/status": controllerStorage.Status,
    
            "services":        serviceRest,
            "services/proxy":  serviceRestProxy,
            "services/status": serviceStatusStorage,
    
            "endpoints": endpointsStorage,
    
            "nodes":        nodeStorage.Node,
            "nodes/status": nodeStorage.Status,
            "nodes/proxy":  nodeStorage.Proxy,
    
            "events": eventStorage,
    
            "limitRanges":                   limitRangeStorage,
            "resourceQuotas":                resourceQuotaStorage,
            "resourceQuotas/status":         resourceQuotaStatusStorage,
            "namespaces":                    namespaceStorage,
            "namespaces/status":             namespaceStatusStorage,
            "namespaces/finalize":           namespaceFinalizeStorage,
            "secrets":                       secretStorage,
            "serviceAccounts":               serviceAccountStorage,
            "persistentVolumes":             persistentVolumeStorage,
            "persistentVolumes/status":      persistentVolumeStatusStorage,
            "persistentVolumeClaims":        persistentVolumeClaimStorage,
            "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
            "configMaps":                    configMapStorage,
    
            "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
        }
        if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
            restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
        }
        if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
            restStorageMap["pods/eviction"] = podStorage.Eviction
        }
        if serviceAccountStorage.Token != nil {
            restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
        }
        if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
            restStorageMap["pods/ephemeralcontainers"] = podStorage.EphemeralContainers
        }
        apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
    
        return restStorage, apiGroupInfo, nil
    }
    

    k8s.io/apiserver/pkg/endpoints/handlers/create.go 中

    // CreateResource returns the HTTP handler for creating a resource through r.
    // It adapts r to a rest.NamedCreater via namedCreaterAdapter and delegates to
    // createHandler with includeName disabled.
    func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
        const includeName = false
        adapted := &namedCreaterAdapter{r}
        return createHandler(adapted, scope, admission, includeName)
    }
    
    // createHandler builds the HTTP handler that serves create requests for the
    // resource described by scope.
    //
    // r supplies the empty object to decode into (r.New) and persists the decoded
    // object (r.Create). admit is the admission chain; for each request it is
    // wrapped with that request's audit event. includeName decides what happens
    // when scope.Namer.Name fails: if true the error is sent to the client,
    // otherwise only the namespace is resolved from the request.
    //
    // On success the handler responds with http.StatusCreated through
    // transformResponseObject; every failure is reported through scope.err.
    func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
        return func(w http.ResponseWriter, req *http.Request) {
            // For performance tracking purposes.
            trace := utiltrace.New("Create", utiltrace.Field{Key: "url", Value: req.URL.Path}, utiltrace.Field{Key: "user-agent", Value: &lazyTruncatedUserAgent{req}}, utiltrace.Field{Key: "client", Value: &lazyClientIP{req}})
            defer trace.LogIfLong(500 * time.Millisecond)
    
            if isDryRun(req.URL) && !utilfeature.DefaultFeatureGate.Enabled(features.DryRun) {
                scope.err(errors.NewBadRequest("the dryRun feature is disabled"), w, req)
                return
            }
    
            // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
            timeout := parseTimeout(req.URL.Query().Get("timeout"))
    
            namespace, name, err := scope.Namer.Name(req)
            if err != nil {
                if includeName {
                    // name was required, return
                    scope.err(err, w, req)
                    return
                }
    
                // otherwise attempt to look up the namespace
                namespace, err = scope.Namer.Namespace(req)
                if err != nil {
                    scope.err(err, w, req)
                    return
                }
            }
    
            ctx, cancel := context.WithTimeout(req.Context(), timeout)
            defer cancel()
            ctx = request.WithNamespace(ctx, namespace)
            outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
            if err != nil {
                scope.err(err, w, req)
                return
            }
    
            gv := scope.Kind.GroupVersion()
            s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
            if err != nil {
                scope.err(err, w, req)
                return
            }
    
            decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)
    
            body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
            if err != nil {
                scope.err(err, w, req)
                return
            }
    
            // Decode and validate the CreateOptions carried in the query string.
            options := &metav1.CreateOptions{}
            values := req.URL.Query()
            if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil {
                err = errors.NewBadRequest(err.Error())
                scope.err(err, w, req)
                return
            }
            if errs := validation.ValidateCreateOptions(options); len(errs) > 0 {
                err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "CreateOptions"}, "", errs)
                scope.err(err, w, req)
                return
            }
            options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("CreateOptions"))
    
            defaultGVK := scope.Kind
            original := r.New()
            trace.Step("About to convert to expected version")
            obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
            if err != nil {
                err = transformDecodeError(scope.Typer, err, original, gvk, body)
                scope.err(err, w, req)
                return
            }
            // Reject bodies whose declared API version differs from the endpoint's.
            if gvk.GroupVersion() != gv {
                err = errors.NewBadRequest(fmt.Sprintf("the API version in the data (%s) does not match the expected API version (%v)", gvk.GroupVersion().String(), gv.String()))
                scope.err(err, w, req)
                return
            }
            trace.Step("Conversion done")
    
            ae := request.AuditEventFrom(ctx)
            // Keep the audit-wrapped chain in a request-local variable. Assigning it
            // back to the captured admit parameter would stack one more wrapper per
            // served request and would be an unsynchronised write shared by
            // concurrently running requests.
            auditedAdmit := admission.WithAudit(admit, ae)
            audit.LogRequestObject(ae, obj, scope.Resource, scope.Subresource, scope.Serializer)
    
            userInfo, _ := request.UserFrom(ctx)
    
            // On create, get name from new object if unset
            if len(name) == 0 {
                _, name, _ = scope.Namer.ObjectName(obj)
            }
    
            trace.Step("About to store object in database")
            admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
            // requestFunc reads obj when it is invoked, so it observes the
            // replacements made to obj inside the finishRequest callback below.
            requestFunc := func() (runtime.Object, error) {
                return r.Create(
                    ctx,
                    name,
                    obj,
                    rest.AdmissionToValidateObjectFunc(auditedAdmit, admissionAttributes, scope),
                    options,
                )
            }
            result, err := finishRequest(timeout, func() (runtime.Object, error) {
                if scope.FieldManager != nil {
                    liveObj, err := scope.Creater.New(scope.Kind)
                    if err != nil {
                        return nil, fmt.Errorf("failed to create new object (Create for %v): %v", scope.Kind, err)
                    }
                    obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
                }
                // Run mutating admission before the object is handed to storage.
                if mutatingAdmission, ok := auditedAdmit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
                    if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
                        return nil, err
                    }
                }
                result, err := requestFunc()
                // If the object wasn't committed to storage because it's serialized size was too large,
                // it is safe to remove managedFields (which can be large) and try again.
                if isTooLargeError(err) {
                    if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
                        accessor.SetManagedFields(nil)
                        result, err = requestFunc()
                    }
                }
                return result, err
            })
            if err != nil {
                scope.err(err, w, req)
                return
            }
            trace.Step("Object stored in database")
    
            code := http.StatusCreated
            // A Status result without an explicit code inherits the 201.
            status, ok := result.(*metav1.Status)
            if ok && err == nil && status.Code == 0 {
                status.Code = int32(code)
            }
    
            transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
        }
    }
    

    pkg/registry/core/pod/storage/storage.go 中

    // NewStorage assembles the REST storage objects for pods and their
    // subresources.
    //
    // A single genericregistry.Store is configured for pods and completed with
    // optsGetter. The status and ephemeral-containers subresources use shallow
    // copies of that store with their own UpdateStrategy; every other
    // subresource shares the original store. An error from CompleteWithOptions
    // is returned together with an empty PodStorage.
    func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {
        podStore := &genericregistry.Store{
            NewFunc:                  func() runtime.Object { return &api.Pod{} },
            NewListFunc:              func() runtime.Object { return &api.PodList{} },
            PredicateFunc:            registrypod.MatchPod,
            DefaultQualifiedResource: api.Resource("pods"),
    
            CreateStrategy:      registrypod.Strategy,
            UpdateStrategy:      registrypod.Strategy,
            DeleteStrategy:      registrypod.Strategy,
            ReturnDeletedObject: true,
    
            TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
        }
    
        storeOpts := &generic.StoreOptions{
            RESTOptions: optsGetter,
            AttrFunc:    registrypod.GetAttrs,
            TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": registrypod.NodeNameTriggerFunc},
            Indexers:    registrypod.Indexers(),
        }
        err := podStore.CompleteWithOptions(storeOpts)
        if err != nil {
            return PodStorage{}, err
        }
    
        // Subresources with their own update rules work on copies of the
        // completed store so the strategy override does not affect the pod store.
        statusStore := *podStore
        statusStore.UpdateStrategy = registrypod.StatusStrategy
    
        ephemeralContainersStore := *podStore
        ephemeralContainersStore.UpdateStrategy = registrypod.EphemeralContainersStrategy
    
        // The legacy binding endpoint wraps its own BindingREST instance.
        legacyTarget := &BindingREST{store: podStore}
    
        var result PodStorage
        result.Pod = &REST{podStore, proxyTransport}
        result.Binding = &BindingREST{store: podStore}
        result.LegacyBinding = &LegacyBindingREST{legacyTarget}
        result.Eviction = newEvictionStorage(podStore, podDisruptionBudgetClient)
        result.Status = &StatusREST{store: &statusStore}
        result.EphemeralContainers = &EphemeralContainersREST{store: &ephemeralContainersStore}
        result.Log = &podrest.LogREST{Store: podStore, KubeletConn: k}
        result.Proxy = &podrest.ProxyREST{Store: podStore, ProxyTransport: proxyTransport}
        result.Exec = &podrest.ExecREST{Store: podStore, KubeletConn: k}
        result.Attach = &podrest.AttachREST{Store: podStore, KubeletConn: k}
        result.PortForward = &podrest.PortForwardREST{Store: podStore, KubeletConn: k}
        return result, nil
    }
    

    k8s.io/apiserver/pkg/registry/generic/registry/store.go 中

    // CompleteWithOptions validates the fields a Store must have set and fills in
    // defaults for the optional ones, using options and the RESTOptions obtained
    // from options.RESTOptions for e.DefaultQualifiedResource.
    //
    // Required (an error is returned otherwise): DefaultQualifiedResource,
    // NewFunc, NewListFunc, TableConvertor, DeleteStrategy, at least one of
    // CreateStrategy/UpdateStrategy, options.RESTOptions, and KeyRootFunc/KeyFunc
    // either both set or both unset.
    //
    // Defaulted when unset: PredicateFunc, KeyRootFunc/KeyFunc,
    // DeleteCollectionWorkers, ObjectNameFunc and the backing Storage.
    // EnableGarbageCollection is always overwritten from the RESTOptions.
    func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
        if e.DefaultQualifiedResource.Empty() {
            return fmt.Errorf("store %#v must have a non-empty qualified resource", e)
        }
        if e.NewFunc == nil {
            return fmt.Errorf("store for %s must have NewFunc set", e.DefaultQualifiedResource.String())
        }
        if e.NewListFunc == nil {
            return fmt.Errorf("store for %s must have NewListFunc set", e.DefaultQualifiedResource.String())
        }
        // Exactly one of the two key functions being set is a configuration error.
        if (e.KeyRootFunc == nil) != (e.KeyFunc == nil) {
            return fmt.Errorf("store for %s must set both KeyRootFunc and KeyFunc or neither", e.DefaultQualifiedResource.String())
        }
    
        if e.TableConvertor == nil {
            return fmt.Errorf("store for %s must set TableConvertor; rest.NewDefaultTableConvertor(e.DefaultQualifiedResource) can be used to output just name/creation time", e.DefaultQualifiedResource.String())
        }
    
        // Namespace scoping is taken from CreateStrategy when present, otherwise
        // from UpdateStrategy.
        var isNamespaced bool
        switch {
        case e.CreateStrategy != nil:
            isNamespaced = e.CreateStrategy.NamespaceScoped()
        case e.UpdateStrategy != nil:
            isNamespaced = e.UpdateStrategy.NamespaceScoped()
        default:
            return fmt.Errorf("store for %s must have CreateStrategy or UpdateStrategy set", e.DefaultQualifiedResource.String())
        }
    
        if e.DeleteStrategy == nil {
            return fmt.Errorf("store for %s must have DeleteStrategy set", e.DefaultQualifiedResource.String())
        }
    
        if options.RESTOptions == nil {
            return fmt.Errorf("options for %s must have RESTOptions set", e.DefaultQualifiedResource.String())
        }
    
        // Fall back to the scope-appropriate default attribute function.
        attrFunc := options.AttrFunc
        if attrFunc == nil {
            if isNamespaced {
                attrFunc = storage.DefaultNamespaceScopedAttr
            } else {
                attrFunc = storage.DefaultClusterScopedAttr
            }
        }
        if e.PredicateFunc == nil {
            e.PredicateFunc = func(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
                return storage.SelectionPredicate{
                    Label:    label,
                    Field:    field,
                    GetAttrs: attrFunc,
                }
            }
        }
    
        err := validateIndexers(options.Indexers)
        if err != nil {
            return err
        }
    
        opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
        if err != nil {
            return err
        }
    
        // ResourcePrefix must come from the underlying factory
        prefix := opts.ResourcePrefix
        if !strings.HasPrefix(prefix, "/") {
            prefix = "/" + prefix
        }
        // An empty ResourcePrefix normalizes to "/" and is rejected.
        if prefix == "/" {
            return fmt.Errorf("store for %s has an invalid prefix %q", e.DefaultQualifiedResource.String(), opts.ResourcePrefix)
        }
    
        // Set the default behavior for storage key generation
        if e.KeyRootFunc == nil && e.KeyFunc == nil {
            if isNamespaced {
                e.KeyRootFunc = func(ctx context.Context) string {
                    return NamespaceKeyRootFunc(ctx, prefix)
                }
                e.KeyFunc = func(ctx context.Context, name string) (string, error) {
                    return NamespaceKeyFunc(ctx, prefix, name)
                }
            } else {
                e.KeyRootFunc = func(ctx context.Context) string {
                    return prefix
                }
                e.KeyFunc = func(ctx context.Context, name string) (string, error) {
                    return NoNamespaceKeyFunc(ctx, prefix, name)
                }
            }
        }
    
        // We adapt the store's keyFunc so that we can use it with the StorageDecorator
        // without making any assumptions about where objects are stored in etcd
        keyFunc := func(obj runtime.Object) (string, error) {
            accessor, err := meta.Accessor(obj)
            if err != nil {
                return "", err
            }
    
            if isNamespaced {
                return e.KeyFunc(genericapirequest.WithNamespace(genericapirequest.NewContext(), accessor.GetNamespace()), accessor.GetName())
            }
    
            return e.KeyFunc(genericapirequest.NewContext(), accessor.GetName())
        }
    
        // A caller-provided worker count wins; zero means "use the RESTOptions value".
        if e.DeleteCollectionWorkers == 0 {
            e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers
        }
    
        e.EnableGarbageCollection = opts.EnableGarbageCollection
    
        // Default: an object's name is whatever its metadata accessor reports.
        if e.ObjectNameFunc == nil {
            e.ObjectNameFunc = func(obj runtime.Object) (string, error) {
                accessor, err := meta.Accessor(obj)
                if err != nil {
                    return "", err
                }
                return accessor.GetName(), nil
            }
        }
    
        // Only build the backing storage when the caller has not injected one.
        if e.Storage.Storage == nil {
            e.Storage.Codec = opts.StorageConfig.Codec
            var err error
            e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
                opts.StorageConfig,
                prefix,
                keyFunc,
                e.NewFunc,
                e.NewListFunc,
                attrFunc,
                options.TriggerFunc,
                options.Indexers,
            )
            if err != nil {
                return err
            }
            e.StorageVersioner = opts.StorageConfig.EncodeVersioner
    
            if opts.CountMetricPollPeriod > 0 {
                stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
                // Chain the stop function in front of the decorator's destroy
                // function so that DestroyFunc runs both.
                previousDestroy := e.DestroyFunc
                e.DestroyFunc = func() {
                    stopFunc()
                    if previousDestroy != nil {
                        previousDestroy()
                    }
                }
            }
        }
    
        return nil
    }
    
    // Create persists obj and returns the stored object.
    //
    // The sequence is: rest.BeforeCreate with the store's CreateStrategy, the
    // optional createValidation callback (run on a deep copy of obj), name and
    // storage key derivation, TTL calculation, and finally the write through
    // e.Storage. After a successful write the AfterCreate and Decorator hooks run
    // on the stored object, if set.
    //
    // When the write fails with AlreadyExists and the existing object can be
    // read and carries a deletion timestamp, the error message is prefixed with
    // "object is being deleted: ".
    func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
        if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
            return nil, err
        }
    
        // at this point we have a fully formed object.  It is time to call the validators that the apiserver
        // handling chain wants to enforce.
        if createValidation != nil {
            validationErr := createValidation(ctx, obj.DeepCopyObject())
            if validationErr != nil {
                return nil, validationErr
            }
        }
    
        objName, err := e.ObjectNameFunc(obj)
        if err != nil {
            return nil, err
        }
        storageKey, err := e.KeyFunc(ctx, objName)
        if err != nil {
            return nil, err
        }
        qualifiedResource := e.qualifiedResourceFromContext(ctx)
        ttl, err := e.calculateTTL(obj, 0, false)
        if err != nil {
            return nil, err
        }
    
        created := e.NewFunc()
        if err := e.Storage.Create(ctx, storageKey, obj, created, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
            err = storeerr.InterpretCreateError(err, qualifiedResource, objName)
            err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
    
            // Only an AlreadyExists error whose existing object is readable is
            // inspected further; in every other case err is returned untouched.
            if apierrors.IsAlreadyExists(err) && e.Storage.Get(ctx, storageKey, storage.GetOptions{}, created) == nil {
                if accessor, accErr := meta.Accessor(created); accErr == nil && accessor.GetDeletionTimestamp() != nil {
                    statusErr := err.(*apierrors.StatusError)
                    statusErr.ErrStatus.Message = fmt.Sprintf("object is being deleted: %s", statusErr.ErrStatus.Message)
                }
            }
            return nil, err
        }
    
        if e.AfterCreate != nil {
            if hookErr := e.AfterCreate(created); hookErr != nil {
                return nil, hookErr
            }
        }
        if e.Decorator != nil {
            if hookErr := e.Decorator(created); hookErr != nil {
                return nil, hookErr
            }
        }
        return created, nil
    }
    

    k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go 中

    // Create writes obj under key through the wrapped storage, or simulates the
    // write when dryRun is set.
    //
    // In dry-run mode nothing is written: if a Get for key succeeds a
    // key-exists error is returned, otherwise obj is copied into out.
    func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64, dryRun bool) error {
        if !dryRun {
            return s.Storage.Create(ctx, key, obj, out, ttl)
        }
    
        getErr := s.Storage.Get(ctx, key, storage.GetOptions{}, out)
        if getErr == nil {
            return storage.NewKeyExistsError(key, 0)
        }
        return s.copyInto(obj, out)
    }
    

    cmd/kube-apiserver/app/aggregator.go 中

    // createAggregatorConfig derives the aggregator API server configuration from
    // the kube-apiserver's generic configuration.
    //
    // It works on a copy of kubeAPIServerConfig with the post-start hooks reset,
    // re-applies admission and API enablement with the aggregator's scheme and
    // defaults, installs a RESTOptionsGetter backed by a copy of the etcd options
    // using the aggregator codec, and loads the proxy client certificate and key
    // when both file paths are configured.
    func createAggregatorConfig(
        kubeAPIServerConfig genericapiserver.Config,
        commandOptions *options.ServerRunOptions,
        externalInformers kubeexternalinformers.SharedInformerFactory,
        serviceResolver aggregatorapiserver.ServiceResolver,
        proxyTransport *http.Transport,
        pluginInitializers []admission.PluginInitializer,
    ) (*aggregatorapiserver.Config, error) {
        // make a shallow copy to let us twiddle a few things
        // most of the config actually remains the same.  We only need to mess with a couple items related to the particulars of the aggregator
        genericConfig := kubeAPIServerConfig
        genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
        genericConfig.RESTOptionsGetter = nil
    
        // override genericConfig.AdmissionControl with kube-aggregator's scheme,
        // because aggregator apiserver should use its own scheme to convert its own resources.
        if err := commandOptions.Admission.ApplyTo(
            &genericConfig,
            externalInformers,
            genericConfig.LoopbackClientConfig,
            feature.DefaultFeatureGate,
            pluginInitializers...,
        ); err != nil {
            return nil, err
        }
    
        // copy the etcd options so we don't mutate originals.
        storageOptions := *commandOptions.Etcd
        storageOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
        storageOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion)
        storageOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1beta1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
        genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: storageOptions}
    
        // override MergedResourceConfig with aggregator defaults and registry
        enablementErr := commandOptions.APIEnablement.ApplyTo(
            &genericConfig,
            aggregatorapiserver.DefaultAPIResourceConfigSource(),
            aggregatorscheme.Scheme)
        if enablementErr != nil {
            return nil, enablementErr
        }
    
        // The proxy client key pair is only read when both paths are provided;
        // otherwise both byte slices stay nil.
        var certBytes, keyBytes []byte
        certFile, keyFile := commandOptions.ProxyClientCertFile, commandOptions.ProxyClientKeyFile
        if len(certFile) > 0 && len(keyFile) > 0 {
            var readErr error
            if certBytes, readErr = ioutil.ReadFile(certFile); readErr != nil {
                return nil, readErr
            }
            if keyBytes, readErr = ioutil.ReadFile(keyFile); readErr != nil {
                return nil, readErr
            }
        }
    
        recommended := &genericapiserver.RecommendedConfig{
            Config:                genericConfig,
            SharedInformerFactory: externalInformers,
        }
        extra := aggregatorapiserver.ExtraConfig{
            ProxyClientCert: certBytes,
            ProxyClientKey:  keyBytes,
            ServiceResolver: serviceResolver,
            ProxyTransport:  proxyTransport,
        }
        aggregatorConfig := &aggregatorapiserver.Config{
            GenericConfig: recommended,
            ExtraConfig:   extra,
        }
    
        // we need to clear the poststarthooks so we don't add them multiple times to all the servers (that fails)
        aggregatorConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
    
        return aggregatorConfig, nil
    }
    

    k8s.io/apiserver/pkg/server/options/etcd.go 中

    // GetRESTOptions returns the REST options for resource, derived from the
    // factory's etcd options.
    //
    // The storage decorator is generic.UndecoratedStorage unless the watch cache
    // is enabled. With the watch cache enabled, a resource explicitly configured
    // with a size <= 0 stays undecorated and every other resource gets
    // genericregistry.StorageWithCacher(); an explicitly configured positive
    // size only triggers a warning. A failure to parse WatchCacheSizes is
    // returned with empty options.
    func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
        restOptions := generic.RESTOptions{
            StorageConfig:           &f.Options.StorageConfig,
            Decorator:               generic.UndecoratedStorage,
            EnableGarbageCollection: f.Options.EnableGarbageCollection,
            DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
            ResourcePrefix:          resource.Group + "/" + resource.Resource,
            CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
        }
        if !f.Options.EnableWatchCache {
            return restOptions, nil
        }
    
        sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
        if err != nil {
            return generic.RESTOptions{}, err
        }
    
        if size, configured := sizes[resource]; configured && size <= 0 {
            restOptions.Decorator = generic.UndecoratedStorage
        } else {
            // Reaching here with an explicit entry means its size is positive.
            if configured {
                klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
            }
            restOptions.Decorator = genericregistry.StorageWithCacher()
        }
        return restOptions, nil
    }
    
    

    k8s.io/apiserver/pkg/registry/generic/storage_decorator.go 中

    // UndecoratedStorage creates storage directly from config through
    // factory.Create. All other parameters are accepted only to match the
    // decorator signature and are not used.
    func UndecoratedStorage(
        config *storagebackend.Config,
        resourcePrefix string,
        keyFunc func(obj runtime.Object) (string, error),
        newFunc func() runtime.Object,
        newListFunc func() runtime.Object,
        getAttrsFunc storage.AttrFunc,
        trigger storage.IndexerFuncs,
        indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
        backendConfig := *config
        return factory.Create(backendConfig)
    }
    
    // NewRawStorage creates storage from a copy of *config via factory.Create and
    // returns it together with its destroy function.
    func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) {
        backendConfig := *config
        return factory.Create(backendConfig)
    }
    
    

    k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go 中

    // Create builds the storage backend selected by c.Type.
    //
    // An unset type and the etcd3 type both produce etcd3 storage. "etcd2" and
    // any other value yield an error and nil storage/destroy function.
    func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
        if c.Type == "etcd2" {
            return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
        }
        if c.Type == storagebackend.StorageTypeUnset || c.Type == storagebackend.StorageTypeETCD3 {
            return newETCD3Storage(c)
        }
        return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
    }
    

    k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go 中

    // newETCD3Storage builds etcd3-backed storage from c.
    //
    // It starts the compactor (startCompactorOnce), creates the etcd3 client and
    // starts the per-endpoint DB size monitor, in that order. If a later step
    // fails, whatever the earlier steps acquired is released before the error is
    // returned, so a failed call leaves nothing running.
    //
    // The returned DestroyFunc stops the compactor and the DB size monitor and
    // closes the client. It is guarded by a sync.Once, so calling it repeatedly
    // performs the teardown only once.
    func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
        stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
        if err != nil {
            return nil, nil, err
        }
    
        client, err := newETCD3Client(c.Transport)
        if err != nil {
            stopCompactor()
            return nil, nil, err
        }
    
        stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
        if err != nil {
            // Release the compactor and the client acquired above; no destroy
            // function is handed to the caller on this path. The Close result is
            // dropped, as in destroyFunc below: the monitor error is what matters.
            stopCompactor()
            client.Close()
            return nil, nil, err
        }
    
        var once sync.Once
        destroyFunc := func() {
            // we know that storage destroy funcs are called multiple times (due to reuse in subresources).
            // Hence, we only destroy once.
            // TODO: fix duplicated storage destroy calls higher level
            once.Do(func() {
                stopCompactor()
                stopDBSizeMonitor()
                client.Close()
            })
        }
        // Without a configured transformer, values are stored as-is.
        transformer := c.Transformer
        if transformer == nil {
            transformer = value.IdentityTransformer
        }
        return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
    }
    
    

    相关文章

      网友评论

          本文标题:k8s 之 apiserver 源码简单分析

          本文链接:https://www.haomeiwen.com/subject/vrkphktx.html