前言
API Server的启动中,我们直到介绍了三种服务Master、CustomResourceDefinitions、Aggrator的创建,但是具体的API的创建部分没有介绍。
本文旨在把这块说清楚,让我们了解整个API Server对外提供了哪些API,这些API是在怎样被注册到服务中去的。我们知道,API Server对外提供的Http/Https服务,这都是基于go http服务框架来实现。
API Server支持的HTTP服务采用了go-restful与非go-restful混和的方式,主要是因为go-restful的一些标准无法完全满足需求,而且API Server要兼容旧版本的需要,所以引入了这种混杂模式。
go-restful
go-restful是第三方的REST框架,在GitHub上有多个贡献者,采用了“路由”映射的设计思想,并且在API设计中使用了流行的Fluent Style风格,试用起来酣畅淋漓,也难怪Kubernetes选择了它。下面是go-restful的优良特性。
- Ruby on Rails风格的Rest路由映射,例如/people/{person_id}/groups/{group_id}。
- 大大简化了Rest API的开发工作。
- 底层实现采用Golang的HTTP协议栈,几乎没有限制。
- 拥有完整的单元包代码,很容易开发一个可测试的Rest API。
- Google AppEngine ready。
go-restful框架中的核心对象如下:
- restful.Container:代表了一个HTTP Rest服务器,包括一组restful.WebService对象和一个http.ServeMux对象,使用RouteSelector进行请求派发。
- restful.WebService:标识一个Rest服务,由多个Rest路由(restful.Route)组成,这一组Rest路由共享同一个RootPath。
- restful.Route:标识一个Rest路由,Rest路由主要由Rest Path、HTTP Method、输入输出类型(HTML/JSON)及对应的回调函数restful.RouteFunction组成。
- restful.RouteFunction:一个用于处理具体的REST调用的函数接口定义,具体定义为type RouteFunction func(*Request, *Response)。
服务链
服务链的核心是DelegationTarget接口,它让API Server可以实现链式服务,当有HTTP请求到来时,优先让链首去处理URI,如果能够匹配成功就处理,否则交给下一链,一直到链尾。DelegationTarget的定义如下:
type DelegationTarget interface {
// UnprotectedHandler returns a handler that is NOT protected by a normal chain
UnprotectedHandler() http.Handler
// RequestContextMapper returns the existing RequestContextMapper. Because we cannot rewire all existing
// uses of this function, this will be used in any delegating API server
RequestContextMapper() apirequest.RequestContextMapper
// PostStartHooks returns the post-start hooks that need to be combined
PostStartHooks() map[string]postStartHookEntry
// PreShutdownHooks returns the pre-stop hooks that need to be combined
PreShutdownHooks() map[string]preShutdownHookEntry
// HealthzChecks returns the healthz checks that need to be combined
HealthzChecks() []healthz.HealthzChecker
// ListedPaths returns the paths for supporting an index
ListedPaths() []string
// NextDelegate returns the next delegationTarget in the chain of delegations
NextDelegate() DelegationTarget
}
- 它有一个空的实现emptyDelegate,一般作为链尾,由于是空的实现,所以具体的定义就不列举了。
- 它的另一个实现是GenericAPIServer,如下所示:
type GenericAPIServer struct {
......
// delegationTarget is the next delegate in the chain or nil
delegationTarget DelegationTarget
// HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
}
GenericAPIServer定义了一个delegationTarget成员,在API Server整套系统中,总共有三个服务,出了链尾,都是指向GenericAPIServer实例,该成员让GenericAPIServer实现了一套链的功能。
三种服务
APIServer最终提供链式服务把基本的API Server、CustomResource、Aggregator这三种服务采用链式结构串联起来,对外提供服务。这种链式服务为API Server的可扩展性提供了基础,使增添新的服务功能,不会影响到现有的框架,只需要追加新的服务,放到链中就能够实现。而GenericAPIServer是三种服务的基础,在这三种服务中:
Master是API Server的基础服务,它提供的基础资源的API服务;
其他两种CustomResourceDefinitions(简称CRD)、API Server Aggregation(简称AA),提供了自定义资源的能力。具体见自定义资源。
我们看一下对应的三种服务的定义:
基础的API Server:代码在k8s.io/kubernetes/pkg/master/master.go中
Master的结构定义如下所示:
type Master struct {
GenericAPIServer *genericapiserver.GenericAPIServer
ClientCARegistrationHook ClientCARegistrationHook
}
在Master实例的构建过程中,就完成了API的Install,具体代码见:completedConfig.New,在这里完成了传统API的安装以及新的资源API的安装。在早期的Kubernetes API Server的代码中,只有Master这一种服务,所以一些常见的资源如pods、service等等,都是基于传统的方式安装到REST中,而随着k8s的发展,涌现出了多种资源,而且他们的版本号也不在是v1版本,如:tokenreviews、horizontalpodautoscalers、jobs等等。所以在Master的Install API中,我们可以到InstallAPIS和InstallLegacyAPI。
- InstallLegacyAPI
传统的API都是一些核心资源,他们的GroupName="",版本都是v1,所以,统一处于一个APIGroupInfo中,相关代码不一一贴出来了,主要的代码逻辑是,创建各种核心资源的REST对象最终组装成
map[string]rest.Storage对象。
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
"podTemplates": podTemplateStorage,
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest.Service,
"services/proxy": serviceRest.Proxy,
"services/status": serviceStatusStorage,
"endpoints": endpointsStorage,
"nodes": nodeStorage.Node,
"nodes/status": nodeStorage.Status,
"nodes/proxy": nodeStorage.Proxy,
"events": eventStorage,
"limitRanges": limitRangeStorage,
"resourceQuotas": resourceQuotaStorage,
"resourceQuotas/status": resourceQuotaStatusStorage,
"namespaces": namespaceStorage,
"namespaces/status": namespaceStatusStorage,
"namespaces/finalize": namespaceFinalizeStorage,
"secrets": secretStorage,
"serviceAccounts": serviceAccountStorage,
"persistentVolumes": persistentVolumeStorage,
"persistentVolumes/status": persistentVolumeStatusStorage,
"persistentVolumeClaims": persistentVolumeClaimStorage,
"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
"configMaps": configMapStorage,
"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
}
if legacyscheme.Registry.IsEnabledVersion(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
}
if legacyscheme.Registry.IsEnabledVersion(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
restStorageMap["pods/eviction"] = podStorage.Eviction
}
这个数据结构map[string]rest.Storage是key是REST的path,所以很明显它是API暴漏的关键,我们在后面也会陆续讲到,请参考Installer章节。
- 现代的API
其实后面的CRD与AA都是采用这种模式,但是Master中封装的更好。它对各种新的资源实现了相应的RESTStorageProvider,接口定义如下:
type RESTStorageProvider interface {
GroupName() string
NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool)
}
RESTStorageProvider实现为REST storage的工厂,这样每个资源实现自己的NewRESTStorage方法,并构建相应的APIGroupInfo。相应的资源为:
restStorageProviders := []RESTStorageProvider{
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authenticator},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
extensionsrest.RESTStorageProvider{},
networkingrest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer},
schedulingrest.RESTStorageProvider{},
settingsrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.RESTStorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
自定义资源服务:代码在k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go中
自定义资源服务的结构代码如下:
type CustomResourceDefinitions struct {
GenericAPIServer *genericapiserver.GenericAPIServer
// provided for easier embedding
Informers internalinformers.SharedInformerFactory
}
自定义资源服务的API的安装也是在completedConfig.New中完成,API较少,相关的代码如下所示:
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Registry, Scheme, metav1.ParameterCodec, Codecs)
if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
apiGroupInfo.GroupMeta.GroupVersion = v1beta1.SchemeGroupVersion
storage := map[string]rest.Storage{}
// customresourcedefinitions
customResourceDefintionStorage := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
storage["customresourcedefinitions"] = customResourceDefintionStorage
storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)
apiGroupInfo.VersionedResourcesStorageMap["v1beta1"] = storage
}
// 安装到ApiServerHandler的GoRestfulContainer,基于Go-restful框架的服务
// 提供了customresourcedefinitions资源的服务能力
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
......
crdHandler := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
s.GenericAPIServer.RequestContextMapper(),
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
delegateHandler,
c.ExtraConfig.CRDRESTOptionsGetter,
c.GenericConfig.AdmissionControl,
)
// 安装到ApiServerHandler的NonGoRestfulMux,基于go-http普通框架的服务
// crdHandler提供自定义资源的API功能,主要包括资源的以下操作:
// get/list/watch/create/udpate/patch/delete/deletecollection
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
在New方法中,除了上述API的安装外,还有crd、naming、finalizing控制器以及各种Informers,有必要去看待吗,这里不列举。
AggregateServer :代码在k8s.io/kube-aggregator/pkg/apiserver/apiserver.go中
主K8S API Server处理built-in资源,如Pods和Services等等,而CRD能够让我们实现了一些通用的自定义资源。
AggregateServer实现了API Server Aggregation功能,它让我们可以提供自定义资源的特殊实现,并且部署我们独立API Server,主API Server把请求代理给独立的API Server来处理自定义资源,从而让资源对它的所有的客户端可用。
AggregatorServer的构建代码也是对应的completeConfig.NewWithDelegate方法中完成,该方法带了一个参数,参数的实例基于主API Server对应的GenericAPIServer实例。
AggregatorServer启动了APIServer的共享通知: &apiregistration.APIService{},基于共享通知的消息,在两个控制器中进行处理:apiserviceRegistrationController和availableController。
在API的处理上也是分为两部分:
- go-restful部分,也是基于APIGroupInfo来安装API,见k8s.io/kube-aggregator/pkg/registry/apiservice/rest/storage_apiservice.go中的NewRESTStorage方法:
func NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) genericapiserver.APIGroupInfo {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, aggregatorscheme.Registry, aggregatorscheme.Scheme, metav1.ParameterCodec, aggregatorscheme.Codecs)
if apiResourceConfigSource.VersionEnabled(v1beta1.SchemeGroupVersion) {
apiGroupInfo.GroupMeta.GroupVersion = v1beta1.SchemeGroupVersion
storage := map[string]rest.Storage{}
apiServiceREST := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)
storage["apiservices"] = apiServiceREST
storage["apiservices/status"] = apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, apiServiceREST)
apiGroupInfo.VersionedResourcesStorageMap["v1beta1"] = storage
}
if apiResourceConfigSource.VersionEnabled(v1.SchemeGroupVersion) {
apiGroupInfo.GroupMeta.GroupVersion = v1.SchemeGroupVersion
storage := map[string]rest.Storage{}
apiServiceREST := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)
storage["apiservices"] = apiServiceREST
storage["apiservices/status"] = apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, apiServiceREST)
apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
}
return apiGroupInfo
}
在代码中,根据是否启用v1beta1和v1版本,实现了apiservice和apiservices/status两种资源类型。
- non-go-restful部分,见:k8s.io/kubu-aggregator/pkg/apiserver/handler_apis.go中的apisHandler结构
apisHanders服务与/apis端,它实现了http.Handler,下面是它的ServeHTTP方法的代码实现:
func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx, ok := r.mapper.Get(req)
if !ok {
responsewriters.InternalError(w, req, errors.New("no context found for request"))
return
}
discoveryGroupList := &metav1.APIGroupList{
// always add OUR api group to the list first. Since we'll never have a registered APIService for it
// and since this is the crux of the API, having this first will give our names priority. It's good to be king.
Groups: []metav1.APIGroup{discoveryGroup},
}
apiServices, err := r.lister.List(labels.Everything())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
apiServicesByGroup := apiregistrationapi.SortedByGroupAndVersion(apiServices)
for _, apiGroupServers := range apiServicesByGroup {
// skip the legacy group
if len(apiGroupServers[0].Spec.Group) == 0 {
continue
}
discoveryGroup := convertToDiscoveryAPIGroup(apiGroupServers)
if discoveryGroup != nil {
discoveryGroupList.Groups = append(discoveryGroupList.Groups, *discoveryGroup)
}
}
responsewriters.WriteObjectNegotiated(ctx, r.codecs, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroupList)
}
问:看起来apisHandlers的目的是把找到的合适的api server返回?这点我没有搞明白,理论上,AA应该是做代理功能,把请求转发给对应的自定义API Servers去处理才对。
ANSWER:明白了,apisHandlers其实就是ROOT路径/apis的服务器,本来是由GenericAPIServer.DiscoveryGroupManager来实现这个功能的,但是AA需要实时获取注册进来的自定义API Servers信息,并且AA相关信息总是放在/apis相应的头部,如下所示:(没有自定义的API Servers的情况)
yuxianbing@ubuntu:~$ curl http://127.0.0.1:8080/apis
{
"kind": "APIGroupList",
"apiVersion": "v1",
"groups": [
{
"name": "apiregistration.k8s.io",
"versions": [
{
"groupVersion": "apiregistration.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "apiregistration.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "apiregistration.k8s.io/v1",
"version": "v1"
},
"serverAddressByClientCIDRs": null
},
......
GenericAPIServer
下面是GenericAPIServer的完整定义:
// GenericAPIServer contains state for a Kubernetes cluster api server.
type GenericAPIServer struct {
// discoveryAddresses is used to build cluster IPs for discovery.
discoveryAddresses discovery.Addresses
// LoopbackClientConfig is a config for a privileged loopback connection to the API server
LoopbackClientConfig *restclient.Config
// minRequestTimeout is how short the request timeout can be. This is used to build the RESTHandler
minRequestTimeout time.Duration
// ShutdownTimeout is the timeout used for server shutdown. This specifies the timeout before server
// gracefully shutdown returns.
ShutdownTimeout time.Duration
// legacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests
// to InstallLegacyAPIGroup
legacyAPIGroupPrefixes sets.String
// admissionControl is used to build the RESTStorage that backs an API Group.
admissionControl admission.Interface
// requestContextMapper provides a way to get the context for a request. It may be nil.
requestContextMapper apirequest.RequestContextMapper
SecureServingInfo *SecureServingInfo
// ExternalAddress is the address (hostname or IP and port) that should be used in
// external (public internet) URLs for this GenericAPIServer.
ExternalAddress string
// Serializer controls how common API objects not in a group/version prefix are serialized for this server.
// Individual APIGroups may define their own serializers.
Serializer runtime.NegotiatedSerializer
// "Outputs"
// Handler holds the handlers being used by this API server
// 实现了API Server对外的API功能,主要包括两个部分:go-restful和non-go-restful。
Handler *APIServerHandler
// listedPathProvider is a lister which provides the set of paths to show at /
listedPathProvider routes.ListedPathProvider
// DiscoveryGroupManager serves /apis
// 一般用于实现/apis的服务,从/apis的输出来看,基本上显示的是各个Group的API与版本的信息。
DiscoveryGroupManager discovery.GroupManager
// Enable swagger and/or OpenAPI if these configs are non-nil.
swaggerConfig *swagger.Config
openAPIConfig *openapicommon.Config
// PostStartHooks are each called after the server has started listening, in a separate go func for each
// with no guarantee of ordering between them. The map key is a name used for error reporting.
// It may kill the process with a panic if it wishes to by returning an error.
postStartHookLock sync.Mutex
postStartHooks map[string]postStartHookEntry
postStartHooksCalled bool
disabledPostStartHooks sets.String
preShutdownHookLock sync.Mutex
preShutdownHooks map[string]preShutdownHookEntry
preShutdownHooksCalled bool
// healthz checks
healthzLock sync.Mutex
healthzChecks []healthz.HealthzChecker
healthzCreated bool
// auditing. The backend is started after the server starts listening.
AuditBackend audit.Backend
// enableAPIResponseCompression indicates whether API Responses should support compression
// if the client requests it via Accept-Encoding
enableAPIResponseCompression bool
// delegationTarget is the next delegate in the chain or nil
// 实现链式结构,一般指向的实例类型也是GenericAPIServer
delegationTarget DelegationTarget
// HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
}
GenericAPIServer里面包含了几个重要的成员,首先是APIServerHandler类型的Handler成员,它存储了该Server所服务的API,后面要讲到的API的安装功能,主要就是把要服务的资源对应的API存储在Handler中的GoRestfulContainer中。
APIServerHandler包含了API Server使用的多种http.Handler类型,包括go-restful以及non-go-restful,以及在以上两者之间选择的Director对象,API URI处理的选择过程为:FullHandlerChain-> Director ->{GoRestfulContainer, NonGoRestfulMux}。
其次是DiscoveryGroupManager,它负责存储支持的API的Group和Version信息,并提供对/apis的调用服务。
APIGroupInfo
前面在三个API Server的API安装中,提到过go-restful模式的API安装都用到了APIGroupInfo,一般都是先生成一个APIGroupInfo实例,在该实例中,把我们关心的资源类型存好,然后调用GenericAPIServer.InstallAPIGroup完成API的Install操作。
// Info about an API group.
type APIGroupInfo struct {
GroupMeta apimachinery.GroupMeta
// Info about the resources in this group. Its a map from version to resource to the storage.
VersionedResourcesStorageMap map[string]map[string]rest.Storage
// OptionsExternalVersion controls the APIVersion used for common objects in the
// schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
// define a version "v1beta1" but want to use the Kubernetes "v1" internal objects.
// If nil, defaults to groupMeta.GroupVersion.
// TODO: Remove this when https://github.com/kubernetes/kubernetes/issues/19018 is fixed.
OptionsExternalVersion *schema.GroupVersion
// MetaGroupVersion defaults to "meta.k8s.io/v1" and is the scheme group version used to decode
// common API implementations like ListOptions. Future changes will allow this to vary by group
// version (for when the inevitable meta/v2 group emerges).
MetaGroupVersion *schema.GroupVersion
// Scheme includes all of the types used by this group and how to convert between them (or
// to convert objects from outside of this group that are accepted in this API).
// TODO: replace with interfaces
Scheme *runtime.Scheme
// NegotiatedSerializer controls how this group encodes and decodes data
NegotiatedSerializer runtime.NegotiatedSerializer
// ParameterCodec performs conversions for query parameters passed to API calls
ParameterCodec runtime.ParameterCodec
}
GenericAPIServer.InstallAPIGroup
InstallAPIGroup负责把给定的API Group暴漏到API中,主要功能有两块:
- 调用installAPIResources实现资源API的暴漏
- API Group的的服务,为了/apis和/apis/<GroupName>两种服务,其中/apis通过把信息存放在DiscoveryGroupManager成员中,/apis/<GroupName>的通过构建APIGroupHandler,并存放到GoRestfulContainer中。
代码如下所示:
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
// Do not register empty group or empty version. Doing so claims /apis/ for the wrong entity to be returned.
// Catching these here places the error much closer to its origin
if len(apiGroupInfo.GroupMeta.GroupVersion.Group) == 0 {
return fmt.Errorf("cannot register handler with an empty group for %#v", *apiGroupInfo)
}
if len(apiGroupInfo.GroupMeta.GroupVersion.Version) == 0 {
return fmt.Errorf("cannot register handler with an empty version for %#v", *apiGroupInfo)
}
// 这里实现资源的API暴漏
if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo); err != nil {
return err
}
// setup discovery
// Install the version handler.
// Add a handler at /apis/<groupName> to enumerate all versions supported by this group.
apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
for _, groupVersion := range apiGroupInfo.GroupMeta.GroupVersions {
// Check the config to make sure that we elide versions that don't have any resources
if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
continue
}
apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{
GroupVersion: groupVersion.String(),
Version: groupVersion.Version,
})
}
preferredVersionForDiscovery := metav1.GroupVersionForDiscovery{
GroupVersion: apiGroupInfo.GroupMeta.GroupVersion.String(),
Version: apiGroupInfo.GroupMeta.GroupVersion.Version,
}
apiGroup := metav1.APIGroup{
Name: apiGroupInfo.GroupMeta.GroupVersion.Group,
Versions: apiVersionsForDiscovery,
PreferredVersion: preferredVersionForDiscovery,
}
// 把该API Group信息存储到DiscoveryGroupManager中,让后续暴漏的/apis中使用
s.DiscoveryGroupManager.AddGroup(apiGroup)
// 生成APIGrouphandler
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup, s.requestContextMapper).WebService())
return nil
}
GenericAPIServer.installAPIResources
installAPIResources是一个用于安装REST存储的私有方法,用来支撑各种api groupversionresource。该函数的代码逻辑非常简单,它循环扫描APIGroupInfo.GroupMeta.GroupVersions成员,生成相应的APIGroupVersion实例,并通过InstallREST方法把该GroupVersion的资源注册服务。
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
for _, groupVersion := range apiGroupInfo.GroupMeta.GroupVersions {
if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
glog.Warningf("Skipping API %v because it has no resources.", groupVersion)
continue
}
// 这里生成APIGroupVersion实例,是REST API的关键
apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
if apiGroupInfo.OptionsExternalVersion != nil {
apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
}
if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
return fmt.Errorf("Unable to setup API %v: %v", apiGroupInfo, err)
}
}
return nil
}
getApiGropuVersion的代码如下,它通过组装了所有的资源对应的storage,并生成了APIGroupVersion实例,代码如下:
func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) *genericapi.APIGroupVersion {
storage := make(map[string]rest.Storage)
for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {
storage[strings.ToLower(k)] = v
}
version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
version.Root = apiPrefix
version.Storage = storage
return version
}
APIGroupVersion是区分API版本的关键,以下是代码:
type APIGroupVersion struct {
Storage map[string]rest.Storage
Root string
// GroupVersion is the external group version
GroupVersion schema.GroupVersion
// OptionsExternalVersion controls the Kubernetes APIVersion used for common objects in the apiserver
// schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
// define a version "v1beta1" but want to use the Kubernetes "v1" internal objects. If
// empty, defaults to GroupVersion.
OptionsExternalVersion *schema.GroupVersion
// MetaGroupVersion defaults to "meta.k8s.io/v1" and is the scheme group version used to decode
// common API implementations like ListOptions. Future changes will allow this to vary by group
// version (for when the inevitable meta/v2 group emerges).
MetaGroupVersion *schema.GroupVersion
Mapper meta.RESTMapper
// Serializer is used to determine how to convert responses from API methods into bytes to send over
// the wire.
Serializer runtime.NegotiatedSerializer
ParameterCodec runtime.ParameterCodec
Typer runtime.ObjectTyper
Creater runtime.ObjectCreater
Convertor runtime.ObjectConvertor
Defaulter runtime.ObjectDefaulter
Linker runtime.SelfLinker
UnsafeConvertor runtime.ObjectConvertor
Admit admission.Interface
Context request.RequestContextMapper
MinRequestTimeout time.Duration
// EnableAPIResponseCompression indicates whether API Responses should support compression
// if the client requests it via Accept-Encoding
EnableAPIResponseCompression bool
}
- storage
前面在构建APIGroupVersion 实例的过程中,我们看到第一个成员Storage的身影,这个变量我们要分析清楚,它类型为map[string]rest.Storage。所以storage变量是一个Map,Key为Rest API的path ,Value为rest.Storage接口,此接口是一个通用的符合Restful要求的资源存储服务接口,每个服务接口负责处理一类(Kind)Kubernetes API中的数据对象-----资源你数据,只有一个接口方法:New(),New()方法返回该Storage服务所能识别和管理的某种具体的资源数据逇一个空实例。
type Storage interface {
// New returns an empty object that can be used with Create and Update after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
New() runtime.Object
}
在运行期间,Kubernetes API Runtime运行时框架会把New()方法返回的空对象的指针传入Codec.DecodeInto([]byte, runtime.Object)方法中,从而完成HTTP Rest请求中的Byte数组反序列化逻辑。Kubernetes API Server中所有对外提供服务的Restful资源都实现了此接口,这些资源包括pods、bindings、podTemplates、replicationControllers、services等,三个服务都有自己的列表,其中CRD和AA我们已经在前面的讲解中把代码贴出来了。而Master这块其实有两块资源安装:最新的资源以及传统资源模式,后面单独介绍这块。
APIGroupVersion是与rest.Storage Map绑定的,并且绑定了相应版本的Codec、Convertor用于版本转换,这样就很容易理解Kubernetes是怎么区分多版本API的Rest服务的。
- InstallREST
在APIGroupVersion的InstallREST(constainer *restful.Container)方法里,用Version变量来构造一个Rest API Path的前缀并赋值给APIInstall的prefix变量,并调用他的Install()方法完成Rest API的转换,代码如下:
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
enableAPIResponseCompression: g.EnableAPIResponseCompression,
}
apiResources, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources}, g.Context)
versionDiscoveryHandler.AddToWebService(ws)
container.Add(ws)
return utilerrors.NewAggregate(registrationErrors)
}
- APIInstall.Install
接着,在APIInstaller的Install()方法里用prefix(API版本)前缀生成WebService的相对根路径:
func (a *APIInstaller) newWebService() *restful.WebService {
ws := new(restful.WebService)
ws.Path(a.prefix)
// a.prefix contains "prefix/group/version"
ws.Doc("API at " + a.prefix)
// Backwards compatibility, we accepted objects with empty content-type at V1.
// If we stop using go-restful, we can default empty content-type to application/json on an
// endpoint by endpoint basis
ws.Consumes("*/*")
mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
ws.Produces(append(mediaTypes, streamMediaTypes...)...)
ws.ApiVersion(a.group.GroupVersion.String())
return ws
}
- 如何实现多版本支持呢?
以前面我们列举过的k8s.io/kube-aggregator/pkg/registry/apiservice/rest/storage_apiservice.go中的NewRESTStorage方法中,可以看到,根据是否启用了v1beta1与v1版本的API,我们生成了不同的APIGroupInfo,如果同时启用了v1beta1与v1版本的话,最终调用APIGroupVersion的InstallREST方法,从而完成了最终的多版本API的Rest服务装配流程。
APIInstaller
前面我们多次讲到map[string]rest.Storage,从Master(传统模式与现代模式)、CRD、AA三个服务InstallGroupInfo中,都最终生成了这样一个Map实例。
这个Map的Key是Rest API的访问路径,Value却不是之前说好的restful.Route。所以必须存在一个“转换适配”的方法来实现上述转换!转化你的方法在pkg/apiserver/api_install.go的下属方法里:
func (a *APIInstaller) registerResoruceHandlers(path string, storage rest.Storage, ws *restful.WebService, proxyHandler http.Handler)
上述方法把一个path对应的rest.Storage转换成一系列的restful.Route并添加到指针restful.WebService中。这个函数的代码之所以很长,是因为有各种情况要考虑,比如pods/portforward这种路径要处理child,还要判断美中Storage资源类型锁支持的操作类型:比如是否支持create、delete、update及是否支持list、watch、pathcer等,对各种情况都考虑以后,这个函数的代码量已经超过500行!于是在外面封装了一个简单函数:func (a *APIInstlal)Install,内部循环调用registerResourceHandlers,返回最终的restful.WebService对象,次方法的主要代码如下:
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
var apiResources []metav1.APIResource
var errors []error
ws := a.newWebService()
proxyHandler := (&handlers.ProxyHandler{
Prefix: a.prefix + "/proxy/",
Storage: a.group.Storage,
Serializer: a.group.Serializer,
Mapper: a.group.Context,
})
// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
paths := make([]string, len(a.group.Storage))
var i int = 0
for path := range a.group.Storage {
paths[i] = path
i++
}
sort.Strings(paths)
for _, path := range paths {
apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws, proxyHandler)
if err != nil {
errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
}
if apiResource != nil {
apiResources = append(apiResources, *apiResource)
}
}
return apiResources, ws, errors
}
Install()方法循环调用了registerResourceHandlers函数,该函数实现了rest.Storage到restful.Route的转换,由于该函数代码比较长,这里只列举相关的片段:
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
deleter, isDeleter := storage.(rest.Deleter)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
_, isRedirector := storage.(rest.Redirector)
connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
前面我们提到rest.Storage接口只有一个New方法,一般的资源对象存储,出了实现rest.Storage接口之外,还实现多种REST操作接口,如上所示,具体各种资源数据对象的存储对象见ETCD存储分析。这段代码对storage对象进行判断,以确定并标记它锁满足的API Rest接口类型,而接下来的这段代码在此基础上确定此接口所包含的actions,后者则对应到某种HTTP请求方法(GET/POST/PUT/DELETE)或者HTTP PROXY、WATCH、CONNECT等动作;
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
// DEPRECATED
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)
actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
if getSubpath {
actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
}
actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isDeleter)
actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
// We add "proxy" subresource to remove the need for the generic top level prefix proxy.
// The generic top level prefix proxy is deprecated in v1.2, and will be removed in 1.3, or 1.4 at the latest.
// TODO: DEPRECATED in v1.2.
actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer, false}, isRedirector)
// TODO: DEPRECATED in v1.2.
actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer, false}, isRedirector)
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
我们注意到rest.Redirector类型的storage被当作PROXY进行处理,由apiserver.ProxyHandler进行拦截,并调用rest.Redirector的ResourceLocation方法获取资源的处理路径(可能包含一个非空的http.RoundTripper),用于处理执行Redirector返回的URL请求)。Kubernetes API Server中PROXY请求存在的意义在于透明地访问某个其他节点(比如某个Minion)上的API。
最后,我们来分析下registerResourcesHandles中完成从rest.Storage到restful.Route映射的最后一段关键代码。下面是rest.Getter接口的Storage映射代码:
case "GET": // Get a resource.
var handler restful.RouteFunction
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, hasSubresource)
} else {
handler = restfulGetResource(getter, exporter, reqScope)
}
if needOverride {
// need change the reported verb
handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), resource, subresource, requestScope, handler)
} else {
handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, handler)
}
if a.enableAPIResponseCompression {
handler = genericfilters.RestfulWithCompression(handler, a.group.Context)
}
doc := "read the specified " + kind
if hasSubresource {
doc = "read " + subresource + " of the specified " + kind
}
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)
if isGetterWithOptions {
if err := addObjectParams(ws, route, versionedGetOptions); err != nil {
return nil, err
}
}
if isExporter {
if err := addObjectParams(ws, route, versionedExportOptions); err != nil {
return nil, err
}
}
addParams(route, action.Params)
routes = append(routes, route)
上述代码首先通过函数restfulGetResourceWithOptions或者restfulGetResource创建了一个restful.RouteFunction,然后生成一个restful.route对象,最后注册到 restful.WebService中,从而完成了rest.Storage到Rest服务的“最后一公里”通车。restfulGetResource函数的定义如下:
func restfulGetResource(r rest.Getter, e rest.Exporter, scope handlers.RequestScope) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.GetResource(r, e, scope)(res.ResponseWriter, req.Request)
}
}
handlers.GetResource的定义如下:
func GetResource(r rest.Getter, e rest.Exporter, scope RequestScope) http.HandlerFunc {
return getResourceHandler(scope,
func(ctx request.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
// check for export
options := metav1.GetOptions{}
if values := req.URL.Query(); len(values) > 0 {
exports := metav1.ExportOptions{}
if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &exports); err != nil {
err = errors.NewBadRequest(err.Error())
return nil, err
}
if exports.Export {
if e == nil {
return nil, errors.NewBadRequest(fmt.Sprintf("export of %q is not supported", scope.Resource.Resource))
}
return e.Export(ctx, name, exports)
}
if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil {
err = errors.NewBadRequest(err.Error())
return nil, err
}
}
if trace != nil {
trace.Step("About to Get from storage")
}
// 这里调用了资源对象存储的Get方法,从而返回具体的资源对象
return r.Get(ctx, name, &options)
})
}
最终,我们看到了API的服务最终通过调用r.Get(ctx, name, &options)方法,从而得以返回某个资源对象。
在上面的查询操作中,没有权限控制,但是查看一下createHandler方法,就能看到权限控制的身影,如下所示的代码片段:
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, userInfo)
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
err = mutatingAdmission.Admit(admissionAttributes)
if err != nil {
scope.err(err, w, req)
return
}
}
而对于资源的Create、Update、Delete、Connect、Patch等操作都有类似的权限控制,从Admit的参数admission.Attributes的属性来看,第三方系统可以开发细粒度的权限控制插件,针对任意资源的任意属性进行细粒度的权限控制,因为资源对象本身都传递到参数中了。
- 资源数据对象的序列化与版本化
这里也列举了createHandler中的代码片段,如下所示:
gv := scope.Kind.GroupVersion()
// 得到合适的SerializerInfo
s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
if err != nil {
scope.err(err, w, req)
return
}
// 找到合适的decoder
decoder := scope.Serializer.DecoderToVersion(s.Serializer, schema.GroupVersion{Group: gv.Group, Version: runtime.APIVersionInternal})
body, err := readBody(req)
if err != nil {
scope.err(err, w, req)
return
}
defaultGVK := scope.Kind
original := r.New()
trace.Step("About to convert to expected version")
// 采用decoder解码
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
if err != nil {
err = transformDecodeError(typer, err, original, gvk, body)
scope.err(err, w, req)
return
}
网友评论