美文网首页
Kubernetes 源码分析 -- API Server之AP

Kubernetes 源码分析 -- API Server之AP

作者: 何约什 | 来源:发表于2018-12-21 15:17 被阅读77次

前言

API Server的启动中,我们直到介绍了三种服务Master、CustomResourceDefinitions、Aggrator的创建,但是具体的API的创建部分没有介绍。
本文旨在把这块说清楚,让我们了解整个API Server对外提供了哪些API,这些API是在怎样被注册到服务中去的。我们知道,API Server对外提供的Http/Https服务,这都是基于go http服务框架来实现。

API Server支持的HTTP服务采用了go-restful与非go-restful混和的方式,主要是因为go-restful的一些标准无法完全满足需求,而且API Server要兼容旧版本的需要,所以引入了这种混杂模式。

go-restful

go-restful是第三方的REST框架,在GitHub上有多个贡献者,采用了“路由”映射的设计思想,并且在API设计中使用了流行的Fluent Style风格,试用起来酣畅淋漓,也难怪Kubernetes选择了它。下面是go-restful的优良特性。

  • Ruby on Rails风格的Rest路由映射,例如/people/{person_id}/groups/{group_id}。
  • 大大简化了Rest API的开发工作。
  • 底层实现采用Golang的HTTP协议栈,几乎没有限制。
  • 拥有完整的单元包代码,很容易开发一个可测试的Rest API。
  • Google AppEngine ready。

go-restful框架中的核心对象如下:

  • restful.Container:代表了一个HTTP Rest服务器,包括一组restful.WebService对象和一个http.ServeMux对象,使用RouteSelector进行请求派发。
  • restful.WebService:标识一个Rest服务,由多个Rest路由(restful.Route)组成,这一组Rest路由共享同一个RootPath。
  • restful.Route:标识一个Rest路由,Rest路由主要由Rest Path、HTTP Method、输入输出类型(HTML/JSON)及对应的回调函数restful.RouteFunction组成。
  • restful.RouteFunction:一个用于处理具体的REST调用的函数接口定义,具体定义为type RouteFunction func(*Request, *Response)。

服务链

服务链的核心是DelegationTarget接口,它让API Server可以实现链式服务,当有HTTP请求到来时,优先让链首去处理URI,如果能够匹配成功就处理,否则交给下一链,一直到链尾。DelegationTarget的定义如下:

type DelegationTarget interface {
    // UnprotectedHandler returns a handler that is NOT protected by a normal chain
    UnprotectedHandler() http.Handler

    // RequestContextMapper returns the existing RequestContextMapper.  Because we cannot rewire all existing
    // uses of this function, this will be used in any delegating API server
    RequestContextMapper() apirequest.RequestContextMapper

    // PostStartHooks returns the post-start hooks that need to be combined
    PostStartHooks() map[string]postStartHookEntry

    // PreShutdownHooks returns the pre-stop hooks that need to be combined
    PreShutdownHooks() map[string]preShutdownHookEntry

    // HealthzChecks returns the healthz checks that need to be combined
    HealthzChecks() []healthz.HealthzChecker

    // ListedPaths returns the paths for supporting an index
    ListedPaths() []string

    // NextDelegate returns the next delegationTarget in the chain of delegations
    NextDelegate() DelegationTarget
}
  • 它有一个空的实现emptyDelegate,一般作为链尾,由于是空的实现,所以具体的定义就不列举了。
  • 它的另一个实现是GenericAPIServer,如下所示:
type GenericAPIServer struct {
......
    // delegationTarget is the next delegate in the chain or nil
    delegationTarget DelegationTarget
    // HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
    HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
}

GenericAPIServer定义了一个delegationTarget成员,在API Server整套系统中,总共有三个服务,出了链尾,都是指向GenericAPIServer实例,该成员让GenericAPIServer实现了一套链的功能。

三种服务

APIServer最终提供链式服务把基本的API Server、CustomResource、Aggregator这三种服务采用链式结构串联起来,对外提供服务。这种链式服务为API Server的可扩展性提供了基础,使增添新的服务功能,不会影响到现有的框架,只需要追加新的服务,放到链中就能够实现。而GenericAPIServer是三种服务的基础,在这三种服务中:
Master是API Server的基础服务,它提供的基础资源的API服务;
其他两种CustomResourceDefinitions(简称CRD)、API Server Aggregation(简称AA),提供了自定义资源的能力。具体见自定义资源

我们看一下对应的三种服务的定义:

基础的API Server:代码在k8s.io/kubernetes/pkg/master/master.go中

Master的结构定义如下所示:

type Master struct {
    GenericAPIServer *genericapiserver.GenericAPIServer
    ClientCARegistrationHook ClientCARegistrationHook
}

在Master实例的构建过程中,就完成了API的Install,具体代码见:completedConfig.New,在这里完成了传统API的安装以及新的资源API的安装。在早期的Kubernetes API Server的代码中,只有Master这一种服务,所以一些常见的资源如pods、service等等,都是基于传统的方式安装到REST中,而随着k8s的发展,涌现出了多种资源,而且他们的版本号也不在是v1版本,如:tokenreviews、horizontalpodautoscalers、jobs等等。所以在Master的Install API中,我们可以到InstallAPIS和InstallLegacyAPI。

  • InstallLegacyAPI

传统的API都是一些核心资源,他们的GroupName="",版本都是v1,所以,统一处于一个APIGroupInfo中,相关代码不一一贴出来了,主要的代码逻辑是,创建各种核心资源的REST对象最终组装成
map[string]rest.Storage对象。

    restStorageMap := map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
        "pods/status":      podStorage.Status,
        "pods/log":         podStorage.Log,
        "pods/exec":        podStorage.Exec,
        "pods/portforward": podStorage.PortForward,
        "pods/proxy":       podStorage.Proxy,
        "pods/binding":     podStorage.Binding,
        "bindings":         podStorage.Binding,

        "podTemplates": podTemplateStorage,

        "replicationControllers":        controllerStorage.Controller,
        "replicationControllers/status": controllerStorage.Status,

        "services":        serviceRest.Service,
        "services/proxy":  serviceRest.Proxy,
        "services/status": serviceStatusStorage,

        "endpoints": endpointsStorage,

        "nodes":        nodeStorage.Node,
        "nodes/status": nodeStorage.Status,
        "nodes/proxy":  nodeStorage.Proxy,

        "events": eventStorage,

        "limitRanges":                   limitRangeStorage,
        "resourceQuotas":                resourceQuotaStorage,
        "resourceQuotas/status":         resourceQuotaStatusStorage,
        "namespaces":                    namespaceStorage,
        "namespaces/status":             namespaceStatusStorage,
        "namespaces/finalize":           namespaceFinalizeStorage,
        "secrets":                       secretStorage,
        "serviceAccounts":               serviceAccountStorage,
        "persistentVolumes":             persistentVolumeStorage,
        "persistentVolumes/status":      persistentVolumeStatusStorage,
        "persistentVolumeClaims":        persistentVolumeClaimStorage,
        "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
        "configMaps":                    configMapStorage,

        "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
    }
    if legacyscheme.Registry.IsEnabledVersion(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
        restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
    }
    if legacyscheme.Registry.IsEnabledVersion(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
        restStorageMap["pods/eviction"] = podStorage.Eviction
    }

这个数据结构map[string]rest.Storage是key是REST的path,所以很明显它是API暴漏的关键,我们在后面也会陆续讲到,请参考Installer章节。

  • 现代的API
    其实后面的CRD与AA都是采用这种模式,但是Master中封装的更好。它对各种新的资源实现了相应的RESTStorageProvider,接口定义如下:
type RESTStorageProvider interface {
    GroupName() string
    NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool)
}

RESTStorageProvider实现为REST storage的工厂,这样每个资源实现自己的NewRESTStorage方法,并构建相应的APIGroupInfo。相应的资源为:

    restStorageProviders := []RESTStorageProvider{
        authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authenticator},
        authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
        autoscalingrest.RESTStorageProvider{},
        batchrest.RESTStorageProvider{},
        certificatesrest.RESTStorageProvider{},
        extensionsrest.RESTStorageProvider{},
        networkingrest.RESTStorageProvider{},
        policyrest.RESTStorageProvider{},
        rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer},
        schedulingrest.RESTStorageProvider{},
        settingsrest.RESTStorageProvider{},
        storagerest.RESTStorageProvider{},
        // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
        // See https://github.com/kubernetes/kubernetes/issues/42392
        appsrest.RESTStorageProvider{},
        admissionregistrationrest.RESTStorageProvider{},
        eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
    }

自定义资源服务:代码在k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go中

自定义资源服务的结构代码如下:

type CustomResourceDefinitions struct {
    GenericAPIServer *genericapiserver.GenericAPIServer

    // provided for easier embedding
    Informers internalinformers.SharedInformerFactory
}

自定义资源服务的API的安装也是在completedConfig.New中完成,API较少,相关的代码如下所示:

       apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Registry, Scheme, metav1.ParameterCodec, Codecs)
    if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
        apiGroupInfo.GroupMeta.GroupVersion = v1beta1.SchemeGroupVersion
        storage := map[string]rest.Storage{}
        // customresourcedefinitions
        customResourceDefintionStorage := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
        storage["customresourcedefinitions"] = customResourceDefintionStorage
        storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)

        apiGroupInfo.VersionedResourcesStorageMap["v1beta1"] = storage
    }
    // 安装到ApiServerHandler的GoRestfulContainer,基于Go-restful框架的服务
        // 提供了customresourcedefinitions资源的服务能力
    if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
        return nil, err
    }
        ......
    crdHandler := NewCustomResourceDefinitionHandler(
        versionDiscoveryHandler,
        groupDiscoveryHandler,
        s.GenericAPIServer.RequestContextMapper(),
        s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
        delegateHandler,
        c.ExtraConfig.CRDRESTOptionsGetter,
        c.GenericConfig.AdmissionControl,
    )
    // 安装到ApiServerHandler的NonGoRestfulMux,基于go-http普通框架的服务
    // crdHandler提供自定义资源的API功能,主要包括资源的以下操作:
    // get/list/watch/create/udpate/patch/delete/deletecollection
    s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
    s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)

在New方法中,除了上述API的安装外,还有crd、naming、finalizing控制器以及各种Informers,有必要去看待吗,这里不列举。

AggregateServer :代码在k8s.io/kube-aggregator/pkg/apiserver/apiserver.go中

主K8S API Server处理built-in资源,如Pods和Services等等,而CRD能够让我们实现了一些通用的自定义资源。
AggregateServer实现了API Server Aggregation功能,它让我们可以提供自定义资源的特殊实现,并且部署我们独立API Server,主API Server把请求代理给独立的API Server来处理自定义资源,从而让资源对它的所有的客户端可用。

AggregatorServer的构建代码也是对应的completeConfig.NewWithDelegate方法中完成,该方法带了一个参数,参数的实例基于主API Server对应的GenericAPIServer实例。

AggregatorServer启动了APIServer的共享通知: &apiregistration.APIService{},基于共享通知的消息,在两个控制器中进行处理:apiserviceRegistrationController和availableController。

在API的处理上也是分为两部分:

  • go-restful部分,也是基于APIGroupInfo来安装API,见k8s.io/kube-aggregator/pkg/registry/apiservice/rest/storage_apiservice.go中的NewRESTStorage方法:
func NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) genericapiserver.APIGroupInfo {
    apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, aggregatorscheme.Registry, aggregatorscheme.Scheme, metav1.ParameterCodec, aggregatorscheme.Codecs)

    if apiResourceConfigSource.VersionEnabled(v1beta1.SchemeGroupVersion) {
        apiGroupInfo.GroupMeta.GroupVersion = v1beta1.SchemeGroupVersion
        storage := map[string]rest.Storage{}
        apiServiceREST := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)
        storage["apiservices"] = apiServiceREST
        storage["apiservices/status"] = apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, apiServiceREST)
        apiGroupInfo.VersionedResourcesStorageMap["v1beta1"] = storage
    }

    if apiResourceConfigSource.VersionEnabled(v1.SchemeGroupVersion) {
        apiGroupInfo.GroupMeta.GroupVersion = v1.SchemeGroupVersion
        storage := map[string]rest.Storage{}
        apiServiceREST := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)
        storage["apiservices"] = apiServiceREST
        storage["apiservices/status"] = apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, apiServiceREST)
        apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
    }

    return apiGroupInfo
}

在代码中,根据是否启用v1beta1和v1版本,实现了apiservice和apiservices/status两种资源类型。

  • non-go-restful部分,见:k8s.io/kubu-aggregator/pkg/apiserver/handler_apis.go中的apisHandler结构
    apisHanders服务与/apis端,它实现了http.Handler,下面是它的ServeHTTP方法的代码实现:
func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    ctx, ok := r.mapper.Get(req)
    if !ok {
        responsewriters.InternalError(w, req, errors.New("no context found for request"))
        return
    }

    discoveryGroupList := &metav1.APIGroupList{
        // always add OUR api group to the list first.  Since we'll never have a registered APIService for it
        // and since this is the crux of the API, having this first will give our names priority.  It's good to be king.
        Groups: []metav1.APIGroup{discoveryGroup},
    }

    apiServices, err := r.lister.List(labels.Everything())
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    apiServicesByGroup := apiregistrationapi.SortedByGroupAndVersion(apiServices)
    for _, apiGroupServers := range apiServicesByGroup {
        // skip the legacy group
        if len(apiGroupServers[0].Spec.Group) == 0 {
            continue
        }
        discoveryGroup := convertToDiscoveryAPIGroup(apiGroupServers)
        if discoveryGroup != nil {
            discoveryGroupList.Groups = append(discoveryGroupList.Groups, *discoveryGroup)
        }
    }

    responsewriters.WriteObjectNegotiated(ctx, r.codecs, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroupList)
}

问:看起来apisHandlers的目的是把找到的合适的api server返回?这点我没有搞明白,理论上,AA应该是做代理功能,把请求转发给对应的自定义API Servers去处理才对。

ANSWER:明白了,apisHandlers其实就是ROOT路径/apis的服务器,本来是由GenericAPIServer.DiscoveryGroupManager来实现这个功能的,但是AA需要实时获取注册进来的自定义API Servers信息,并且AA相关信息总是放在/apis相应的头部,如下所示:(没有自定义的API Servers的情况)

yuxianbing@ubuntu:~$ curl http://127.0.0.1:8080/apis
{
  "kind": "APIGroupList",
  "apiVersion": "v1",
  "groups": [
    {
      "name": "apiregistration.k8s.io",
      "versions": [
        {
          "groupVersion": "apiregistration.k8s.io/v1",
          "version": "v1"
        },
        {
          "groupVersion": "apiregistration.k8s.io/v1beta1",
          "version": "v1beta1"
        }
      ],
      "preferredVersion": {
        "groupVersion": "apiregistration.k8s.io/v1",
        "version": "v1"
      },
      "serverAddressByClientCIDRs": null
    },
    ......

GenericAPIServer

下面是GenericAPIServer的完整定义:

// GenericAPIServer contains state for a Kubernetes cluster api server.
type GenericAPIServer struct {
    // discoveryAddresses is used to build cluster IPs for discovery.
    discoveryAddresses discovery.Addresses

    // LoopbackClientConfig is a config for a privileged loopback connection to the API server
    LoopbackClientConfig *restclient.Config

    // minRequestTimeout is how short the request timeout can be.  This is used to build the RESTHandler
    minRequestTimeout time.Duration

    // ShutdownTimeout is the timeout used for server shutdown. This specifies the timeout before server
    // gracefully shutdown returns.
    ShutdownTimeout time.Duration

    // legacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests
    // to InstallLegacyAPIGroup
    legacyAPIGroupPrefixes sets.String

    // admissionControl is used to build the RESTStorage that backs an API Group.
    admissionControl admission.Interface

    // requestContextMapper provides a way to get the context for a request.  It may be nil.
    requestContextMapper apirequest.RequestContextMapper

    SecureServingInfo *SecureServingInfo

    // ExternalAddress is the address (hostname or IP and port) that should be used in
    // external (public internet) URLs for this GenericAPIServer.
    ExternalAddress string

    // Serializer controls how common API objects not in a group/version prefix are serialized for this server.
    // Individual APIGroups may define their own serializers.
    Serializer runtime.NegotiatedSerializer

    // "Outputs"
    // Handler holds the handlers being used by this API server
        // 实现了API Server对外的API功能,主要包括两个部分:go-restful和non-go-restful。
    Handler *APIServerHandler

    // listedPathProvider is a lister which provides the set of paths to show at /
    listedPathProvider routes.ListedPathProvider

    // DiscoveryGroupManager serves /apis
        // 一般用于实现/apis的服务,从/apis的输出来看,基本上显示的是各个Group的API与版本的信息。
    DiscoveryGroupManager discovery.GroupManager

    // Enable swagger and/or OpenAPI if these configs are non-nil.
    swaggerConfig *swagger.Config
    openAPIConfig *openapicommon.Config

    // PostStartHooks are each called after the server has started listening, in a separate go func for each
    // with no guarantee of ordering between them.  The map key is a name used for error reporting.
    // It may kill the process with a panic if it wishes to by returning an error.
    postStartHookLock      sync.Mutex
    postStartHooks         map[string]postStartHookEntry
    postStartHooksCalled   bool
    disabledPostStartHooks sets.String

    preShutdownHookLock    sync.Mutex
    preShutdownHooks       map[string]preShutdownHookEntry
    preShutdownHooksCalled bool

    // healthz checks
    healthzLock    sync.Mutex
    healthzChecks  []healthz.HealthzChecker
    healthzCreated bool

    // auditing. The backend is started after the server starts listening.
    AuditBackend audit.Backend

    // enableAPIResponseCompression indicates whether API Responses should support compression
    // if the client requests it via Accept-Encoding
    enableAPIResponseCompression bool

    // delegationTarget is the next delegate in the chain or nil
        // 实现链式结构,一般指向的实例类型也是GenericAPIServer
    delegationTarget DelegationTarget

    // HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
    HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
}

GenericAPIServer里面包含了几个重要的成员,首先是APIServerHandler类型的Handler成员,它存储了该Server所服务的API,后面要讲到的API的安装功能,主要就是把要服务的资源对应的API存储在Handler中的GoRestfulContainer中。

APIServerHandler包含了API Server使用的多种http.Handler类型,包括go-restful以及non-go-restful,以及在以上两者之间选择的Director对象,API URI处理的选择过程为:FullHandlerChain-> Director ->{GoRestfulContainer, NonGoRestfulMux}。

其次是DiscoveryGroupManager,它负责存储支持的API的Group和Version信息,并提供对/apis的调用服务。

APIGroupInfo

前面在三个API Server的API安装中,提到过go-restful模式的API安装都用到了APIGroupInfo,一般都是先生成一个APIGroupInfo实例,在该实例中,把我们关心的资源类型存好,然后调用GenericAPIServer.InstallAPIGroup完成API的Install操作。

// Info about an API group.
type APIGroupInfo struct {
    GroupMeta apimachinery.GroupMeta
    // Info about the resources in this group. Its a map from version to resource to the storage.
    VersionedResourcesStorageMap map[string]map[string]rest.Storage
    // OptionsExternalVersion controls the APIVersion used for common objects in the
    // schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
    // define a version "v1beta1" but want to use the Kubernetes "v1" internal objects.
    // If nil, defaults to groupMeta.GroupVersion.
    // TODO: Remove this when https://github.com/kubernetes/kubernetes/issues/19018 is fixed.
    OptionsExternalVersion *schema.GroupVersion
    // MetaGroupVersion defaults to "meta.k8s.io/v1" and is the scheme group version used to decode
    // common API implementations like ListOptions. Future changes will allow this to vary by group
    // version (for when the inevitable meta/v2 group emerges).
    MetaGroupVersion *schema.GroupVersion

    // Scheme includes all of the types used by this group and how to convert between them (or
    // to convert objects from outside of this group that are accepted in this API).
    // TODO: replace with interfaces
    Scheme *runtime.Scheme
    // NegotiatedSerializer controls how this group encodes and decodes data
    NegotiatedSerializer runtime.NegotiatedSerializer
    // ParameterCodec performs conversions for query parameters passed to API calls
    ParameterCodec runtime.ParameterCodec
}

GenericAPIServer.InstallAPIGroup

InstallAPIGroup负责把给定的API Group暴漏到API中,主要功能有两块:

  • 调用installAPIResources实现资源API的暴漏
  • API Group的的服务,为了/apis和/apis/<GroupName>两种服务,其中/apis通过把信息存放在DiscoveryGroupManager成员中,/apis/<GroupName>的通过构建APIGroupHandler,并存放到GoRestfulContainer中。

代码如下所示:

func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
    // Do not register empty group or empty version.  Doing so claims /apis/ for the wrong entity to be returned.
    // Catching these here places the error  much closer to its origin
    if len(apiGroupInfo.GroupMeta.GroupVersion.Group) == 0 {
        return fmt.Errorf("cannot register handler with an empty group for %#v", *apiGroupInfo)
    }
    if len(apiGroupInfo.GroupMeta.GroupVersion.Version) == 0 {
        return fmt.Errorf("cannot register handler with an empty version for %#v", *apiGroupInfo)
    }
    // 这里实现资源的API暴漏
    if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo); err != nil {
        return err
    }

    // setup discovery
    // Install the version handler.
    // Add a handler at /apis/<groupName> to enumerate all versions supported by this group.
    apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
    for _, groupVersion := range apiGroupInfo.GroupMeta.GroupVersions {
        // Check the config to make sure that we elide versions that don't have any resources
        if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
            continue
        }
        apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{
            GroupVersion: groupVersion.String(),
            Version:      groupVersion.Version,
        })
    }
    preferredVersionForDiscovery := metav1.GroupVersionForDiscovery{
        GroupVersion: apiGroupInfo.GroupMeta.GroupVersion.String(),
        Version:      apiGroupInfo.GroupMeta.GroupVersion.Version,
    }
    apiGroup := metav1.APIGroup{
        Name:             apiGroupInfo.GroupMeta.GroupVersion.Group,
        Versions:         apiVersionsForDiscovery,
        PreferredVersion: preferredVersionForDiscovery,
    }
    // 把该API Group信息存储到DiscoveryGroupManager中,让后续暴漏的/apis中使用
    s.DiscoveryGroupManager.AddGroup(apiGroup)
    // 生成APIGrouphandler
    s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup, s.requestContextMapper).WebService())

    return nil
}

GenericAPIServer.installAPIResources

installAPIResources是一个用于安装REST存储的私有方法,用来支撑各种api groupversionresource。该函数的代码逻辑非常简单,它循环扫描APIGroupInfo.GroupMeta.GroupVersions成员,生成相应的APIGroupVersion实例,并通过InstallREST方法把该GroupVersion的资源注册服务。

func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
    for _, groupVersion := range apiGroupInfo.GroupMeta.GroupVersions {
        if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
            glog.Warningf("Skipping API %v because it has no resources.", groupVersion)
            continue
        }
                // 这里生成APIGroupVersion实例,是REST API的关键
        apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
        if apiGroupInfo.OptionsExternalVersion != nil {
            apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
        }

        if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
            return fmt.Errorf("Unable to setup API %v: %v", apiGroupInfo, err)
        }
    }

    return nil
}

getApiGropuVersion的代码如下,它通过组装了所有的资源对应的storage,并生成了APIGroupVersion实例,代码如下:

func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) *genericapi.APIGroupVersion {
    storage := make(map[string]rest.Storage)
    for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {
        storage[strings.ToLower(k)] = v
    }
    version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
    version.Root = apiPrefix
    version.Storage = storage
    return version
}

APIGroupVersion是区分API版本的关键,以下是代码:

type APIGroupVersion struct {
    Storage map[string]rest.Storage

    Root string

    // GroupVersion is the external group version
    GroupVersion schema.GroupVersion

    // OptionsExternalVersion controls the Kubernetes APIVersion used for common objects in the apiserver
    // schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
    // define a version "v1beta1" but want to use the Kubernetes "v1" internal objects. If
    // empty, defaults to GroupVersion.
    OptionsExternalVersion *schema.GroupVersion
    // MetaGroupVersion defaults to "meta.k8s.io/v1" and is the scheme group version used to decode
    // common API implementations like ListOptions. Future changes will allow this to vary by group
    // version (for when the inevitable meta/v2 group emerges).
    MetaGroupVersion *schema.GroupVersion

    Mapper meta.RESTMapper

    // Serializer is used to determine how to convert responses from API methods into bytes to send over
    // the wire.
    Serializer     runtime.NegotiatedSerializer
    ParameterCodec runtime.ParameterCodec

    Typer           runtime.ObjectTyper
    Creater         runtime.ObjectCreater
    Convertor       runtime.ObjectConvertor
    Defaulter       runtime.ObjectDefaulter
    Linker          runtime.SelfLinker
    UnsafeConvertor runtime.ObjectConvertor

    Admit   admission.Interface
    Context request.RequestContextMapper

    MinRequestTimeout time.Duration

    // EnableAPIResponseCompression indicates whether API Responses should support compression
    // if the client requests it via Accept-Encoding
    EnableAPIResponseCompression bool
}
  • storage
    前面在构建APIGroupVersion 实例的过程中,我们看到第一个成员Storage的身影,这个变量我们要分析清楚,它类型为map[string]rest.Storage。所以storage变量是一个Map,Key为Rest API的path ,Value为rest.Storage接口,此接口是一个通用的符合Restful要求的资源存储服务接口,每个服务接口负责处理一类(Kind)Kubernetes API中的数据对象-----资源你数据,只有一个接口方法:New(),New()方法返回该Storage服务所能识别和管理的某种具体的资源数据逇一个空实例。
type Storage interface {
    // New returns an empty object that can be used with Create and Update after request data has been put into it.
    // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
    New() runtime.Object
}

在运行期间,Kubernetes API Runtime运行时框架会把New()方法返回的空对象的指针传入Codec.DecodeInto([]byte, runtime.Object)方法中,从而完成HTTP Rest请求中的Byte数组反序列化逻辑。Kubernetes API Server中所有对外提供服务的Restful资源都实现了此接口,这些资源包括pods、bindings、podTemplates、replicationControllers、services等,三个服务都有自己的列表,其中CRD和AA我们已经在前面的讲解中把代码贴出来了。而Master这块其实有两块资源安装:最新的资源以及传统资源模式,后面单独介绍这块。

APIGroupVersion是与rest.Storage Map绑定的,并且绑定了相应版本的Codec、Convertor用于版本转换,这样就很容易理解Kubernetes是怎么区分多版本API的Rest服务的。

  • InstallREST
    在APIGroupVersion的InstallREST(constainer *restful.Container)方法里,用Version变量来构造一个Rest API Path的前缀并赋值给APIInstall的prefix变量,并调用他的Install()方法完成Rest API的转换,代码如下:
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
    prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
    installer := &APIInstaller{
        group:                        g,
        prefix:                       prefix,
        minRequestTimeout:            g.MinRequestTimeout,
        enableAPIResponseCompression: g.EnableAPIResponseCompression,
    }

    apiResources, ws, registrationErrors := installer.Install()
    versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources}, g.Context)
    versionDiscoveryHandler.AddToWebService(ws)
    container.Add(ws)
    return utilerrors.NewAggregate(registrationErrors)
}
  • APIInstall.Install
    接着,在APIInstaller的Install()方法里用prefix(API版本)前缀生成WebService的相对根路径:
func (a *APIInstaller) newWebService() *restful.WebService {
    ws := new(restful.WebService)
    ws.Path(a.prefix)
    // a.prefix contains "prefix/group/version"
    ws.Doc("API at " + a.prefix)
    // Backwards compatibility, we accepted objects with empty content-type at V1.
    // If we stop using go-restful, we can default empty content-type to application/json on an
    // endpoint by endpoint basis
    ws.Consumes("*/*")
    mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
    ws.Produces(append(mediaTypes, streamMediaTypes...)...)
    ws.ApiVersion(a.group.GroupVersion.String())

    return ws
}
  • 如何实现多版本支持呢?

以前面我们列举过的k8s.io/kube-aggregator/pkg/registry/apiservice/rest/storage_apiservice.go中的NewRESTStorage方法中,可以看到,根据是否启用了v1beta1与v1版本的API,我们生成了不同的APIGroupInfo,如果同时启用了v1beta1与v1版本的话,最终调用APIGroupVersion的InstallREST方法,从而完成了最终的多版本API的Rest服务装配流程。

APIInstaller

前面我们多次讲到map[string]rest.Storage,从Master(传统模式与现代模式)、CRD、AA三个服务InstallGroupInfo中,都最终生成了这样一个Map实例。

这个Map的Key是Rest API的访问路径,Value却不是之前说好的restful.Route。所以必须存在一个“转换适配”的方法来实现上述转换!转化你的方法在pkg/apiserver/api_install.go的下属方法里:
func (a *APIInstaller) registerResoruceHandlers(path string, storage rest.Storage, ws *restful.WebService, proxyHandler http.Handler)

上述方法把一个path对应的rest.Storage转换成一系列的restful.Route并添加到指针restful.WebService中。这个函数的代码之所以很长,是因为有各种情况要考虑,比如pods/portforward这种路径要处理child,还要判断美中Storage资源类型锁支持的操作类型:比如是否支持create、delete、update及是否支持list、watch、pathcer等,对各种情况都考虑以后,这个函数的代码量已经超过500行!于是在外面封装了一个简单函数:func (a *APIInstlal)Install,内部循环调用registerResourceHandlers,返回最终的restful.WebService对象,次方法的主要代码如下:

func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
    var apiResources []metav1.APIResource
    var errors []error
    ws := a.newWebService()

    proxyHandler := (&handlers.ProxyHandler{
        Prefix:     a.prefix + "/proxy/",
        Storage:    a.group.Storage,
        Serializer: a.group.Serializer,
        Mapper:     a.group.Context,
    })

    // Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
    paths := make([]string, len(a.group.Storage))
    var i int = 0
    for path := range a.group.Storage {
        paths[i] = path
        i++
    }
    sort.Strings(paths)
    for _, path := range paths {
        apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws, proxyHandler)
        if err != nil {
            errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
        }
        if apiResource != nil {
            apiResources = append(apiResources, *apiResource)
        }
    }
    return apiResources, ws, errors
}

Install()方法循环调用了registerResourceHandlers函数,该函数实现了rest.Storage到restful.Route的转换,由于该函数代码比较长,这里只列举相关的片段:

    creater, isCreater := storage.(rest.Creater)
    namedCreater, isNamedCreater := storage.(rest.NamedCreater)
    lister, isLister := storage.(rest.Lister)
    getter, isGetter := storage.(rest.Getter)
    getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
    deleter, isDeleter := storage.(rest.Deleter)
    gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
    collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
    updater, isUpdater := storage.(rest.Updater)
    patcher, isPatcher := storage.(rest.Patcher)
    watcher, isWatcher := storage.(rest.Watcher)
    _, isRedirector := storage.(rest.Redirector)
    connecter, isConnecter := storage.(rest.Connecter)
    storageMeta, isMetadata := storage.(rest.StorageMetadata)

前面我们提到rest.Storage接口只有一个New方法,一般的资源对象存储,出了实现rest.Storage接口之外,还实现多种REST操作接口,如上所示,具体各种资源数据对象的存储对象见ETCD存储分析。这段代码对storage对象进行判断,以确定并标记它锁满足的API Rest接口类型,而接下来的这段代码在此基础上确定此接口所包含的actions,后者则对应到某种HTTP请求方法(GET/POST/PUT/DELETE)或者HTTP PROXY、WATCH、CONNECT等动作;

        actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
        actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
        actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
        // DEPRECATED
        actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

        actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
        if getSubpath {
            actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
        }
        actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
        actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
        actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isDeleter)
        actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
        // We add "proxy" subresource to remove the need for the generic top level prefix proxy.
        // The generic top level prefix proxy is deprecated in v1.2, and will be removed in 1.3, or 1.4 at the latest.
        // TODO: DEPRECATED in v1.2.
        actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer, false}, isRedirector)
        // TODO: DEPRECATED in v1.2.
        actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer, false}, isRedirector)
        actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
        actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)

我们注意到rest.Redirector类型的storage被当作PROXY进行处理,由apiserver.ProxyHandler进行拦截,并调用rest.Redirector的ResourceLocation方法获取资源的处理路径(可能包含一个非空的http.RoundTripper),用于处理执行Redirector返回的URL请求)。Kubernetes API Server中PROXY请求存在的意义在于透明地访问某个其他节点(比如某个Minion)上的API。

最后,我们来分析下registerResourcesHandles中完成从rest.Storage到restful.Route映射的最后一段关键代码。下面是rest.Getter接口的Storage映射代码:

    case "GET": // Get a resource.
            var handler restful.RouteFunction
            if isGetterWithOptions {
                handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, hasSubresource)
            } else {
                handler = restfulGetResource(getter, exporter, reqScope)
            }

            if needOverride {
                // need change the reported verb
                handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), resource, subresource, requestScope, handler)
            } else {
                handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, handler)
            }

            if a.enableAPIResponseCompression {
                handler = genericfilters.RestfulWithCompression(handler, a.group.Context)
            }
            doc := "read the specified " + kind
            if hasSubresource {
                doc = "read " + subresource + " of the specified " + kind
            }
            route := ws.GET(action.Path).To(handler).
                Doc(doc).
                Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                Returns(http.StatusOK, "OK", producedObject).
                Writes(producedObject)
            if isGetterWithOptions {
                if err := addObjectParams(ws, route, versionedGetOptions); err != nil {
                    return nil, err
                }
            }
            if isExporter {
                if err := addObjectParams(ws, route, versionedExportOptions); err != nil {
                    return nil, err
                }
            }
            addParams(route, action.Params)
            routes = append(routes, route)

上述代码首先通过函数restfulGetResourceWithOptions或者restfulGetResource创建了一个restful.RouteFunction,然后生成一个restful.route对象,最后注册到 restful.WebService中,从而完成了rest.Storage到Rest服务的“最后一公里”通车。restfulGetResource函数的定义如下:

func restfulGetResource(r rest.Getter, e rest.Exporter, scope handlers.RequestScope) restful.RouteFunction {
    return func(req *restful.Request, res *restful.Response) {
        handlers.GetResource(r, e, scope)(res.ResponseWriter, req.Request)
    }
}

handlers.GetResource的定义如下:

func GetResource(r rest.Getter, e rest.Exporter, scope RequestScope) http.HandlerFunc {
    return getResourceHandler(scope,
        func(ctx request.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
            // check for export
            options := metav1.GetOptions{}
            if values := req.URL.Query(); len(values) > 0 {
                exports := metav1.ExportOptions{}
                if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &exports); err != nil {
                    err = errors.NewBadRequest(err.Error())
                    return nil, err
                }
                if exports.Export {
                    if e == nil {
                        return nil, errors.NewBadRequest(fmt.Sprintf("export of %q is not supported", scope.Resource.Resource))
                    }
                    return e.Export(ctx, name, exports)
                }
                if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil {
                    err = errors.NewBadRequest(err.Error())
                    return nil, err
                }
            }
            if trace != nil {
                trace.Step("About to Get from storage")
            }
                        //  这里调用了资源对象存储的Get方法,从而返回具体的资源对象
            return r.Get(ctx, name, &options)
        })
}

最终,我们看到了API的服务最终通过调用r.Get(ctx, name, &options)方法,从而得以返回某个资源对象。

在上面的查询操作中,没有权限控制,但是查看一下createHandler方法,就能看到权限控制的身影,如下所示的代码片段:

        admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, userInfo)
        if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
            err = mutatingAdmission.Admit(admissionAttributes)
            if err != nil {
                scope.err(err, w, req)
                return
            }
        }

而对于资源的Create、Update、Delete、Connect、Patch等操作都有类似的权限控制,从Admit的参数admission.Attributes的属性来看,第三方系统可以开发细粒度的权限控制插件,针对任意资源的任意属性进行细粒度的权限控制,因为资源对象本身都传递到参数中了。

  • 资源数据对象的序列化与版本化

这里也列举了createHandler中的代码片段,如下所示:

        gv := scope.Kind.GroupVersion()
                // 得到合适的SerializerInfo
        s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
        if err != nil {
            scope.err(err, w, req)
            return
        }
                // 找到合适的decoder
        decoder := scope.Serializer.DecoderToVersion(s.Serializer, schema.GroupVersion{Group: gv.Group, Version: runtime.APIVersionInternal})

        body, err := readBody(req)
        if err != nil {
            scope.err(err, w, req)
            return
        }
        defaultGVK := scope.Kind
        original := r.New()
        trace.Step("About to convert to expected version")
                //  采用decoder解码
        obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
        if err != nil {
            err = transformDecodeError(typer, err, original, gvk, body)
            scope.err(err, w, req)
            return
        }

相关文章

网友评论

      本文标题:Kubernetes 源码分析 -- API Server之AP

      本文链接:https://www.haomeiwen.com/subject/qkjtnftx.html