Kubernetes Source Code Analysis -- API Server: Et


Author: 何约什 | Published 2018-12-21 15:18

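    StorageEncodingOverrides

    Kubernetes resource data is stored in etcd. By default, objects are serialized as application/json. Each group is written in its storage (external) version and decoded into the __internal version in memory; see the MergeGroupEncodingConfig function. Because the storage factory resolves these versions itself, callers do not need to specify a version when obtaining a resource's storage.
    Storage is normally looked up by schema.GroupResource. To target all resources of a group, the resource can be given as "*".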

    For each resource type, we pass a schema.GroupResource to DefaultStorageFactory to obtain the storage interface for that resource.

    Etcd configuration

    The default path prefix under which resource data is stored is DefaultEtcdPathPrefix = "/registry".
    It can be overridden at startup with the --etcd-prefix flag.
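    The following minimal sketch shows how the final etcd key is assembled from the prefix and the per-object key. It is modeled on the path.Join calls in newStore and store.Create shown later. The helper etcdKey is hypothetical. The /<resource>/<namespace>/<name> layout is an assumption that holds for namespaced resources such as pods.

```go
package main

import (
	"fmt"
	"path"
)

// etcdKey is a hypothetical helper, not a Kubernetes function. It mimics how
// the etcd3 store normalizes its prefix with path.Join("/", prefix) and then
// joins it with a per-object key of the form "/<resource>/<namespace>/<name>".
func etcdKey(prefix, resource, namespace, name string) string {
	pathPrefix := path.Join("/", prefix)
	objectKey := path.Join("/", resource, namespace, name)
	return path.Join(pathPrefix, objectKey)
}

func main() {
	// Default prefix (DefaultEtcdPathPrefix).
	fmt.Println(etcdKey("/registry", "pods", "default", "nginx"))
	// A prefix without a leading slash, as one might pass via --etcd-prefix.
	fmt.Println(etcdKey("custom", "pods", "kube-system", "dns"))
}
```

    Normalizing with path.Join makes "registry" and "/registry" equivalent, which is why the comment in newStore calls it a no-op for the default prefix.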

    
    func NewServerRunOptions() *ServerRunOptions {
        s := ServerRunOptions{
            GenericServerRunOptions: genericoptions.NewServerRunOptions(),
            Etcd:                 genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, nil)),
            ......
        }
        ......
    }
    
    The EtcdOptions are created with these defaults:
    
    func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
        return &EtcdOptions{
            StorageConfig:           *backendConfig,
            DefaultStorageMediaType: "application/json",
            DeleteCollectionWorkers: 1,
            EnableGarbageCollection: true,
            EnableWatchCache:        true,
            DefaultWatchCacheSize:   100,
        }
    }
    

    DefaultStorageFactory

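    The main job of DefaultStorageFactory is to return the storage interface for a given GroupResource.
    The result covers:

      1. The merged etcd configuration: credentials, servers, and prefix.
      2. The storage encoding of the resource: the group, version, and kind it is stored as.
      3. Cohabitation: some resources, such as hpa, are exposed through multiple APIs.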
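    The cohabitation lookup can be sketched as a simplified model. The model assumes, as AddCohabitatingResources and getStorageGroupResource appear to do, that every member of a cohabitating set shares one ordered list. It also assumes that the first member whose group is enabled is used for storage. The types and method names are hypothetical stand-ins, not the Kubernetes API.

```go
package main

import "fmt"

// GroupResource is a simplified stand-in for schema.GroupResource.
type GroupResource struct {
	Group, Resource string
}

// cohabitation is a hypothetical model of the cohabitation bookkeeping:
// every member of a set maps to the same ordered list of members.
type cohabitation struct {
	lists         map[GroupResource][]GroupResource
	enabledGroups map[string]bool
}

// add registers a cohabitating set; the argument order is the lookup order.
func (c *cohabitation) add(members ...GroupResource) {
	for _, m := range members {
		c.lists[m] = members
	}
}

// storageResource returns the first member of the list whose group is
// enabled, falling back to the requested resource itself.
func (c *cohabitation) storageResource(gr GroupResource) GroupResource {
	for _, candidate := range c.lists[gr] {
		if c.enabledGroups[candidate.Group] {
			return candidate
		}
	}
	return gr
}

func main() {
	c := &cohabitation{
		lists:         map[GroupResource][]GroupResource{},
		enabledGroups: map[string]bool{"apps": true, "extensions": true},
	}
	c.add(GroupResource{"apps", "deployments"}, GroupResource{"extensions", "deployments"})
	// Both groups are enabled, so extensions/deployments is stored as apps/deployments.
	fmt.Println(c.storageResource(GroupResource{"extensions", "deployments"}))
}
```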

    DefaultStorageFactory is constructed as follows:

    func NewDefaultStorageFactory(
        config storagebackend.Config,
        defaultMediaType string,                        // passed in from EtcdOptions; defaults to application/json (see NewEtcdOptions)
        defaultSerializer runtime.StorageSerializer,    // actual value: legacyscheme.Codecs
        resourceEncodingConfig ResourceEncodingConfig,  // resource encoding config. Not every resource is stored under the group it is served from; there are special cases. A resource can also be stored in a different etcd, under a different prefix, or even with a different encoding.
        resourceConfig APIResourceConfigSource,         // which API resource versions are enabled
        specialDefaultResourcePrefixes map[schema.GroupResource]string,  // see SpecialDefaultResourcePrefixes
    ) *DefaultStorageFactory {
        config.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
        if len(defaultMediaType) == 0 {
            defaultMediaType = runtime.ContentTypeJSON
        }
        return &DefaultStorageFactory{
            StorageConfig:           config,                 // how to connect to the underlying storage, including the credentials used by storage.Interface implementations
            Overrides:               map[schema.GroupResource]groupResourceOverrides{}, // per-resource special handling
            DefaultMediaType:        defaultMediaType,       // default storage media type: application/json
            DefaultSerializer:       defaultSerializer,      // default serializer: legacyscheme.Codecs
            ResourceEncodingConfig:  resourceEncodingConfig, // resource encoding config
            APIResourceConfigSource: resourceConfig,         // enabled API resource versions
            DefaultResourcePrefixes: specialDefaultResourcePrefixes, // special per-resource prefixes
    
            newStorageCodecFn: NewStorageCodec, // assembles a storage codec for the given media type, serializer, and requested storage and memory versions
        }
    }
    

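    newStorageCodecFn builds a runtime.Codec from the requested storage and in-memory GroupVersions, together with the storage media type and serializer. In other words, it is the codec that converts objects between their stored form and their in-memory form.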
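    Conceptually, a storage codec converts between the in-memory (__internal) form and the external storage version. The toy codec below illustrates that round trip with encoding/json. podInternal, podV1, and storageCodec are hypothetical names. The real codec is built from runtime serializers and scheme conversions, not hand-written field copies.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// podInternal plays the role of the __internal (in-memory) version.
type podInternal struct {
	Name  string
	Image string
}

// podV1 plays the role of the external storage version written to etcd.
type podV1 struct {
	APIVersion string `json:"apiVersion"`
	Kind       string `json:"kind"`
	Name       string `json:"name"`
	Image      string `json:"image"`
}

// storageCodec is a hypothetical toy codec. It converts to the storage
// version before serializing and back to the memory version after decoding.
type storageCodec struct {
	storageVersion string
}

func (c storageCodec) Encode(p podInternal) ([]byte, error) {
	ext := podV1{APIVersion: c.storageVersion, Kind: "Pod", Name: p.Name, Image: p.Image}
	return json.Marshal(ext)
}

func (c storageCodec) Decode(data []byte) (podInternal, error) {
	var ext podV1
	if err := json.Unmarshal(data, &ext); err != nil {
		return podInternal{}, err
	}
	if ext.APIVersion != c.storageVersion {
		return podInternal{}, fmt.Errorf("unexpected apiVersion %q", ext.APIVersion)
	}
	return podInternal{Name: ext.Name, Image: ext.Image}, nil
}

func main() {
	codec := storageCodec{storageVersion: "v1"}
	data, _ := codec.Encode(podInternal{Name: "nginx", Image: "nginx:1.15"})
	fmt.Println(string(data))
	back, _ := codec.Decode(data)
	fmt.Printf("%+v\n", back)
}
```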

    Storage serialization options

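    As mentioned above, when starting kube-apiserver we can pass --storage-versions to choose the version each resource is stored in. The value ends up in the StorageVersions field of StorageSerializationOptions.

    StorageSerializationOptions holds the resource encoding options. Its DefaultStorageVersions field is not exposed as a flag, so it always takes its default value. As shown below, that default comes from AllPreferredGroupVersions, which returns the preferred version of each registered group in the form group1/version1,group2/version2.

    StorageVersions is applied on top of DefaultStorageVersions, so any group it lists overrides the default.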

    // StorageSerializationOptions contains the options for encoding resources.
    type StorageSerializationOptions struct {
        StorageVersions string
        // The default values for StorageVersions. StorageVersions overrides
        // these; you can change this if you want to change the defaults (e.g.,
        // for testing). This is not actually exposed as a flag.
    
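        // Default storage versions. StorageVersions overrides them, and DefaultStorageVersions is not exposed as a startup flag,
        // so default versions always exist: in NewStorageSerializationOptions below the value is legacyscheme.Registry.AllPreferredGroupVersions().
        // When resolving storage versions the defaults are therefore always present, and --storage-versions can override some or all of them.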
        DefaultStorageVersions string
    }
    func NewStorageSerializationOptions() *StorageSerializationOptions {
        return &StorageSerializationOptions{
            DefaultStorageVersions: legacyscheme.Registry.AllPreferredGroupVersions(),
            StorageVersions:        legacyscheme.Registry.AllPreferredGroupVersions(),
        }
    }
    

    Description of the --storage-versions flag:
    The per-group version to store resources in. Specified in the format "group1/version1,group2/version2,...". In the case where objects are moved from one group to the other, you may specify the format "group1=group2/v1beta1,group3/v1beta1,...". You only need to pass the groups you wish to change from the defaults. It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable. (default "admission.k8s.io/v1beta1,admissionregistration.k8s.io/v1beta1,apps/v1,authentication.k8s.io/v1,authorization.k8s.io/v1,autoscaling/v1,batch/v1,certificates.k8s.io/v1beta1,componentconfig/v1alpha1,events.k8s.io/v1beta1,extensions/v1beta1,imagepolicy.k8s.io/v1alpha1,networking.k8s.io/v1,policy/v1beta1,rbac.authorization.k8s.io/v1,scheduling.k8s.io/v1alpha1,settings.k8s.io/v1alpha1,storage.k8s.io/v1,v1")
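    The merge of default and user-supplied versions can be sketched as follows. This is a hypothetical re-implementation using only the standard library. It assumes each entry is keyed by group, with the core group as the empty string. It also assumes the group=othergroup/version form stores one group's resources in another group's version, as the flag description says.

```go
package main

import (
	"fmt"
	"strings"
)

// groupVersion is a simplified stand-in for schema.GroupVersion.
type groupVersion struct {
	Group, Version string
}

// parseGroupVersion accepts "group/version" or a bare "version" (core group).
func parseGroupVersion(s string) (groupVersion, error) {
	parts := strings.Split(s, "/")
	switch len(parts) {
	case 1:
		return groupVersion{Version: parts[0]}, nil
	case 2:
		return groupVersion{Group: parts[0], Version: parts[1]}, nil
	}
	return groupVersion{}, fmt.Errorf("invalid group version %q", s)
}

// mergeInto is a hypothetical helper that merges a comma-separated list into
// dest. Entries are "group/version" or "group=othergroup/version".
func mergeInto(list string, dest map[string]groupVersion) error {
	for _, entry := range strings.Split(list, ",") {
		if entry == "" {
			continue
		}
		group, target, aliased := "", entry, false
		if i := strings.Index(entry, "="); i >= 0 {
			group, target, aliased = entry[:i], entry[i+1:], true
		}
		gv, err := parseGroupVersion(target)
		if err != nil {
			return err
		}
		if !aliased {
			group = gv.Group
		}
		dest[group] = gv // later lists override earlier ones per group
	}
	return nil
}

func main() {
	versions := map[string]groupVersion{}
	// Defaults first (like DefaultStorageVersions), then user input (like StorageVersions).
	if err := mergeInto("apps/v1,batch/v1,v1", versions); err != nil {
		panic(err)
	}
	if err := mergeInto("batch/v1beta1", versions); err != nil {
		panic(err)
	}
	fmt.Println(versions["batch"], versions[""])
}
```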

    BuildStorageFactory

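    This function builds the storage factory. The key step is the call to kubeapiserver.NewStorageFactory, which creates the DefaultStorageFactory instance through NewDefaultStorageFactory.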

    func BuildStorageFactory(s *options.ServerRunOptions, apiResourceConfig *serverstorage.ResourceConfig) (*serverstorage.DefaultStorageFactory, error) {
        // build the group -> GroupVersion map
        storageGroupsToEncodingVersion, err := s.StorageSerialization.StorageGroupsToEncodingVersion()
        if err != nil {
            return nil, fmt.Errorf("error generating storage version map: %s", err)
        }
    
        // Build the storage factory instance (a DefaultStorageFactory), merging the default resource encodings (defaultResourceEncoding) with user-specified data
        storageFactory, err := kubeapiserver.NewStorageFactory(
            s.Etcd.StorageConfig,           // etcd configuration
            s.Etcd.DefaultStorageMediaType, // default storage media type: application/json
            legacyscheme.Codecs,            // legacy codecs
            serverstorage.NewDefaultResourceEncodingConfig(legacyscheme.Registry),
            storageGroupsToEncodingVersion, // the group -> GroupVersion map created above
            // The list includes resources that need to be stored in a different
            // group version than other resources in the groups.
            // FIXME (soltysh): this GroupVersionResource override should be configurable
            []schema.GroupVersionResource{
                batch.Resource("cronjobs").WithVersion("v1beta1"),
                storage.Resource("volumeattachments").WithVersion("v1beta1"),
                admissionregistration.Resource("initializerconfigurations").WithVersion("v1alpha1"),
            },
            apiResourceConfig)  // describes the enabled APIs; built from the APIEnablementOptions flags and stored in the MergedResourceConfig field of genericapiserver.Config.
                                // It merges in the resources defined by DefaultAPIResourceConfigSource().
    
        if err != nil {
            return nil, fmt.Errorf("error in initializing storage factory: %s", err)
        }
    
        // Bind cohabitating resources; the argument order defines the lookup order
        storageFactory.AddCohabitatingResources(networking.Resource("networkpolicies"), extensions.Resource("networkpolicies"))
        storageFactory.AddCohabitatingResources(apps.Resource("deployments"), extensions.Resource("deployments"))
        storageFactory.AddCohabitatingResources(apps.Resource("daemonsets"), extensions.Resource("daemonsets"))
        storageFactory.AddCohabitatingResources(apps.Resource("replicasets"), extensions.Resource("replicasets"))
        storageFactory.AddCohabitatingResources(api.Resource("events"), events.Resource("events"))
        for _, override := range s.Etcd.EtcdServersOverrides { // EtcdServersOverrides format: group/resource#servers; lets individual resources be stored on different etcd servers
            tokens := strings.Split(override, "#")
            if len(tokens) != 2 {
                glog.Errorf("invalid value of etcd server overrides: %s", override)
                continue
            }
    
            apiresource := strings.Split(tokens[0], "/")
            if len(apiresource) != 2 {
                glog.Errorf("invalid resource definition: %s", tokens[0])
                continue
            }
            group := apiresource[0]
            resource := apiresource[1]
            groupResource := schema.GroupResource{Group: group, Resource: resource}
    
            servers := strings.Split(tokens[1], ";")
            storageFactory.SetEtcdLocation(groupResource, servers)  // use the given etcd servers for this resource
        }
    
        if len(s.Etcd.EncryptionProviderConfigFilepath) != 0 {
            transformerOverrides, err := encryptionconfig.GetTransformerOverrides(s.Etcd.EncryptionProviderConfigFilepath)
            if err != nil {
                return nil, err
            }
            for groupResource, transformer := range transformerOverrides {
                storageFactory.SetTransformer(groupResource, transformer)
            }
        }
    
        return storageFactory, nil
    }
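    The EtcdServersOverrides parsing in the loop above can be pulled out into a small helper for experimentation. parseServerOverride and groupResource are hypothetical. The error messages mirror the glog messages above. The /events example relies on the core group being written as an empty string before the slash.

```go
package main

import (
	"fmt"
	"strings"
)

// groupResource is a simplified stand-in for schema.GroupResource.
type groupResource struct {
	Group, Resource string
}

// parseServerOverride is a hypothetical helper mirroring the loop in
// BuildStorageFactory: "group/resource#server1;server2".
func parseServerOverride(override string) (groupResource, []string, error) {
	tokens := strings.Split(override, "#")
	if len(tokens) != 2 {
		return groupResource{}, nil, fmt.Errorf("invalid value of etcd server overrides: %s", override)
	}
	apiresource := strings.Split(tokens[0], "/")
	if len(apiresource) != 2 {
		return groupResource{}, nil, fmt.Errorf("invalid resource definition: %s", tokens[0])
	}
	gr := groupResource{Group: apiresource[0], Resource: apiresource[1]}
	return gr, strings.Split(tokens[1], ";"), nil
}

func main() {
	// Core-group events stored on a separate etcd cluster.
	gr, servers, err := parseServerOverride("/events#http://etcd-events-1:2379;http://etcd-events-2:2379")
	fmt.Println(gr, servers, err)
}
```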
    

    The resulting DefaultStorageFactory is wrapped in a storageFactoryRestOptionsFactory and stored in the RESTOptionsGetter field of genericapiserver.Config:

    func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
        s.addEtcdHealthEndpoint(c)  // add a basic etcd health check
        c.RESTOptionsGetter = &storageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
        return nil
    }
    

    How each resource obtains its storage interface

    Taking PodStorage (k8s.io/kubernetes/pkg/registry/core/pod/storage/storage.go) as an example, let's analyze how pods and their subresources are stored. PodStorage is defined as:

    type PodStorage struct {
        Pod         *REST
        Binding     *BindingREST
        Eviction    *EvictionREST
        Status      *StatusREST
        Log         *podrest.LogREST
        Proxy       *podrest.ProxyREST
        Exec        *podrest.ExecREST
        Attach      *podrest.AttachREST
        PortForward *podrest.PortForwardREST
    }
    

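    PodStorage contains the storage for Pod itself and for all of its subresources; each field serves one resource. As mentioned earlier, the data lives in etcd. Let's first look at how a PodStorage instance is built.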

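    The PodStorage instance is created while installing the REST API for the legacy core resources, in LegacyRESTStorageProvider.NewLegacyRESTStorage in k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go. That method creates the storage instances for many resources. PodStorage is created like this: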

        podStorage := podstore.NewStorage(
            restOptionsGetter,
            nodeStorage.KubeletConnectionInfo,
            c.ProxyTransport,
            podDisruptionClient,
        )
    

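    restOptionsGetter here is the RESTOptionsGetter field of the Config struct in k8s.io/apiserver/pkg/server/config.go, which is used to build the GenericAPIServer. As analyzed above, once the storage factory is built from the etcd configuration, the resulting factory is assigned to RESTOptionsGetter. Now let's look at NewStorage: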

    // NewStorage returns a RESTStorage object that will work against pods.
    func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
    
        store := &genericregistry.Store{
            NewFunc:                  func() runtime.Object { return &api.Pod{} },     // NewFunc creates an empty Pod
            NewListFunc:              func() runtime.Object { return &api.PodList{} }, // NewListFunc creates an empty PodList
            PredicateFunc:            pod.MatchPod,
            DefaultQualifiedResource: api.Resource("pods"),
    
            CreateStrategy:      pod.Strategy,  // default logic run when Pods are created or updated; the concrete type is podStrategy
            UpdateStrategy:      pod.Strategy,
            DeleteStrategy:      pod.Strategy,
            ReturnDeletedObject: true,
    
            TableConvertor: printerstorage.TableConvertor{TablePrinter: printers.NewTablePrinter().With(printersinternal.AddHandlers)},
        }
        options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: pod.GetAttrs, TriggerFunc: pod.NodeNameTriggerFunc}
        if err := store.CompleteWithOptions(options); err != nil {
            panic(err) // TODO: Propagate error up
        }
    
        statusStore := *store
        statusStore.UpdateStrategy = pod.StatusStrategy
    
        return PodStorage{
            Pod:         &REST{store, proxyTransport},
            Binding:     &BindingREST{store: store},
            Eviction:    newEvictionStorage(store, podDisruptionBudgetClient),
            Status:      &StatusREST{store: &statusStore},
            Log:         &podrest.LogREST{Store: store, KubeletConn: k},
            Proxy:       &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
            Exec:        &podrest.ExecREST{Store: store, KubeletConn: k},
            Attach:      &podrest.AttachREST{Store: store, KubeletConn: k},
            PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
        }
    }
    

    The key part of the code above is creating the store object. store.Storage has type storage.Interface (k8s.io/apiserver/pkg/storage/interfaces.go); store.Storage comes up again in the REST API analysis later.

    NewFunc creates a Pod instance and is used by the API machinery whenever it needs a new Pod object. store.CompleteWithOptions(options) fills in the remaining fields, chiefly Storage and DestroyFunc:

            e.Storage, e.DestroyFunc = opts.Decorator(
                opts.StorageConfig,
                e.NewFunc(),
                prefix,
                keyFunc,
                e.NewListFunc,
                attrFunc,
                triggerFunc,
            )
    

    The Storage field is built from the NewFunc and NewListFunc mentioned above. From the API Server startup flow we know that RESTOptionsGetter is assigned when the genericserver.Config object is built, in EtcdOptions.ApplyWithStorageFactoryTo:

    func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
        s.addEtcdHealthEndpoint(c)
        c.RESTOptionsGetter = &storageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
        return nil
    }
    

    So the concrete type of genericserver.Config.RESTOptionsGetter is storageFactoryRestOptionsFactory. Here is its GetRESTOptions method:

    func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
        storageConfig, err := f.StorageFactory.NewConfig(resource)
        if err != nil {
            return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
        }
    
        ret := generic.RESTOptions{
            StorageConfig:           storageConfig,
            Decorator:               generic.UndecoratedStorage,
            DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
            EnableGarbageCollection: f.Options.EnableGarbageCollection,
            ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
        }
        if f.Options.EnableWatchCache {
            sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
            if err != nil {
                return generic.RESTOptions{}, err
            }
            cacheSize, ok := sizes[resource]
            if !ok {
                cacheSize = f.Options.DefaultWatchCacheSize
            }
            ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
        }
    
        return ret, nil
    }
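    The watch cache size selection in GetRESTOptions can be sketched as below. The sketch makes two simplifying assumptions. It keys sizes by a plain resource string and uses the resource#size entry format, whereas the real ParseWatchCacheSizes works with schema.GroupResource. A size of 0 matters because StorageWithCacher, shown next, returns the raw storage when capacity is 0.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCacheSizes is a hypothetical sketch of parsing "resource#size" entries.
func parseCacheSizes(entries []string) (map[string]int, error) {
	sizes := make(map[string]int)
	for _, e := range entries {
		tokens := strings.Split(e, "#")
		if len(tokens) != 2 {
			return nil, fmt.Errorf("invalid watch cache size: %s", e)
		}
		n, err := strconv.Atoi(tokens[1])
		if err != nil || n < 0 {
			return nil, fmt.Errorf("invalid size in %s", e)
		}
		sizes[tokens[0]] = n
	}
	return sizes, nil
}

// cacheSizeFor returns the configured size, or defaultSize when none is set,
// like the fallback to DefaultWatchCacheSize in GetRESTOptions.
func cacheSizeFor(sizes map[string]int, resource string, defaultSize int) int {
	if n, ok := sizes[resource]; ok {
		return n
	}
	return defaultSize
}

func main() {
	sizes, err := parseCacheSizes([]string{"pods#1000", "events#0"})
	if err != nil {
		panic(err)
	}
	fmt.Println(cacheSizeFor(sizes, "pods", 100))     // explicitly configured
	fmt.Println(cacheSizeFor(sizes, "services", 100)) // falls back to the default
	fmt.Println(cacheSizeFor(sizes, "events", 100))   // 0: StorageWithCacher skips the cache
}
```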
    

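    This returns a RESTOptions object. With the watch cache enabled, its Decorator is the generic.StorageDecorator built by genericregistry.StorageWithCacher(cacheSize). A StorageDecorator is just a function that returns the storage (a storage.Interface) and its DestroyFunc, as shown below: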

    // Creates a cacher based given storageConfig.
    func StorageWithCacher(capacity int) generic.StorageDecorator {
        return func(
            storageConfig *storagebackend.Config,
            objectType runtime.Object,
            resourcePrefix string,
            keyFunc func(obj runtime.Object) (string, error),
            newListFunc func() runtime.Object,
            getAttrsFunc storage.AttrFunc,
            triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
            // create a raw (uncached) etcd storage instance
            s, d := generic.NewRawStorage(storageConfig)
            if capacity == 0 {
                glog.V(5).Infof("Storage caching is disabled for %T", objectType)
                return s, d
            }
            glog.V(5).Infof("Storage caching is enabled for %T with capacity %v", objectType, capacity)
    
            // TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
            // Currently it has two layers of same storage interface -- cacher and low level kv.
            cacherConfig := storage.CacherConfig{
                CacheCapacity:        capacity,
                Storage:              s,
                Versioner:            etcdstorage.APIObjectVersioner{},
                Type:                 objectType,
                ResourcePrefix:       resourcePrefix,
                KeyFunc:              keyFunc,
                NewListFunc:          newListFunc,
                GetAttrsFunc:         getAttrsFunc,
                TriggerPublisherFunc: triggerFunc,
                Codec:                storageConfig.Codec,
            }
            // wrap the raw etcd storage in a storage with caching
            cacher := storage.NewCacherFromConfig(cacherConfig)
            // destroyFunc releases the resources held by the storage
            destroyFunc := func() {
                cacher.Stop()
                d()
            }
    
            // TODO : Remove RegisterStorageCleanup below when PR
            // https://github.com/kubernetes/kubernetes/pull/50690
            // merges as that shuts down storage properly
            RegisterStorageCleanup(destroyFunc)
    
            return cacher, destroyFunc
        }
    }
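    The decorator shape of StorageWithCacher reduces to a few lines. In this hypothetical sketch, store stands in for storage.Interface. The log slice records the order in which the destroy function releases resources: as in destroyFunc above, the cacher is stopped before the underlying client is closed.

```go
package main

import "fmt"

// store is a hypothetical, minimal stand-in for storage.Interface.
type store interface {
	Name() string
}

type rawStore struct{}

func (rawStore) Name() string { return "raw etcd store" }

// cacher wraps another store and satisfies the same interface.
type cacher struct{ backend store }

func (c cacher) Name() string { return "cacher over " + c.backend.Name() }

// decorator mimics the shape of generic.StorageDecorator, reduced to
// returning a store plus its destroy function.
type decorator func(log *[]string) (store, func())

// storageWithCacher mirrors the control flow of StorageWithCacher.
func storageWithCacher(capacity int) decorator {
	return func(log *[]string) (store, func()) {
		var raw store = rawStore{}
		destroyRaw := func() { *log = append(*log, "close etcd client") } // d()
		if capacity == 0 {
			return raw, destroyRaw // caching disabled: return the raw store
		}
		c := cacher{backend: raw}
		return c, func() {
			*log = append(*log, "stop cacher") // cacher.Stop()
			destroyRaw()
		}
	}
}

func main() {
	var log []string
	s, destroy := storageWithCacher(100)(&log)
	fmt.Println(s.Name())
	destroy()
	fmt.Println(log)

	s0, _ := storageWithCacher(0)(&log)
	fmt.Println(s0.Name())
}
```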
    

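    Now we are getting to the heart of the matter. The final storage instance is produced by calling generic.RESTOptions.Decorator. That is generic.UndecoratedStorage when the watch cache is disabled; otherwise it is the function returned by StorageWithCacher above, which first creates a raw etcd storage and then wraps a cache around it. Next, let's look at the code that creates the raw etcd storage, using the etcd3 backend as the example: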

    func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
        s, d, err := factory.Create(*config)
        if err != nil {
            glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
        }
        return s, d
    }
    
    func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
        switch c.Type {
        case storagebackend.StorageTypeETCD2:
            return newETCD2Storage(c)
        case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
            // TODO: We have the following features to implement:
            // - Support secure connection by using key, cert, and CA files.
            // - Honor "https" scheme to support secure connection in gRPC.
            // - Support non-quorum read.
            return newETCD3Storage(c)
        default:
            return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
        }
    }
    
    
    func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
        tlsInfo := transport.TLSInfo{
            CertFile: c.CertFile,
            KeyFile:  c.KeyFile,
            CAFile:   c.CAFile,
        }
        tlsConfig, err := tlsInfo.ClientConfig()
        if err != nil {
            return nil, nil, err
        }
        // NOTE: Client relies on nil tlsConfig
        // for non-secure connections, update the implicit variable
        if len(c.CertFile) == 0 && len(c.KeyFile) == 0 && len(c.CAFile) == 0 {
            tlsConfig = nil
        }
        cfg := clientv3.Config{
            DialKeepAliveTime:    keepaliveTime,
            DialKeepAliveTimeout: keepaliveTimeout,
            Endpoints:            c.ServerList,
            TLS:                  tlsConfig,
        }
        client, err := clientv3.New(cfg)
        if err != nil {
            return nil, nil, err
        }
        ctx, cancel := context.WithCancel(context.Background())
        etcd3.StartCompactor(ctx, client, c.CompactionInterval)
        destroyFunc := func() {
            cancel()
            client.Close()
        }
        transformer := c.Transformer
        if transformer == nil {
            transformer = value.IdentityTransformer
        }
        if c.Quorum {
            return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
        }
        return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
    }
    
    // New returns an etcd3 implementation of storage.Interface.
    func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
        return newStore(c, true, pagingEnabled, codec, prefix, transformer)
    }
    
    func newStore(c *clientv3.Client, quorumRead, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
        versioner := etcd.APIObjectVersioner{}  // implements object versioning (resourceVersion)
        result := &store{
            client:        c,
            codec:         codec,
            versioner:     versioner,
            transformer:   transformer,
            pagingEnabled: pagingEnabled,
            // for compatibility with etcd2 impl.
            // no-op for default prefix of '/registry'.
            // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
            pathPrefix: path.Join("/", prefix),
            watcher:    newWatcher(c, codec, versioner, transformer),
        }
        if !quorumRead {
            // In case of non-quorum reads, we can set WithSerializable()
            // options for all Get operations.
            result.getOps = append(result.getOps, clientv3.WithSerializable())
        }
        return result
    }
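    The decisions made by Create and newETCD3Storage can be summarized without opening a connection. describeBackend and backendConfig are hypothetical. The sketch encodes only three rules visible in the code above:

    - An unset type is treated as etcd3.
    - A TLS config is used only if at least one of the cert, key, or CA files is set.
    - Non-quorum mode adds serializable reads.

```go
package main

import "fmt"

// backendConfig is a hypothetical subset of storagebackend.Config.
type backendConfig struct {
	Type                      string // "", "etcd2" or "etcd3"
	CertFile, KeyFile, CAFile string
	Quorum                    bool
}

// describeBackend mirrors the decisions in Create and newETCD3Storage.
func describeBackend(c backendConfig) (string, error) {
	switch c.Type {
	case "etcd2":
		return "etcd2", nil
	case "", "etcd3":
		// An unset type is handled the same way as etcd3.
	default:
		return "", fmt.Errorf("unknown storage type: %s", c.Type)
	}
	tls := "tls"
	if c.CertFile == "" && c.KeyFile == "" && c.CAFile == "" {
		tls = "plaintext" // the etcd client expects a nil TLS config here
	}
	reads := "serializable reads" // non-quorum: WithSerializable() on Get ops
	if c.Quorum {
		reads = "quorum reads"
	}
	return fmt.Sprintf("etcd3, %s, %s", tls, reads), nil
}

func main() {
	fmt.Println(describeBackend(backendConfig{}))
	fmt.Println(describeBackend(backendConfig{Type: "etcd3", CAFile: "/etc/etcd/ca.crt", Quorum: true}))
	fmt.Println(describeBackend(backendConfig{Type: "consul"}))
}
```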
    
    

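    Both the store above and the cacher implement storage.Interface, so together they provide a uniform storage service for Pod data.
    Now back to PodStorage. Let's look at how its members are defined:

    // REST embeds *genericregistry.Store and thus inherits its methods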
    type REST struct {
        *genericregistry.Store
        proxyTransport http.RoundTripper
    }
    type BindingREST struct {
        store *genericregistry.Store
    }
    ......
    

    So once the genericregistry.Store instance is built, the key work of creating storage for Pod and its subresources is done, and the construction of PodStorage is complete.

    How the storage interfaces serve the REST API

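    In the API Install source code analysis, we explained how a rest.Storage is finally turned into an http route. The details are not repeated here. The main idea is that the REST API generated for a resource depends on which interfaces its Storage object implements: each interface that is implemented yields the corresponding REST API.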
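    The principle can be illustrated with Go type assertions: probe a storage value for each interface and register only the verbs it supports. The interfaces below are simplified, hypothetical versions of rest.Getter, rest.Lister, and rest.Creater. verbsFor is not the actual installer code.

```go
package main

import "fmt"

// Hypothetical, simplified versions of the rest.* interfaces.
type Getter interface{ Get(name string) (string, error) }
type Lister interface{ List() ([]string, error) }
type Creater interface{ Create(obj string) (string, error) }

// verbsFor probes the storage with type assertions and returns only the
// verbs whose interfaces it implements.
func verbsFor(storage interface{}) []string {
	var verbs []string
	if _, ok := storage.(Getter); ok {
		verbs = append(verbs, "get")
	}
	if _, ok := storage.(Lister); ok {
		verbs = append(verbs, "list")
	}
	if _, ok := storage.(Creater); ok {
		verbs = append(verbs, "create")
	}
	return verbs
}

// podREST implements all three interfaces.
type podREST struct{}

func (podREST) Get(name string) (string, error)   { return name, nil }
func (podREST) List() ([]string, error)           { return nil, nil }
func (podREST) Create(obj string) (string, error) { return obj, nil }

// bindingREST implements only Create, like BindingREST.
type bindingREST struct{}

func (bindingREST) Create(obj string) (string, error) { return obj, nil }

func main() {
	fmt.Println(verbsFor(podREST{}))
	fmt.Println(verbsFor(bindingREST{}))
}
```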

    Again using PodStorage as the example, its members and the interfaces they serve are:

    PodStorage member                      | REST framework interfaces                                                                         | Purpose
    REST                                   | rest.Redirector, rest.CreaterUpdater, rest.Lister, rest.Watcher, rest.GracefulDeleter, rest.Getter | redirect to the resource location; create and update; list; watch for changes; delete with a grace period; get a single resource
    BindingREST                            | rest.Creater                                                                                      | create
    StatusREST                             | rest.Updater                                                                                      | update
    LogREST                                | rest.Getter                                                                                       | get
    ExecREST / ProxyREST / PortForwardREST | rest.Connecter                                                                                    | connect to the resource

    ······

    PodStorage.REST implements quite a few REST capabilities. How does it do that? Let's look at its definition again:

    type REST struct {
        *genericregistry.Store
        proxyTransport http.RoundTripper
    }
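    This works through Go struct embedding: the methods of the embedded *Store are promoted to REST. Below is a minimal sketch with hypothetical Store and REST types, not the genericregistry ones.

```go
package main

import "fmt"

// Store is a hypothetical stand-in for genericregistry.Store.
type Store struct {
	objects map[string]string
}

func (s *Store) Create(name, obj string) error {
	if _, exists := s.objects[name]; exists {
		return fmt.Errorf("%q already exists", name)
	}
	s.objects[name] = obj
	return nil
}

func (s *Store) Get(name string) (string, bool) {
	obj, ok := s.objects[name]
	return obj, ok
}

// REST embeds *Store, so Create and Get are promoted to REST. Extra fields
// (here a stand-in for proxyTransport) carry pod-specific state.
type REST struct {
	*Store
	proxyTarget string
}

func main() {
	r := &REST{Store: &Store{objects: map[string]string{}}, proxyTarget: "http://node-1:10250"}
	fmt.Println(r.Create("nginx", "pod-spec"))
	fmt.Println(r.Create("nginx", "pod-spec"))
	obj, ok := r.Get("nginx")
	fmt.Println(obj, ok)
}
```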
    

    REST embeds genericregistry.Store and therefore inherits all of its methods. Taking Creater as an example, let's see how a Pod is created. The Creater interface is defined as:

    // Creater is an object that can create an instance of a RESTful object.
    type Creater interface {
        // New returns an empty object that can be used with Create after request data has been put into it.
        // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
        New() runtime.Object
    
        // Create creates a new version of a resource. If includeUninitialized is set, the object may be returned
        // without completing initialization.
        Create(ctx genericapirequest.Context, obj runtime.Object, createValidation ValidateObjectFunc, includeUninitialized bool) (runtime.Object, error)
    }
    

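    genericregistry.Store implements these methods. Its Create proceeds in these steps:

    • Call BeforeCreate to perform the common pre-creation work: PrepareForCreate, name generation (GenerateName), and Validate.
    • Get the object's name.
    • Compute the object's key.
    • Create an empty object. From the way PodStorage's store was built, we know this is actually a Pod.
    • Call Storage.Create to serialize the object and write it to etcd.

    The code is as follows: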
    // Create inserts a new item according to the unique key from the object.
    func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, includeUninitialized bool) (runtime.Object, error) {
        // BeforeCreate performs the common pre-creation work: PrepareForCreate, GenerateName, Validate
        if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
            return nil, err
        }
        // at this point we have a fully formed object.  It is time to call the validators that the apiserver
        // handling chain wants to enforce.
        if createValidation != nil {
            if err := createValidation(obj.DeepCopyObject()); err != nil {
                return nil, err
            }
        }
        // get the object's name
        name, err := e.ObjectNameFunc(obj)
        if err != nil {
            return nil, err
        }
        // compute the key
        key, err := e.KeyFunc(ctx, name)
        if err != nil {
            return nil, err
        }
        qualifiedResource := e.qualifiedResourceFromContext(ctx)
        ttl, err := e.calculateTTL(obj, 0, false)
        if err != nil {
            return nil, err
        }
        // create an empty object; for PodStorage's store we know this is a Pod
        out := e.NewFunc()
        // Storage, analyzed above, ultimately calls etcd; runtime.Codec converts the object to bytes before it is saved
        if err := e.Storage.Create(ctx, key, obj, out, ttl); err != nil {
            err = storeerr.InterpretCreateError(err, qualifiedResource, name)
            err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
            if !kubeerr.IsAlreadyExists(err) {
                return nil, err
            }
            if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil {
                return nil, err
            }
            accessor, errGetAcc := meta.Accessor(out)
            if errGetAcc != nil {
                return nil, err
            }
            if accessor.GetDeletionTimestamp() != nil {
                msg := &err.(*kubeerr.StatusError).ErrStatus.Message
                *msg = fmt.Sprintf("object is being deleted: %s", *msg)
            }
            return nil, err
        }
        if e.AfterCreate != nil {
            if err := e.AfterCreate(out); err != nil {
                return nil, err
            }
        }
        if e.Decorator != nil {
            if err := e.Decorator(obj); err != nil {
                return nil, err
            }
        }
        if !includeUninitialized {
            return e.WaitForInitialized(ctx, out)
        }
        return out, nil
    }
    

    How is the data actually written to etcd? We built the etcd3 store (which implements storage.Interface) earlier. Here is how it writes an object to etcd:

    
    // Create implements storage.Interface.Create.
    func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
        // resourceVersion must not be set
        if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
            return errors.New("resourceVersion should not be set on objects to be created")
        }
    
        if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
            return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
        }
        // serialize; with the default media type this encodes the object as JSON
        data, err := runtime.Encode(s.codec, obj)
        if err != nil {
            return err
        }
        // build the full etcd key
        key = path.Join(s.pathPrefix, key)
    
        opts, err := s.ttlOpts(ctx, int64(ttl))
        if err != nil {
            return err
        }
        // transform the data before storage if configured (for example, encryption at rest)
        newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
        if err != nil {
            return storage.NewInternalError(err.Error())
        }
    
        // write with the etcd3 API: put the value only if the key does not exist yet
        txnResp, err := s.client.KV.Txn(ctx).If(
            notFound(key),
        ).Then(
            clientv3.OpPut(key, string(newData), opts...),
        ).Commit()
        if err != nil {
            return err
        }
        if !txnResp.Succeeded {
            return storage.NewKeyExistsError(key, 0)
        }
    
        if out != nil {
            putResp := txnResp.Responses[0].GetResponsePut()
            return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
        }
        return nil
    }
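    The If(notFound(key)).Then(OpPut(...)) transaction is an atomic create-if-absent. The in-memory model below is a hypothetical sketch. It assumes notFound compares the key's ModRevision with 0, since a missing key has ModRevision 0. It also returns the new revision, which Create passes to decode as the object's resourceVersion.

```go
package main

import (
	"fmt"
	"sync"
)

// memKV is a hypothetical in-memory model of etcd: every write bumps a
// cluster-wide revision, and each key remembers the revision of its last write.
type memKV struct {
	mu       sync.Mutex
	revision int64
	values   map[string][]byte
	modRev   map[string]int64
}

func newMemKV() *memKV {
	return &memKV{values: map[string][]byte{}, modRev: map[string]int64{}}
}

// putIfAbsent models Txn().If(ModRevision(key) == 0).Then(OpPut(...)).Commit():
// the comparison and the put happen atomically under one lock.
func (kv *memKV) putIfAbsent(key string, value []byte) (succeeded bool, rev int64) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	if kv.modRev[key] != 0 { // key exists: the If branch fails
		return false, kv.revision
	}
	kv.revision++
	kv.values[key] = value
	kv.modRev[key] = kv.revision
	return true, kv.revision
}

func main() {
	kv := newMemKV()
	ok, rev := kv.putIfAbsent("/registry/pods/default/nginx", []byte(`{"kind":"Pod"}`))
	fmt.Println(ok, rev)
	// A second create of the same key fails; Create maps this to a KeyExists error.
	ok, rev = kv.putIfAbsent("/registry/pods/default/nginx", []byte(`{"kind":"Pod"}`))
	fmt.Println(ok, rev)
}
```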
    

    Encoding and decoding of stored resource objects

    We have seen how the API is served and how the data is finally stored. How are objects encoded and decoded when they are stored? This section analyzes that.

    First, the default etcd options:

    func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
        return &EtcdOptions{
            StorageConfig:           *backendConfig,
            DefaultStorageMediaType: "application/json",    // default storage media type
            DeleteCollectionWorkers: 1,
            EnableGarbageCollection: true,
            EnableWatchCache:        true,  // the watch cache is enabled by default; pass --watch-cache=false to disable it
            DefaultWatchCacheSize:   100,
        }
    }
    

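    As shown above, the default storage media type is "application/json" and the watch cache is enabled by default. Continuing with PodStorage initialization as the example, let's see how Pod objects are encoded and decoded for storage.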

    From the earlier analysis we know that PodStorage builds a store member at initialization, and that the object behind genericserver.Config.RESTOptionsGetter is a storageFactoryRestOptionsFactory. The actual storage instance is created in store.CompleteWithOptions:

    func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
        ......
        // for PodStorage, DefaultQualifiedResource is api.Resource("pods")
        opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
            ......
        if e.Storage == nil {
            e.Storage, e.DestroyFunc = opts.Decorator(
                opts.StorageConfig,
                e.NewFunc(),
                prefix,
                keyFunc,
                e.NewListFunc,
                attrFunc,
                triggerFunc,
            )
        }
        return nil
    }
    

    So the storage interface is produced in two steps.

      1. Call storageFactoryRestOptionsFactory.GetRESTOptions with api.Resource("pods") to obtain the Decorator. With the watch cache enabled (the default), the Decorator is the function returned by genericregistry.StorageWithCacher.
        The full code is:
    func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
        // This is the key to encoding: the codec is initialized here. StorageFactory is a DefaultStorageFactory
        storageConfig, err := f.StorageFactory.NewConfig(resource)
        if err != nil {
            return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
        }
    
        ret := generic.RESTOptions{
            StorageConfig:           storageConfig,
            Decorator:               generic.UndecoratedStorage,        // returns the raw etcd storage directly
            DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
            EnableGarbageCollection: f.Options.EnableGarbageCollection,
            ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
        }
        // If the watch cache is enabled (note: only the watch cache),
        // Decorator is set to genericregistry.StorageWithCacher
        if f.Options.EnableWatchCache {
            sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
            if err != nil {
                return generic.RESTOptions{}, err
            }
            cacheSize, ok := sizes[resource]
            if !ok {
                cacheSize = f.Options.DefaultWatchCacheSize
            }
            ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
        }
    
        return ret, nil
    }
    

    To understand the encoding in detail, we need to look further into DefaultStorageFactory.NewConfig:

    func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
        // use a cohabitating resource if there is one; otherwise the resource itself
        chosenStorageResource := s.getStorageGroupResource(groupResource)
    
        // operate on copy
        storageConfig := s.StorageConfig
        codecConfig := StorageCodecConfig{
            StorageMediaType:  s.DefaultMediaType,  // default: application/json
            StorageSerializer: s.DefaultSerializer, // the value passed in is legacyscheme.Codecs
        }
    
        if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok {
            override.Apply(&storageConfig, &codecConfig)
        }
        if override, ok := s.Overrides[chosenStorageResource]; ok {
            override.Apply(&storageConfig, &codecConfig)
        }
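        // ResourceEncodingConfig-related logic,
        // made up of defaults plus override logic.
        // A resource has two encodings, internal and external: internal is the GroupVersion of the in-memory object, external is the GroupVersion used in the underlying storage.
        // If nothing is found, in-memory data usually falls back to the default version APIVersionInternal = "__internal".
        // TODO: covered in its own section below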
        var err error
        codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
        if err != nil {
            return nil, err
        }
        codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
        if err != nil {
            return nil, err
        }
        codecConfig.Config = storageConfig
        // newStorageCodecFn here is storage.NewStorageCodec; see NewDefaultStorageFactory
        storageConfig.Codec, err = s.newStorageCodecFn(codecConfig)
        if err != nil {
            return nil, err
        }
        glog.V(3).Infof("storing %v in %v, reading as %v from %#v", groupResource, codecConfig.StorageVersion, codecConfig.MemoryVersion, codecConfig.Config)
    
        return &storageConfig, nil
    }
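    The two override lookups in NewConfig run in a fixed order: the group-wide alias (resource "*", built by getAllResourcesAlias) first, then the exact GroupResource, so the more specific override wins. A minimal sketch of that ordering, assuming a hypothetical override type that only changes the media type (the real Overrides can also adjust the etcd config and other codec fields):

```go
package main

import "fmt"

// GroupResource is a simplified copy of schema.GroupResource.
type GroupResource struct{ Group, Resource string }

// codecConfig holds the single field this sketch overrides.
type codecConfig struct{ MediaType string }

// override is a hypothetical per-resource tweak.
type override func(*codecConfig)

// resolve applies the group-wide "*" override first and the exact resource
// override second, matching the order in DefaultStorageFactory.NewConfig.
func resolve(defaults codecConfig, overrides map[GroupResource]override, gr GroupResource) codecConfig {
	cfg := defaults // operate on a copy
	if o, ok := overrides[GroupResource{Group: gr.Group, Resource: "*"}]; ok {
		o(&cfg)
	}
	if o, ok := overrides[gr]; ok {
		o(&cfg)
	}
	return cfg
}

func main() {
	overrides := map[GroupResource]override{
		{Group: "apps", Resource: "*"}:           func(c *codecConfig) { c.MediaType = "application/yaml" },
		{Group: "apps", Resource: "deployments"}: func(c *codecConfig) { c.MediaType = "application/vnd.kubernetes.protobuf" },
	}
	def := codecConfig{MediaType: "application/json"}
	fmt.Println(resolve(def, overrides, GroupResource{"apps", "deployments"}).MediaType)  // application/vnd.kubernetes.protobuf
	fmt.Println(resolve(def, overrides, GroupResource{"apps", "statefulsets"}).MediaType) // application/yaml
	fmt.Println(resolve(def, overrides, GroupResource{"", "pods"}).MediaType)             // application/json
}
```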
    

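    Two things in this function need explaining: ResourceEncodingConfig and how the Codec is produced. ResourceEncodingConfig is covered in the next section; here we continue with NewStorageCodec to see how the Codec object is created. See k8s.io/apiserver/pkg/server/storage/storage_codec.go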

    // NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested
    // storage and memory versions.
    func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) {
        mediaType, _, err := mime.ParseMediaType(opts.StorageMediaType) // usually application/json, in which case mediaType is also application/json
        if err != nil {
            return nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType)
        }
    
        // ETCD2 only supports application/json
        if opts.Config.Type == storagebackend.StorageTypeETCD2 && mediaType != "application/json" {
            glog.Warningf(`storage type %q does not support media type %q, using "application/json"`, storagebackend.StorageTypeETCD2, mediaType)
            mediaType = "application/json"
        }
    
        // Look up the serializer for this media type; if none is found, an error is returned
        // Note: from the analysis of DefaultStorageFactory.NewConfig above, opts.StorageSerializer = DefaultStorageFactory.DefaultSerializer = legacyscheme.Codecs
        serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType)
        if !ok { // not found: return an error
            return nil, fmt.Errorf("unable to find serializer for %q", mediaType)
        }
    
        // The serializer for the chosen media type; see the document on the serializer factory for details
        s := serializer.Serializer
    
        // etcd2 does not support binary formats, only text formats.
        // make sure the selected encoder supports string data
        if !serializer.EncodesAsText && opts.Config.Type == storagebackend.StorageTypeETCD2 {
            return nil, fmt.Errorf("storage type %q does not support binary media type %q", storagebackend.StorageTypeETCD2, mediaType)
        }
    
        // serializer implements both the Encoder and Decoder interfaces
        // Give callers the opportunity to wrap encoders and decoders.  For decoders, each returned decoder will
        // be passed to the recognizer so that multiple decoders are available.
        var encoder runtime.Encoder = s
        if opts.EncoderDecoratorFn != nil { // optional Encoder wrapper
            encoder = opts.EncoderDecoratorFn(encoder)
        }
        decoders := []runtime.Decoder{
            // selected decoder as the primary
            s,
            // universal deserializer as a fallback
            opts.StorageSerializer.UniversalDeserializer(), // fallback for data that s cannot decode
            // base64-wrapped universal deserializer as a last resort.
            // this allows reading base64-encoded protobuf, which should only exist if etcd2+protobuf was used at some point.
            // data written that way could exist in etcd2, or could have been migrated to etcd3.
            // TODO: flag this type of data if we encounter it, require migration (read to decode, write to persist using a supported encoder), and remove in 1.8
            runtime.NewBase64Serializer(nil, opts.StorageSerializer.UniversalDeserializer()), // finally, try base64 decoding
        }
        if opts.DecoderDecoratorFn != nil { // optional Decoder wrapper
            decoders = opts.DecoderDecoratorFn(decoders)
        }
    
        // Note that opts.StorageSerializer here is legacyscheme.Codecs, whose concrete type is CodecFactory
        // Ensure the storage receives the correct version.
        encoder = opts.StorageSerializer.EncoderForVersion(
            encoder,
            runtime.NewMultiGroupVersioner( // a GroupVersioner that accepts StorageVersion.Group and MemoryVersion.Group and returns the GVK for StorageVersion
                opts.StorageVersion,
                schema.GroupKind{Group: opts.StorageVersion.Group},
                schema.GroupKind{Group: opts.MemoryVersion.Group},
            ),
        )
        decoder := opts.StorageSerializer.DecoderToVersion(
            recognizer.NewDecoder(decoders...),
            runtime.NewMultiGroupVersioner( // a GroupVersioner that accepts MemoryVersion.Group and StorageVersion.Group and returns the GVK for MemoryVersion
                opts.MemoryVersion,
                schema.GroupKind{Group: opts.MemoryVersion.Group},
                schema.GroupKind{Group: opts.StorageVersion.Group},
            ),
        )
    
        return runtime.NewCodec(encoder, decoder), nil
    }
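    NewStorageCodec builds an ordered list of decoders: the selected serializer, the universal deserializer, then a base64-wrapped universal deserializer. The sketch below illustrates the idea of such a fallback chain using only the standard library. It is a simplification: the decoder type, chain and base64Wrapped are hypothetical, JSON stands in for every format, and the real recognizer.NewDecoder first asks each decoder whether it recognizes the bytes rather than simply trying them in order.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
)

// decoder is a simplified runtime.Decoder: raw bytes in, generic object out.
type decoder func([]byte) (map[string]any, error)

func jsonDecoder(data []byte) (map[string]any, error) {
	var obj map[string]any
	if err := json.Unmarshal(data, &obj); err != nil {
		return nil, err
	}
	return obj, nil
}

// base64Wrapped decodes base64 first and passes the result to inner, in the
// spirit of the last-resort entry built with runtime.NewBase64Serializer.
func base64Wrapped(inner decoder) decoder {
	return func(data []byte) (map[string]any, error) {
		raw, err := base64.StdEncoding.DecodeString(string(data))
		if err != nil {
			return nil, err
		}
		return inner(raw)
	}
}

// chain tries each decoder in order and returns the first success.
func chain(decoders ...decoder) decoder {
	return func(data []byte) (map[string]any, error) {
		errs := make([]error, 0, len(decoders))
		for _, d := range decoders {
			obj, err := d(data)
			if err == nil {
				return obj, nil
			}
			errs = append(errs, err)
		}
		return nil, errors.Join(errs...)
	}
}

func main() {
	decode := chain(jsonDecoder, base64Wrapped(jsonDecoder))
	plain := []byte(`{"kind":"Pod"}`)
	wrapped := []byte(base64.StdEncoding.EncodeToString(plain))
	a, _ := decode(plain)
	b, _ := decode(wrapped)
	fmt.Println(a["kind"], b["kind"]) // Pod Pod
	_, err := decode([]byte("not json"))
	fmt.Println(err != nil) // true
}
```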
    

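    The code above produces the Codec object. The logic is fairly clear: it wraps the Serializer held by the StorageFactory so that the resulting Encoder and Decoder are version-aware, and finally combines them into a single Codec instance.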

    Two points deserve further explanation.

    First, a GroupVersioner takes a set of candidate GVKs and resolves them to a single target GVK. When the encoder above is built, runtime.NewMultiGroupVersioner creates a multiGroupVersioner whose target is StorageVersion and whose accepted input groups are StorageVersion.Group and MemoryVersion.Group. Its implementation accepts objects from either group, converts them to the group and version of StorageVersion, and keeps the input object's own Kind, yielding the final GVK:

    func (v multiGroupVersioner) KindForGroupVersionKinds(kinds []schema.GroupVersionKind) (schema.GroupVersionKind, bool) {
        for _, src := range kinds {
            for _, kind := range v.acceptedGroupKinds {
                if kind.Group != src.Group {
                    continue
                }
                if len(kind.Kind) > 0 && kind.Kind != src.Kind {
                    continue
                }
                return v.target.WithKind(src.Kind), true // build the GVK from the target GV
            }
        }
        return schema.GroupVersionKind{}, false
    }
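    To see the encoder-side versioner in action, the matching loop above can be run against simplified local copies of the schema types. GroupVersion, GroupKind and GroupVersionKind below are stand-ins with an assumed field layout, not the real k8s.io/apimachinery package; the loop body is the same as the excerpt. Because the core group name is the empty string, an internal Pod resolves to v1:

```go
package main

import "fmt"

// Simplified stand-ins for the schema package types.
type GroupVersion struct{ Group, Version string }
type GroupKind struct{ Group, Kind string }
type GroupVersionKind struct{ Group, Version, Kind string }

func (gv GroupVersion) WithKind(kind string) GroupVersionKind {
	return GroupVersionKind{Group: gv.Group, Version: gv.Version, Kind: kind}
}

// multiGroupVersioner uses the same matching rule as the excerpt above.
type multiGroupVersioner struct {
	target             GroupVersion
	acceptedGroupKinds []GroupKind
}

func (v multiGroupVersioner) KindForGroupVersionKinds(kinds []GroupVersionKind) (GroupVersionKind, bool) {
	for _, src := range kinds {
		for _, kind := range v.acceptedGroupKinds {
			if kind.Group != src.Group {
				continue
			}
			if len(kind.Kind) > 0 && kind.Kind != src.Kind {
				continue
			}
			return v.target.WithKind(src.Kind), true
		}
	}
	return GroupVersionKind{}, false
}

func main() {
	storage := GroupVersion{Group: "", Version: "v1"}
	memory := GroupVersion{Group: "", Version: "__internal"}
	// Encoder side: the target is the storage version.
	enc := multiGroupVersioner{target: storage, acceptedGroupKinds: []GroupKind{{Group: storage.Group}, {Group: memory.Group}}}
	fmt.Println(enc.KindForGroupVersionKinds([]GroupVersionKind{{Group: "", Version: "__internal", Kind: "Pod"}})) // { v1 Pod} true
	_, ok := enc.KindForGroupVersionKinds([]GroupVersionKind{{Group: "apps", Version: "v1", Kind: "Deployment"}})
	fmt.Println(ok) // false
}
```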
    

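    The decoder's GroupVersioner is built the same way as the encoder's, except that the target GV is reversed: it is MemoryVersion instead of StorageVersion.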

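    Second, consider EncoderForVersion. It takes an Encoder and a GroupVersioner and returns an encoder that guarantees every object written to the given serializer is in the specified GV. (Read it this way: the object is first put into the specified GV, and is then written to the given serializer.)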

    type StorageSerializer interface {
        // SupportedMediaTypes are the media types supported for reading and writing objects.
        SupportedMediaTypes() []SerializerInfo
    
        // UniversalDeserializer returns a Serializer that can read objects in multiple supported formats
        // by introspecting the data at rest.
        UniversalDeserializer() Decoder
    
        // Returns an encoder that guarantees objects written to the underlying serializer are in the given GV
        // EncoderForVersion returns an encoder that ensures objects being written to the provided
        // serializer are in the provided group version.
        EncoderForVersion(serializer Encoder, gv GroupVersioner) Encoder
    
        // Returns a decoder that guarantees objects decoded by the underlying serializer are in the given GV
        // DecoderForVersion returns a decoder that ensures objects being read by the provided
        // serializer are in the provided group version by default.
        DecoderToVersion(serializer Decoder, gv GroupVersioner) Decoder
    }
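    A minimal sketch of what EncoderForVersion promises: wrap an encoder so that each object is moved to the target version before serialization. This is a heavily simplified assumption: Object, encoder and encoderForVersion are hypothetical, and the "conversion" here only rewrites the apiVersion field, whereas the real versioning codec converts the whole object through the Scheme.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Object is a hypothetical object that carries its own apiVersion/kind, like TypeMeta.
type Object struct {
	APIVersion string `json:"apiVersion"`
	Kind       string `json:"kind"`
	Name       string `json:"name"`
}

// encoder is a simplified runtime.Encoder.
type encoder func(Object) ([]byte, error)

func jsonEncoder(o Object) ([]byte, error) { return json.Marshal(o) }

// encoderForVersion wraps inner so every object is "converted" to targetVersion
// before inner serializes it. Only apiVersion is rewritten in this sketch.
func encoderForVersion(inner encoder, targetVersion string) encoder {
	return func(o Object) ([]byte, error) {
		converted := o // o is a copy, so the caller's object is untouched
		converted.APIVersion = targetVersion
		return inner(converted)
	}
}

func main() {
	enc := encoderForVersion(jsonEncoder, "v1")
	data, err := enc(Object{APIVersion: "__internal", Kind: "Pod", Name: "nginx"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // {"apiVersion":"v1","kind":"Pod","name":"nginx"}
}
```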
    

    As noted several times above, opts.StorageSerializer here is legacyscheme.Codecs.

    legacyscheme.Codecs is built with var Codecs = serializer.NewCodecFactory(Scheme), so it is a CodecFactory instance (see the API Server encoding/decoding article). By default, a CodecFactory supports three encodings: JSON, YAML and Protobuf.

      1. Then the Decorator function genericregistry.StorageWithCacher is called, and the Decorator finally creates the Storage instance. This was analyzed earlier, so it is not repeated here.

    Let us now revisit how PodStorage serves REST requests. For a Create request it ultimately calls storage.Interface.Create, where storage.Interface is actually an etcd3.store; its Create method calls runtime.Encode to do the encoding.

    Tracing how the etcd3.store instance is created shows that its s.codec field comes from storagebackend.Config.Codec.

    func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
            ......
        // Serialize; here the object should be encoded as a JSON string
        data, err := runtime.Encode(s.codec, obj)
        if err != nil {
            return err
        }
            ......
        return nil
    }
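    The essence of store.Create (encode with the codec, then write under the key only if it does not exist yet) can be sketched with an in-memory map. memStore is hypothetical; json.Marshal stands in for runtime.Encode(s.codec, obj), the /registry/pods/<namespace>/<name> key layout is only illustrative, and the real etcd3 store performs the existence check atomically inside an etcd transaction rather than with a map lookup.

```go
package main

import (
	"encoding/json"
	"fmt"
	"path"
)

// memStore is a hypothetical in-memory stand-in for etcd3.store.
type memStore struct {
	prefix string
	data   map[string][]byte
}

// Create encodes obj and stores it under prefix+key only if the key is absent.
func (s *memStore) Create(key string, obj any) error {
	full := path.Join("/", s.prefix, key)
	if _, exists := s.data[full]; exists {
		return fmt.Errorf("key exists: %s", full)
	}
	data, err := json.Marshal(obj) // plays the role of runtime.Encode(s.codec, obj)
	if err != nil {
		return err
	}
	s.data[full] = data
	return nil
}

func main() {
	s := &memStore{prefix: "registry", data: map[string][]byte{}}
	pod := map[string]string{"apiVersion": "v1", "kind": "Pod", "name": "nginx"}
	fmt.Println(s.Create("pods/default/nginx", pod)) // <nil>
	fmt.Println(s.Create("pods/default/nginx", pod)) // key exists: /registry/pods/default/nginx
	fmt.Println(string(s.data["/registry/pods/default/nginx"]))
}
```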
    

    We analyzed how the Codec is produced earlier, mainly in DefaultStorageFactory.NewConfig (see above). Here is an excerpt showing the Codec's encoder and decoder:

        // Note that opts.StorageSerializer here is legacyscheme.Codecs, whose concrete type is CodecFactory
        // Ensure the storage receives the correct version.
        encoder = opts.StorageSerializer.EncoderForVersion(
            encoder,
            runtime.NewMultiGroupVersioner( // a GroupVersioner that accepts StorageVersion.Group and MemoryVersion.Group and returns the GVK for StorageVersion
                opts.StorageVersion,
                schema.GroupKind{Group: opts.StorageVersion.Group},
                schema.GroupKind{Group: opts.MemoryVersion.Group},
            ),
        )
        decoder := opts.StorageSerializer.DecoderToVersion(
            recognizer.NewDecoder(decoders...),
            runtime.NewMultiGroupVersioner( // a GroupVersioner that accepts MemoryVersion.Group and StorageVersion.Group and returns the GVK for MemoryVersion
                opts.MemoryVersion,
                schema.GroupKind{Group: opts.MemoryVersion.Group},
                schema.GroupKind{Group: opts.StorageVersion.Group},
            ),
        )
    

    So when writing to etcd the target group and version are opts.StorageVersion, and when reading from etcd they are opts.MemoryVersion. Let us see what these two actually are, again using Pod as the example.

    Here is the answer up front. Group-version registration and enablement information lives in a global variable, Registry, of type APIRegistrationManager. Its groupMetaMap field, of type map[string]*apimachinery.GroupMeta, stores each group's metadata in a GroupMeta object. Pod belongs to the core group, whose name is "" and whose version is "v1", so StorageVersion is {group:"", version:"v1"}. As for MemoryVersion, InMemoryEncodingFor shows that by default its version is "__internal", so its value here is {group:"", version:"__internal"}.

    The code is again in DefaultStorageFactory.NewConfig:

        var err error
        codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
        if err != nil {
            return nil, err
        }
        codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
        if err != nil {
            return nil, err
        }
    

    ResourceEncodingConfig

    We gave a basic analysis of ResourceEncodingConfig earlier, but not a thorough one, so this section examines it in detail. ResourceEncodingConfig is made up of three parts:

    • Default resource encoding configuration
    • StorageEncodingOverrides
    • ResourceEncodingOverrides

    Each of the three parts is described below, followed by how they are combined.

    Default resource encoding configuration

    See the BuildStorageFactory method in k8s.io/kubernetes/cmd/kube-apiserver/app/server.go. When it calls NewStorageFactory, the defaultResourceEncoding *serverstorage.DefaultResourceEncodingConfig argument is set to:

    serverstorage.NewDefaultResourceEncodingConfig(legacyscheme.Registry)

    func NewDefaultResourceEncodingConfig(registry *registered.APIRegistrationManager) *DefaultResourceEncodingConfig {
        return &DefaultResourceEncodingConfig{groups: map[string]*GroupResourceEncodingConfig{}, registry: registry}
    }
    

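    As you can see, the groups field of DefaultResourceEncodingConfig starts out as an empty map. So where does the default resource encoding come from?

    Answer: from the registry, by looking up the group's GroupMeta (group metadata) and using its GroupVersion field.

    To see this, look at the definitions of the two methods used earlier: StorageEncodingFor(chosenStorageResource) and InMemoryEncodingFor(groupResource).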

    func (o *DefaultResourceEncodingConfig) StorageEncodingFor(resource schema.GroupResource) (schema.GroupVersion, error) {
        // Look up the group's metadata in the registry
        groupMeta, err := o.registry.Group(resource.Group)
        if err != nil {
            return schema.GroupVersion{}, err
        }
        // Check the groups field for a group-specific encoding
        groupEncoding, groupExists := o.groups[resource.Group]
        // Without a group-specific encoding, use the GroupVersion from the group metadata directly
        if !groupExists {
            // return the most preferred external version for the group
            return groupMeta.GroupVersion, nil
        }
        // Otherwise look up the external encoding for this resource
        resourceOverride, resourceExists := groupEncoding.ExternalResourceEncodings[resource.Resource]
        if !resourceExists {
            return groupEncoding.DefaultExternalEncoding, nil // not found: return the group's default external encoding
        }
    
        return resourceOverride, nil // return the resource's external encoding
    }
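    The lookup order in StorageEncodingFor (registry first, then the group entry, then the resource entry) can be reproduced with plain maps. In this sketch encodingConfig and groupEncoding are hypothetical simplifications of DefaultResourceEncodingConfig and GroupResourceEncodingConfig, and the preferred map stands in for o.registry.Group(...).GroupVersion. The cronjobs entry mirrors the ResourceEncodingOverrides example discussed later in this article:

```go
package main

import "fmt"

type GroupVersion struct{ Group, Version string }

// groupEncoding mirrors the two GroupResourceEncodingConfig fields used here.
type groupEncoding struct {
	DefaultExternalEncoding   GroupVersion
	ExternalResourceEncodings map[string]GroupVersion
}

// encodingConfig is a simplified DefaultResourceEncodingConfig; preferred
// stands in for the registry's GroupMeta.GroupVersion lookup.
type encodingConfig struct {
	preferred map[string]GroupVersion
	groups    map[string]*groupEncoding
}

func (o *encodingConfig) StorageEncodingFor(group, resource string) (GroupVersion, error) {
	gv, ok := o.preferred[group]
	if !ok {
		return GroupVersion{}, fmt.Errorf("group %q is not registered", group)
	}
	enc, ok := o.groups[group]
	if !ok {
		return gv, nil // no group entry: preferred external version
	}
	if r, ok := enc.ExternalResourceEncodings[resource]; ok {
		return r, nil // resource-level override
	}
	return enc.DefaultExternalEncoding, nil // group-level default
}

func main() {
	cfg := &encodingConfig{
		preferred: map[string]GroupVersion{"": {"", "v1"}, "batch": {"batch", "v1"}},
		groups: map[string]*groupEncoding{
			"batch": {
				DefaultExternalEncoding:   GroupVersion{"batch", "v1"},
				ExternalResourceEncodings: map[string]GroupVersion{"cronjobs": {"batch", "v1beta1"}},
			},
		},
	}
	fmt.Println(cfg.StorageEncodingFor("", "pods"))          // { v1} <nil>
	fmt.Println(cfg.StorageEncodingFor("batch", "jobs"))     // {batch v1} <nil>
	fmt.Println(cfg.StorageEncodingFor("batch", "cronjobs")) // {batch v1beta1} <nil>
}
```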
    

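    InMemoryEncodingFor is essentially the same as StorageEncodingFor, except that it looks up the internal encoding, so its code is not listed here.
    How the groups field (the group- and resource-specific encodings) is populated is covered in the next two sections. Here we look at the other field, registry, whose actual value is legacyscheme.Registry. It is defined as: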

    var Registry = registered.NewOrDie(os.Getenv("KUBE_API_VERSIONS"))

    Registry is of type APIRegistrationManager, which holds the registered group versions and the enabled API groups.

    type APIRegistrationManager struct {
        // registeredGroupVersions stores all API group versions for which RegisterGroup is called.
        registeredVersions map[schema.GroupVersion]struct{} // registered group versions
    
        // enabledVersions represents all enabled API versions. It should be a
        // subset of registeredVersions. Please call EnableVersions() to add
        // enabled versions.
        enabledVersions map[schema.GroupVersion]struct{} // enabled group versions, a subset of the registered ones
    
        // map of group meta for all groups.
        groupMetaMap map[string]*apimachinery.GroupMeta // metadata for every group
    
        // envRequestedVersions represents the versions requested via the
        // KUBE_API_VERSIONS environment variable. The install package of each group
        // checks this list before add their versions to the latest package and
        // Scheme.  This list is small and order matters, so represent as a slice
        envRequestedVersions []schema.GroupVersion
    }
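    The relationship between registered and enabled versions, and the effect of KUBE_API_VERSIONS, can be sketched as follows. registrationManager, newManager and parseGV are hypothetical simplifications (the real manager parses the variable with schema.ParseGroupVersion and keeps more state). The rule that an empty list allows every version reflects my understanding of the real IsAllowedVersion, which Enable consults below; treat it as an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

type GroupVersion struct{ Group, Version string }

// registrationManager is a hypothetical, trimmed-down APIRegistrationManager.
type registrationManager struct {
	registered   map[GroupVersion]struct{}
	enabled      map[GroupVersion]struct{}
	envRequested []GroupVersion
}

// parseGV turns "group/version", or a bare "version" for the core group, into a GroupVersion.
func parseGV(s string) GroupVersion {
	if i := strings.Index(s, "/"); i >= 0 {
		return GroupVersion{Group: s[:i], Version: s[i+1:]}
	}
	return GroupVersion{Version: s}
}

// newManager reads a comma-separated list shaped like KUBE_API_VERSIONS.
func newManager(kubeAPIVersions string) *registrationManager {
	m := &registrationManager{registered: map[GroupVersion]struct{}{}, enabled: map[GroupVersion]struct{}{}}
	for _, s := range strings.Split(kubeAPIVersions, ",") {
		if s = strings.TrimSpace(s); s != "" {
			m.envRequested = append(m.envRequested, parseGV(s))
		}
	}
	return m
}

// IsAllowedVersion: with no env list every version is allowed; otherwise only listed ones.
func (m *registrationManager) IsAllowedVersion(gv GroupVersion) bool {
	if len(m.envRequested) == 0 {
		return true
	}
	for _, v := range m.envRequested {
		if v == gv {
			return true
		}
	}
	return false
}

// EnableVersions keeps the enabled set a subset of the registered set.
func (m *registrationManager) EnableVersions(gvs ...GroupVersion) error {
	for _, gv := range gvs {
		if _, ok := m.registered[gv]; !ok {
			return fmt.Errorf("version %v has not been registered", gv)
		}
		m.enabled[gv] = struct{}{}
	}
	return nil
}

func main() {
	m := newManager("") // KUBE_API_VERSIONS unset
	core := GroupVersion{Version: "v1"}
	fmt.Println(m.EnableVersions(core) != nil) // true: not registered yet
	m.registered[core] = struct{}{}
	fmt.Println(m.EnableVersions(core), m.IsAllowedVersion(core)) // <nil> true
}
```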
    

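    As shown above, legacyscheme.Registry is a global variable. So which versions are registered, and which are enabled? This relies on a Go mechanism: a package's func init() functions run automatically when the package is imported. A careful search turns up the following function in k8s.io/kubernetes/pkg/apis/core/install/install.go: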

    func init() {
        Install(legacyscheme.GroupFactoryRegistry, legacyscheme.Registry, legacyscheme.Scheme)
    }
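    The init() mechanism can be demonstrated in a single file. In this sketch registry and install are hypothetical stand-ins for legacyscheme.Registry and an install package's Install function. In Kubernetes the init functions live in separate install packages (such as k8s.io/kubernetes/pkg/apis/core/install) that are pulled in by imports, but the guarantee is the same: every init runs before main.

```go
package main

import "fmt"

// registry is a hypothetical stand-in for legacyscheme.Registry: a
// package-level variable that install code fills in from init().
var registry = map[string][]string{}

// install mimics an install package's Install function for one group.
func install(group string, versions ...string) {
	registry[group] = append(registry[group], versions...)
}

// Both init functions run automatically, in source order, after package-level
// variables are initialized and before main.
func init() {
	install("", "v1") // core group
}

func init() {
	install("apps", "v1")
}

func main() {
	fmt.Println(len(registry), registry[""], registry["apps"]) // 2 [v1] [v1]
}
```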
    

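    Note that because we are analyzing PodStorage, we only looked at the core group. The API has many groups, including but not limited to core, abac, apps, authentication, authorization, autoscaling, batch, componentconfig, extensions, policy, rbac, certificates and networking, and newer releases keep adding groups. Each group lives in its own subdirectory under k8s.io/kubernetes/pkg/apis, and each has a func init() function. Currently they all do the same thing: register information with legacyscheme.Registry and legacyscheme.Scheme.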

    init calls Install, which registers the API group and adds the resource types to the Scheme, so this one function populates both legacyscheme.Registry and legacyscheme.Scheme.

    // Install registers the API group and adds types to a scheme
    func Install(groupFactoryRegistry announced.APIGroupFactoryRegistry, registry *registered.APIRegistrationManager, scheme *runtime.Scheme) {
        if err := announced.NewGroupMetaFactory(
            &announced.GroupMetaFactoryArgs{
                GroupName:                  core.GroupName, // "" here
                VersionPreferenceOrder:     []string{v1.SchemeGroupVersion.Version},
                AddInternalObjectsToScheme: core.AddToScheme, // AddToScheme for the internal objects
                RootScopedKinds: sets.NewString(
                    "Node",
                    "Namespace",
                    "PersistentVolume",
                    "ComponentStatus",
                ),
                IgnoredKinds: sets.NewString(
                    "ListOptions",
                    "DeleteOptions",
                    "Status",
                    "PodLogOptions",
                    "PodExecOptions",
                    "PodAttachOptions",
                    "PodPortForwardOptions",
                    "PodProxyOptions",
                    "NodeProxyOptions",
                    "ServiceProxyOptions",
                ),
            },
            announced.VersionToSchemeFunc{
                v1.SchemeGroupVersion.Version: v1.AddToScheme, // maps each version to its AddToScheme; called during RegisterAndEnable
            },
        ).Announce(groupFactoryRegistry).RegisterAndEnable(registry, scheme); err != nil {
            panic(err)
        }
    }
    

    Of the function's three parameters, the purpose of groupFactoryRegistry is not yet clear to me; the Announce call above registers the newly created GroupMetaFactory into it.

    Next, let us look at RegisterAndEnable:

    // RegisterAndEnable is provided only to allow this code to get added in multiple steps.
    // It's really bad that this is called in init() methods, but supporting this
    // temporarily lets us do the change incrementally.
    func (gmf *GroupMetaFactory) RegisterAndEnable(registry *registered.APIRegistrationManager, scheme *runtime.Scheme) error {
        if err := gmf.Register(registry); err != nil { // register the GVs; only registered versions can be enabled
            return err
        }
        if err := gmf.Enable(registry, scheme); err != nil { // enable the versions and add the resource types to the Scheme
            return err
        }
    
        return nil
    }
    

    The method that registers the GVs in the registry is:

    // Register constructs the finalized prioritized version list and sanity checks
    // the announced group & versions. Then it calls register.
    func (gmf *GroupMetaFactory) Register(m *registered.APIRegistrationManager) error {
        if gmf.GroupArgs == nil {
            return fmt.Errorf("partially announced groups are not allowed, only got versions: %#v", gmf.VersionArgs)
        }
        if len(gmf.VersionArgs) == 0 {
            return fmt.Errorf("group %v announced but no versions announced", gmf.GroupArgs.GroupName)
        }
    
        pvSet := sets.NewString(gmf.GroupArgs.VersionPreferenceOrder...)
        if pvSet.Len() != len(gmf.GroupArgs.VersionPreferenceOrder) {
            return fmt.Errorf("preference order for group %v has duplicates: %v", gmf.GroupArgs.GroupName, gmf.GroupArgs.VersionPreferenceOrder)
        }
        prioritizedVersions := []schema.GroupVersion{} // version list
        for _, v := range gmf.GroupArgs.VersionPreferenceOrder {
            prioritizedVersions = append(
                prioritizedVersions,
                schema.GroupVersion{
                    Group:   gmf.GroupArgs.GroupName, // the group, "" here
                    Version: v,                      // "v1" here
                },
            )
        }
    
        // Go through versions that weren't explicitly prioritized.
        unprioritizedVersions := []schema.GroupVersion{}
        for _, v := range gmf.VersionArgs {
            if v.GroupName != gmf.GroupArgs.GroupName {
                return fmt.Errorf("found %v/%v in group %v?", v.GroupName, v.VersionName, gmf.GroupArgs.GroupName)
            }
            if pvSet.Has(v.VersionName) {
                pvSet.Delete(v.VersionName)
                continue
            }
            unprioritizedVersions = append(unprioritizedVersions, schema.GroupVersion{Group: v.GroupName, Version: v.VersionName})
        }
        if len(unprioritizedVersions) > 1 {
            glog.Warningf("group %v has multiple unprioritized versions: %#v. They will have an arbitrary preference order!", gmf.GroupArgs.GroupName, unprioritizedVersions)
        }
        if pvSet.Len() != 0 {
            return fmt.Errorf("group %v has versions in the priority list that were never announced: %s", gmf.GroupArgs.GroupName, pvSet)
        }
        prioritizedVersions = append(prioritizedVersions, unprioritizedVersions...)
        m.RegisterVersions(prioritizedVersions) // register the GVs; here just {group="", version="v1"}
        gmf.prioritizedVersionList = prioritizedVersions
        return nil
    }
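    The ordering rules in Register (preferred versions first, then the remaining announced versions; duplicates and never-announced preferred versions are errors) can be checked on plain strings. prioritize is a hypothetical helper. It sorts the unprioritized versions for determinism, whereas the original collects them from a map and leaves their order arbitrary, which is why it logs a warning when there is more than one.

```go
package main

import (
	"fmt"
	"sort"
)

// prioritize reproduces the ordering rules of GroupMetaFactory.Register on
// plain version strings.
func prioritize(preferenceOrder, announced []string) ([]string, error) {
	pv := map[string]bool{}
	for _, v := range preferenceOrder {
		if pv[v] {
			return nil, fmt.Errorf("preference order has duplicates: %v", preferenceOrder)
		}
		pv[v] = true
	}
	var rest []string
	for _, v := range announced {
		if pv[v] {
			delete(pv, v) // announced and prioritized
			continue
		}
		rest = append(rest, v) // announced but not prioritized
	}
	if len(pv) != 0 {
		return nil, fmt.Errorf("versions in the priority list were never announced: %v", pv)
	}
	sort.Strings(rest) // deterministic here; arbitrary in the original
	return append(append([]string{}, preferenceOrder...), rest...), nil
}

func main() {
	fmt.Println(prioritize([]string{"v1"}, []string{"v1"}))                       // [v1] <nil>
	fmt.Println(prioritize([]string{"v1"}, []string{"v1beta2", "v1", "v1beta1"})) // [v1 v1beta1 v1beta2] <nil>
	_, err := prioritize([]string{"v2"}, []string{"v1"})
	fmt.Println(err != nil) // true
}
```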
    

    Storage uses the Scheme and Registry from the legacyscheme package. From the code above, only {group="", version="v1"} is registered here. Next, the Enable function:

    func (gmf *GroupMetaFactory) Enable(m *registered.APIRegistrationManager, scheme *runtime.Scheme) error {
        externalVersions := []schema.GroupVersion{}
        for _, v := range gmf.prioritizedVersionList {
            if !m.IsAllowedVersion(v) { // enabled versions can be restricted via KUBE_API_VERSIONS, usually only for debugging
                continue
            }
            externalVersions = append(externalVersions, v)
            if err := m.EnableVersions(v); err != nil {
                return err
            }
            gmf.VersionArgs[v.Version].AddToScheme(scheme) // calls AddToScheme, adding this version's resource types to the scheme
        }
        if len(externalVersions) == 0 {
            glog.V(4).Infof("No version is registered for group %v", gmf.GroupArgs.GroupName)
            return nil
        }
    
        if gmf.GroupArgs.AddInternalObjectsToScheme != nil {
            gmf.GroupArgs.AddInternalObjectsToScheme(scheme) // add the internal objects to the Scheme
        }
    
        preferredExternalVersion := externalVersions[0]
        accessor := meta.NewAccessor()
    
        groupMeta := &apimachinery.GroupMeta{
            GroupVersion:  preferredExternalVersion, // for the core group "", this is {"", v1}
            GroupVersions: externalVersions,         // for the core group, this is []schema.GroupVersion{{Group: "", Version: "v1"}}
            SelfLinker:    runtime.SelfLinker(accessor),
        }
        for _, v := range externalVersions {
            gvf := gmf.VersionArgs[v.Version]
            if err := groupMeta.AddVersionInterfaces( // add VersionInterfaces for each version; it has two parts:
                // 1. ObjectConvertor, which is the scheme, converts resources between versions
                // 2. MetadataAccessor reads basic resource metadata such as Annotations and Name
                schema.GroupVersion{Group: gvf.GroupName, Version: gvf.VersionName},
                &meta.VersionInterfaces{
                    ObjectConvertor:  scheme,
                    MetadataAccessor: accessor,
                },
            ); err != nil {
                return err
            }
        }
        groupMeta.InterfacesFor = groupMeta.DefaultInterfacesFor // returns the VersionInterfaces registered above for a given group version
        groupMeta.RESTMapper = gmf.newRESTMapper(scheme, externalVersions, groupMeta)
    
        if err := m.RegisterGroup(*groupMeta); err != nil {
            return err
        }
        return nil
    }
    

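    Enable is the key step. It does two things: it registers the resource types with the Scheme, and it enables the corresponding GroupVersions in the Registry. Enabling a GroupVersion also initializes that group's metadata.

    First, registering the resource types with the Scheme is done by calling AddToScheme. For the core group the code is in k8s.io/kubernetes/pkg/apis/core/v1/register.go: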

    var (
        SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
        AddToScheme   = SchemeBuilder.AddToScheme
    )
    
    func addKnownTypes(scheme *runtime.Scheme) error {
        if err := scheme.AddIgnoredConversionType(&metav1.TypeMeta{}, &metav1.TypeMeta{}); err != nil {
            return err
        }
        scheme.AddKnownTypes(SchemeGroupVersion,
            &Pod{},
            &PodList{},
            &PodStatusResult{},
            &PodTemplate{},
            &PodTemplateList{},
            &ReplicationControllerList{},
            &ReplicationController{},
            &ServiceList{},
            &Service{},
            &ServiceProxyOptions{},
            &NodeList{},
            &Node{},
            &NodeConfigSource{},
            &NodeProxyOptions{},
            &Endpoints{},
            &EndpointsList{},
            &Binding{},
            &Event{},
            &EventList{},
            &List{},
            &LimitRange{},
            &LimitRangeList{},
            &ResourceQuota{},
            &ResourceQuotaList{},
            &Namespace{},
            &NamespaceList{},
            &ServiceAccount{},
            &ServiceAccountList{},
            &Secret{},
            &SecretList{},
            &PersistentVolume{},
            &PersistentVolumeList{},
            &PersistentVolumeClaim{},
            &PersistentVolumeClaimList{},
            &PodAttachOptions{},
            &PodLogOptions{},
            &PodExecOptions{},
            &PodPortForwardOptions{},
            &PodProxyOptions{},
            &ComponentStatus{},
            &ComponentStatusList{},
            &SerializedReference{},
            &RangeAllocation{},
            &ConfigMap{},
            &ConfigMapList{},
        )
    
        return nil
    }
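    AddKnownTypes essentially records a two-way mapping between a GroupVersionKind and a Go type, taking the Kind from the struct's type name. A minimal reflect-based sketch, assuming a hypothetical scheme type (runtime.Scheme keeps more maps and also handles conversion and defaulting). For brevity the same Pod struct is registered under both v1 and __internal, whereas Kubernetes uses separate Go types for the two versions.

```go
package main

import (
	"fmt"
	"reflect"
)

type GroupVersionKind struct{ Group, Version, Kind string }

// scheme is a hypothetical, minimal version of runtime.Scheme's type registry.
type scheme struct {
	gvkToType map[GroupVersionKind]reflect.Type
	typeToGVK map[reflect.Type][]GroupVersionKind
}

func newScheme() *scheme {
	return &scheme{gvkToType: map[GroupVersionKind]reflect.Type{}, typeToGVK: map[reflect.Type][]GroupVersionKind{}}
}

// AddKnownTypes registers each pointer-to-struct under group/version, using
// the struct's Go type name as the Kind.
func (s *scheme) AddKnownTypes(group, version string, objs ...any) {
	for _, obj := range objs {
		t := reflect.TypeOf(obj)
		if t.Kind() != reflect.Pointer {
			panic("all types must be pointers to structs")
		}
		t = t.Elem()
		gvk := GroupVersionKind{Group: group, Version: version, Kind: t.Name()}
		s.gvkToType[gvk] = t
		s.typeToGVK[t] = append(s.typeToGVK[t], gvk)
	}
}

// New returns a pointer to a zero value of the type registered for gvk.
func (s *scheme) New(gvk GroupVersionKind) (any, error) {
	t, ok := s.gvkToType[gvk]
	if !ok {
		return nil, fmt.Errorf("no kind %q registered for version %q", gvk.Kind, gvk.Version)
	}
	return reflect.New(t).Interface(), nil
}

type Pod struct{ Name string }
type Service struct{ Name string }

func main() {
	s := newScheme()
	s.AddKnownTypes("", "v1", &Pod{}, &Service{})
	s.AddKnownTypes("", "__internal", &Pod{}, &Service{})
	obj, _ := s.New(GroupVersionKind{Version: "v1", Kind: "Pod"})
	fmt.Printf("%T\n", obj)                              // *main.Pod
	fmt.Println(len(s.typeToGVK[reflect.TypeOf(Pod{})])) // 2
}
```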
    

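    So it actually calls scheme.AddKnownTypes above, registering common resources such as Pod and Service with the Scheme. Note that the internal objects are added to the Scheme as well (see k8s.io/kubernetes/pkg/apis/core/register.go); their types mirror the v1 objects above, but the version is "__internal".

    Second, enabling the GroupVersion in the Registry involves three steps: enabling the GV; creating the group's metadata object, GroupMeta, and registering it for the group; and creating the group's RESTMapper.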

    type GroupMeta struct {
        // GroupVersion represents the preferred version of the group.
        GroupVersion schema.GroupVersion // the preferred group version
    
        // GroupVersions is Group + all versions in that group.
        GroupVersions []schema.GroupVersion // all versions in this group
    
        // SelfLinker can set or get the SelfLink field of all API types.
        // TODO: when versioning changes, make this part of each API definition.
        // TODO(lavalamp): Combine SelfLinker & ResourceVersioner interfaces, force all uses
        // to go through the InterfacesFor method below.
        SelfLinker runtime.SelfLinker // SelfLinker has not been studied yet
    
        // RESTMapper provides the default mapping between REST paths and the objects declared in a Scheme and all known
        // versions.
        RESTMapper meta.RESTMapper // maps Kinds to resources, e.g. Kind Pod maps to the resources pod and pods
    
        // InterfacesFor returns the default Codec and ResourceVersioner for a given version
        // string, or an error if the version is not known.
        // TODO: make this stop being a func pointer and always use the default
        // function provided below once every place that populates this field has been changed.
        InterfacesFor func(version schema.GroupVersion) (*meta.VersionInterfaces, error) // usually DefaultInterfacesFor, which looks up the map below
    
        // InterfacesByVersion stores the per-version interfaces.
        InterfacesByVersion map[schema.GroupVersion]*meta.VersionInterfaces // per-group-version VersionInterfaces, each usually holding two objects:
        // runtime.ObjectConvertor (the scheme) and MetadataAccessor, for version conversion and metadata access
    }
    

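    The RESTMapper maps between Kinds and Resources, specifically between GroupVersionKind and GroupVersionResource. The mapping is a heuristic and not necessarily exact: a Kind such as Pod yields resources named pod and pods, while Ingress yields ingress and ingresses. The code that builds the RESTMapper is: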

    func (gmf *GroupMetaFactory) newRESTMapper(scheme *runtime.Scheme, externalVersions []schema.GroupVersion, groupMeta *apimachinery.GroupMeta) meta.RESTMapper {
        // the list of kinds that are scoped at the root of the api hierarchy
        // if a kind is not enumerated here, it is assumed to have a namespace scope
        rootScoped := sets.NewString()
        if gmf.GroupArgs.RootScopedKinds != nil {
            rootScoped = gmf.GroupArgs.RootScopedKinds
        }
        ignoredKinds := sets.NewString()
        if gmf.GroupArgs.IgnoredKinds != nil {
            ignoredKinds = gmf.GroupArgs.IgnoredKinds
        }
    
        // create the default RESTMapper
        mapper := meta.NewDefaultRESTMapper(externalVersions, groupMeta.InterfacesFor)
        for _, gv := range externalVersions {
            for kind := range scheme.KnownTypes(gv) { // iterate over the Kinds in the scheme
                if ignoredKinds.Has(kind) { // kinds not exposed as resources, defined in the GroupMetaFactory in Install
                    continue
                }
                scope := meta.RESTScopeNamespace
                if rootScoped.Has(kind) { // root-scoped (non-namespaced) kinds, defined in the GroupMetaFactory in Install;
                    // for core these are Node, Namespace, PersistentVolume and ComponentStatus
                    scope = meta.RESTScopeRoot
                }
                mapper.Add(gv.WithKind(kind), scope) // Kind <-> Resource mapping
            }
        }
    
        return mapper
    }
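    mapper.Add derives resource names from the Kind. The sketch below follows the pluralization heuristic of meta.UnsafeGuessKindToResource as I understand it: lowercase the Kind, append "es" after a trailing "s", turn a trailing "y" into "ies", leave names ending in "endpoints" unchanged, and otherwise append "s". Treat the exact rules as an assumption and check the apimachinery source for your version; guessResource is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// guessResource returns the plural and singular resource names for a Kind.
func guessResource(kind string) (plural, singular string) {
	if kind == "" {
		return "", ""
	}
	singular = strings.ToLower(kind)
	if strings.HasSuffix(singular, "endpoints") { // already plural, left as-is
		return singular, singular
	}
	switch singular[len(singular)-1] {
	case 's':
		return singular + "es", singular
	case 'y':
		return strings.TrimSuffix(singular, "y") + "ies", singular
	}
	return singular + "s", singular
}

func main() {
	for _, k := range []string{"Pod", "Ingress", "NetworkPolicy", "Endpoints"} {
		p, s := guessResource(k)
		fmt.Println(k, p, s)
	}
	// Pod pods pod
	// Ingress ingresses ingress
	// NetworkPolicy networkpolicies networkpolicy
	// Endpoints endpoints endpoints
}
```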
    

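    This completes the analysis of the default ResourceEncodingConfig. We have seen how the Registry's fields are populated, and that the default ResourceEncodingConfig is obtained from the group metadata (GroupMeta) in the Registry. Next, let us see how the defaults are overridden.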

    StorageEncodingOverrides

    First, the default construction of StorageSerializationOptions:

    func NewStorageSerializationOptions() *StorageSerializationOptions {
        return &StorageSerializationOptions{
            DefaultStorageVersions: legacyscheme.Registry.AllPreferredGroupVersions(),
            StorageVersions:        legacyscheme.Registry.AllPreferredGroupVersions(),
        }
    }
    

    When starting the API Server, the --storage-versions flag specifies which version each group is stored in, and can even move a group's storage to a different group and version.
    This is done in the StorageGroupsToEncodingVersion method, which produces a map from group to GroupVersion.
    That map is passed to NewStorageFactory as the storageEncodingOverrides parameter and merged with the default ResourceEncodingConfig by the following call:
    resourceEncodingConfig := resourceconfig.MergeGroupEncodingConfigs(defaultResourceEncoding, storageEncodingOverrides)

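    Note that this sets both the external encoding (the etcd storage encoding) and the internal encoding (the in-memory object encoding). Only the external encoding is overridden; the internal encoding keeps the same Group, and its Version remains "__internal".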
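    The merge step can be sketched as follows. parseStorageVersions and mergeGroupEncodings are hypothetical helpers; the flag format "group/version,..." (with a bare version for the core group) is assumed, and unlike the real StorageGroupsToEncodingVersion this sketch keys each entry by the target's own group, so it does not model moving a group to a different group's version. What it illustrates is the rule stated above: only the external version changes, and the internal version becomes {group, "__internal"}.

```go
package main

import (
	"fmt"
	"strings"
)

type GroupVersion struct{ Group, Version string }

const apiVersionInternal = "__internal"

// versionEncoding records a group's external (etcd) and internal (memory) versions.
type versionEncoding struct{ External, Internal GroupVersion }

// parseStorageVersions reads a value shaped like "v1,apps/v1,batch/v1beta1"
// into group -> storage GroupVersion.
func parseStorageVersions(flag string) map[string]GroupVersion {
	out := map[string]GroupVersion{}
	for _, item := range strings.Split(flag, ",") {
		item = strings.TrimSpace(item)
		if item == "" {
			continue
		}
		gv := GroupVersion{Version: item} // bare version: core group ""
		if i := strings.Index(item, "/"); i >= 0 {
			gv = GroupVersion{Group: item[:i], Version: item[i+1:]}
		}
		out[gv.Group] = gv
	}
	return out
}

// mergeGroupEncodings sets each overridden group's external version and
// resets its internal version to {group, "__internal"}.
func mergeGroupEncodings(groups map[string]versionEncoding, overrides map[string]GroupVersion) {
	for group, storageGV := range overrides {
		groups[group] = versionEncoding{
			External: storageGV,
			Internal: GroupVersion{Group: group, Version: apiVersionInternal},
		}
	}
}

func main() {
	groups := map[string]versionEncoding{}
	mergeGroupEncodings(groups, parseStorageVersions("v1,batch/v1beta1"))
	fmt.Println(groups["batch"].External, groups["batch"].Internal) // {batch v1beta1} {batch __internal}
	fmt.Println(groups[""].External)                                // { v1}
}
```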

    ResourceEncodingOverrides

    This is a lower-level encoding configuration. In the example, the following values are passed in:

            []schema.GroupVersionResource{
                batch.Resource("cronjobs").WithVersion("v1beta1"),
                storage.Resource("volumeattachments").WithVersion("v1beta1"),
                admissionregistration.Resource("initializerconfigurations").WithVersion("v1alpha1"),
            },
    

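    This means the cronjobs resource in the batch group is stored using a different version, v1beta1.
    The code is straightforward, so it is not examined in detail here.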

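    Summary

    This essentially completes the analysis of the API Server's storage system. The API Server framework is fairly complex and the codebase is large, and even at this point quite a few areas deserve closer study, so this article is meant as a guide. The next time I read the code, I can dig deeper from here instead of starting over.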


          Article title: Kubernetes Source Code Analysis -- API Server: Et

          Article link: https://www.haomeiwen.com/subject/wwnigftx.html