KubeSphere Source Code Analysis 2: ks-apiserver

Author: 梅_梅 | Published: 2021-07-08 00:02

    1. Overall structure

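    The main job of ks-apiserver is to aggregate the business functions of the whole system and expose them through a single, unified API entry point. As the figure below shows, the objects it aggregates fall into the following categories: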

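    1. Native Kubernetes objects. ks-apiserver connects to the Kubernetes API server (k8s-apiserver) and, through it, reads and modifies the original Kubernetes data (origin data) stored in etcd. The objects involved are native ones such as ConfigMap and Deployment.
    2. Objects wrapped by ks-controller-manager. The logic that ks-controller-manager adds is represented in etcd as CRD objects, so ks-apiserver operates on that CRD data (crd data) through k8s-apiserver and thereby drives the functionality that ks-controller-manager extends.
    3. Third-party operator objects. Third-party modules such as prometheus-operator run in the system as operators, and the objects behind their features are also stored in etcd as CRDs. ks-apiserver again handles them by operating on the corresponding CRDs through k8s-apiserver.
    4. Ordinary service objects. Components such as Jenkins and SonarQube run in the system as ordinary services, and ks-apiserver talks to them directly through network calls.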

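    With that, ks-apiserver has completed its interaction with each internal object; this is the internal API (inner API aggregate). ks-apiserver then integrates the functionality of these modules and exposes a unified API to the outside; this is the external API (out API aggregate).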

    1.png
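
    To make the aggregation idea concrete, here is a minimal sketch that uses only the Go standard library. It shows one entry point dispatching to the four kinds of backends listed above. The backend interface, the type names and the route prefixes are all hypothetical and are not taken from the KubeSphere code base.

```go
package main

import (
	"fmt"
	"strings"
)

// backend is a hypothetical abstraction over one kind of aggregated object.
type backend interface {
	Handle(path string) string
}

// viaKubeAPIServer stands for categories 1 to 3: native objects,
// ks-controller-manager CRDs and third-party operator CRDs are all
// reached through k8s-apiserver.
type viaKubeAPIServer struct{ kind string }

func (b viaKubeAPIServer) Handle(path string) string {
	return fmt.Sprintf("k8s-apiserver <- %s %s", b.kind, path)
}

// viaDirectCall stands for category 4: plain services called over the network.
type viaDirectCall struct{ service string }

func (b viaDirectCall) Handle(path string) string {
	return fmt.Sprintf("%s <- %s", b.service, path)
}

// aggregator is the single outward-facing entry point.
type aggregator struct {
	routes map[string]backend // route prefix -> backend (prefixes are made up)
}

// Dispatch picks the backend whose prefix matches the request path.
// The prefixes are disjoint, so map iteration order does not matter.
func (a aggregator) Dispatch(path string) (string, error) {
	for prefix, b := range a.routes {
		if strings.HasPrefix(path, prefix) {
			return b.Handle(strings.TrimPrefix(path, prefix)), nil
		}
	}
	return "", fmt.Errorf("no backend for %q", path)
}

func newAggregator() aggregator {
	return aggregator{routes: map[string]backend{
		"/native/":   viaKubeAPIServer{kind: "native"},
		"/ks/":       viaKubeAPIServer{kind: "ks-crd"},
		"/operator/": viaKubeAPIServer{kind: "operator-crd"},
		"/devops/":   viaDirectCall{service: "jenkins"},
	}}
}

func main() {
	out, err := newAggregator().Dispatch("/devops/pipelines")
	fmt.Println(out, err) // jenkins <- pipelines <nil>
}
```

    The point of the sketch is only the shape: callers see one API, while three of the four categories end up as requests to k8s-apiserver and the fourth as a direct service call.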

    2. Code analysis

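    Following the summary of the overall structure above, this section walks through how ks-apiserver creates the management handle for each business resource, how it wraps and registers the outward-facing interfaces, and how features such as permission checks and auditing are injected.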

    2.1 Creating the resource management handles

    The following code creates the resource management handles.

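    1. At review 1 in the source, a connection to k8s-apiserver is established, and informers manage the following resources:
    • Kubernetes resources (native resources such as Deployment and ConfigMap)
    • KubeSphere resources (the business CRDs defined by ks-controller-manager)
    • Third-party operator resources (the business CRDs defined by third-party modules such as Prometheus and Istio)
    2. At review 2 in the source, remote connections to some services are established, for example for querying Prometheus data and for managing suites such as DevOps. These interactions use ordinary network calls such as HTTP.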
    // cmd/ks-apiserver/app/options/options.go
    // NewAPIServer creates an APIServer instance using given options
    func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIServer, error) {
            apiServer := &apiserver.APIServer{
                    Config: s.Config,
            }   
         
            // review 1. Connect to Kubernetes: informers manage the resources exposed as CRDs, and native Kubernetes resources are proxied directly
            kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
            if err != nil {
                    return nil, err 
            }   
            apiServer.KubernetesClient = kubernetesClient
                                                                                                                                                                                                                
            informerFactory := informers.NewInformerFactories(kubernetesClient.Kubernetes(), kubernetesClient.KubeSphere(),
                    kubernetesClient.Istio(), kubernetesClient.Snapshot(), kubernetesClient.ApiExtensions(), kubernetesClient.Prometheus())
            apiServer.InformerFactory = informerFactory
            
            // review 2. Manage resources that are exposed directly through service interfaces, via remote calls over HTTP and similar common protocols
            if s.MonitoringOptions == nil || len(s.MonitoringOptions.Endpoint) == 0 { 
                    return nil, fmt.Errorf("moinitoring service address in configuration MUST not be empty, please check configmap/kubesphere-config in kubesphere-system namespace")
            } else {
                    monitoringClient, err := prometheus.NewPrometheus(s.MonitoringOptions)
                    if err != nil {
                            return nil, fmt.Errorf("failed to connect to prometheus, please check prometheus status, error: %v", err)
                    }   
                    apiServer.MonitoringClient = monitoringClient
            }   
     
            apiServer.MetricsClient = metricsserver.NewMetricsClient(kubernetesClient.Kubernetes(), s.KubernetesOptions)
     
           ...
           ...
    
            if s.DevopsOptions.Host != "" {
                    devopsClient, err := jenkins.NewDevopsClient(s.DevopsOptions)
                    if err != nil {
                            return nil, fmt.Errorf("failed to connect to jenkins, please check jenkins status, error: %v", err)
                    }
                    apiServer.DevopsClient = devopsClient
            }
            
            if s.SonarQubeOptions.Host != "" {
                    sonarClient, err := sonarqube.NewSonarQubeClient(s.SonarQubeOptions)
                    if err != nil {
                            return nil, fmt.Errorf("failed to connecto to sonarqube, please check sonarqube status, error: %v", err)
                    }
                    apiServer.SonarClient = sonarqube.NewSonar(sonarClient.SonarQube())
            }
            
            var cacheClient cache.Interface
            if s.RedisOptions != nil && len(s.RedisOptions.Host) != 0 {
                    if s.RedisOptions.Host == fakeInterface && s.DebugMode {
                            apiServer.CacheClient = cache.NewSimpleCache()
                    } else {
                            cacheClient, err = cache.NewRedisClient(s.RedisOptions, stopCh)
                            if err != nil {
                                    return nil, fmt.Errorf("failed to connect to redis service, please check redis status, error: %v", err)
                            }
                            apiServer.CacheClient = cacheClient
                    }
            } else {
                    klog.Warning("ks-apiserver starts without redis provided, it will use in memory cache. " +
                            "This may cause inconsistencies when running ks-apiserver with multiple replicas.")
                    apiServer.CacheClient = cache.NewSimpleCache()
            }  
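
    The excerpt above combines three patterns: a required dependency that fails fast (monitoring), optional dependencies that are skipped when not configured (DevOps, SonarQube), and a cache that falls back to memory when Redis is absent. The following is a minimal standard-library sketch of just those patterns. The options and server types and their fields are hypothetical stand-ins, not KubeSphere types.

```go
package main

import (
	"errors"
	"fmt"
)

// options is a hypothetical, trimmed-down configuration.
type options struct {
	MonitoringEndpoint string // required
	DevopsHost         string // optional
	RedisHost          string // optional
}

// server holds strings where the real code would hold client objects.
type server struct {
	Monitoring string // stands in for a monitoring client
	Devops     string // empty when the DevOps suite is disabled
	Cache      string // "redis" or "in-memory"
}

// newServer mirrors the three patterns in the excerpt.
func newServer(o options) (*server, error) {
	// Required dependency: refuse to start without it.
	if o.MonitoringEndpoint == "" {
		return nil, errors.New("monitoring endpoint must not be empty")
	}
	s := &server{Monitoring: o.MonitoringEndpoint}

	// Optional dependency: only wired in when a host is configured.
	if o.DevopsHost != "" {
		s.Devops = o.DevopsHost
	}

	// Cache with fallback: an in-memory cache works for a single replica
	// but can become inconsistent when several replicas run side by side.
	if o.RedisHost != "" {
		s.Cache = "redis"
	} else {
		s.Cache = "in-memory"
	}
	return s, nil
}

func main() {
	s, err := newServer(options{MonitoringEndpoint: "http://prometheus:9090"})
	fmt.Println(s.Cache, s.Devops == "", err) // in-memory true <nil>
}
```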
    
    

    2.2 Wrapping the interfaces

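    The code below operates on the workspace resource. The tenantHandler method DeleteWorkspace is the callback handed to the go-restful framework, that is, the endpoint invoked over the web. The actual work is done by the tenantOperator method DeleteWorkspace, which goes through ksclient, the KubeSphere resource handle created in 2.1.
    Interface wrapping follows this flow throughout.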

    func (h *tenantHandler) DeleteWorkspace(request *restful.Request, response *restful.Response) {
            workspace := request.PathParameter("workspace")
                                 
            opts := metav1.DeleteOptions{}
                                 
            err := request.ReadEntity(&opts)
            if err != nil {      
                    opts = *metav1.NewDeleteOptions(0)
            }                    
                                 
            err = h.tenant.DeleteWorkspace(workspace, opts)                                                                                                                                                     
                                 
            if err != nil {      
                    klog.Error(err)
                    if errors.IsNotFound(err) {
                            api.HandleNotFound(response, request, err)
                            return
                    }            
                    api.HandleBadRequest(response, request, err)
                    return       
            }                    
                                 
            response.WriteEntity(servererr.None)
    }       
    
    func (t *tenantOperator) DeleteWorkspace(workspace string, opts metav1.DeleteOptions) error {                                                                                                               
                              
            if opts.PropagationPolicy != nil && *opts.PropagationPolicy == metav1.DeletePropagationOrphan {
                    wsp, err := t.DescribeWorkspace(workspace)
                    if err != nil {
                            klog.Error(err)
                            return err
                    }         
                    wsp.Finalizers = append(wsp.Finalizers, orphanFinalizer)
                    _, err = t.ksclient.TenantV1alpha2().WorkspaceTemplates().Update(context.Background(), wsp, metav1.UpdateOptions{})
                    if err != nil {
                            klog.Error(err)
                            return err
                    }         
            }                 
            return t.ksclient.TenantV1alpha2().WorkspaceTemplates().Delete(context.Background(), workspace, opts)
    }
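
    The layering in this excerpt (web callback, then operator, then client) can be reduced to a small standard-library sketch. Everything here is hypothetical: workspaceClient stands in for the generated ksclient, memoryClient is a fake backed by a map, and deleteStatus plays the role of the web callback by translating the operator's error into an HTTP status code.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

var errNotFound = errors.New("not found")

// workspaceClient is a hypothetical stand-in for the generated ksclient.
type workspaceClient interface {
	Delete(name string) error
}

// memoryClient is a fake client backed by a map, for illustration only.
type memoryClient map[string]bool

func (m memoryClient) Delete(name string) error {
	if !m[name] {
		return errNotFound
	}
	delete(m, name)
	return nil
}

// operator holds the business logic and only talks to the client.
type operator struct{ client workspaceClient }

func (o operator) DeleteWorkspace(name string) error {
	if name == "" {
		return errors.New("workspace name is required")
	}
	return o.client.Delete(name)
}

// deleteStatus does what a web callback would do: call the operator and
// map its error to an HTTP status code.
func deleteStatus(o operator, name string) int {
	err := o.DeleteWorkspace(name)
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, errNotFound):
		return http.StatusNotFound
	default:
		return http.StatusBadRequest
	}
}

func main() {
	op := operator{client: memoryClient{"demo": true}}
	// First delete succeeds, second finds nothing, third has no name.
	fmt.Println(deleteStatus(op, "demo"), deleteStatus(op, "demo"), deleteStatus(op, "")) // 200 404 400
}
```

    Keeping the HTTP concerns in the outer layer means the operator can be reused and tested without a web framework.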
    

    2.3 Registering the interfaces

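    All of the outermost interface groups are registered in one place, in the code below. The actual registration logic lives in each AddToContainer function, and the package that an AddToContainer belongs to represents one group of related interfaces.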

    // pkg/apiserver/apiserver.go
    
    // Install all kubesphere api groups
    // Installation happens before all informers start to cache objects, so
    //   any attempt to list objects using listers will get empty results.
    func (s *APIServer) installKubeSphereAPIs() {
            imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
                    user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
                            s.InformerFactory.KubernetesSharedInformerFactory()),
                    loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
                    s.Config.AuthenticationOptions)
            amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
                    s.KubernetesClient.Kubernetes(),
                    s.InformerFactory)
            rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)
                         
            urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
            urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
            urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
            urlruntime.Must(meteringv1alpha1.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.RuntimeCache, s.Config.MeteringOptions, nil))
            urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
            urlruntime.Must(openpitrixv2alpha1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
            urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes()))
            urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory,
                    s.KubernetesClient.Master()))
            urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),
                    s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient, amOperator, rbacAuthorizer, s.MonitoringClient, s.RuntimeCache, s.Config.MeteringOptions))
            urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), rbacAuthorizer, s.KubernetesClient.Config()))
            urlruntime.Must(clusterkapisv1alpha1.AddToContainer(s.container,
                    s.InformerFactory.KubernetesSharedInformerFactory(),
                    s.InformerFactory.KubeSphereSharedInformerFactory(),
                    s.Config.MultiClusterOptions.ProxyPublishService,
                    s.Config.MultiClusterOptions.ProxyPublishAddress,
                    s.Config.MultiClusterOptions.AgentImage))
            urlruntime.Must(iamapi.AddToContainer(s.container, imOperator, amOperator,
                    group.New(s.InformerFactory, s.KubernetesClient.KubeSphere(), s.KubernetesClient.Kubernetes()),
                    rbacAuthorizer))
    
    ...
    }
    

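    Take the AddToContainer of package v1alpha1 as an example (the metering group, pkg/kapis/metering/v1alpha1/register.go, imported as meteringv1alpha1 in the list above). As shown below, it registers the cluster-level and node-level endpoints with go-restful.
    Once the web server is started, these APIs are served.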

    package v1alpha1
    
    func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteringClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface, cache cache.Cache, meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options) error {
            ws := runtime.NewWebService(GroupVersion)
                                                                                                                                                                                                                
            h := newHandler(k8sClient, meteringClient, factory, ksClient, resourcev1alpha3.NewResourceGetter(factory, cache), meteringOptions, opOptions)
                              
            ws.Route(ws.GET("/cluster").
                    To(h.HandleClusterMeterQuery).
                    Doc("Get cluster-level meter data.").
                    Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
                    Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which meter data to return. For example, the following filter matches both cluster CPU usage and disk usage: `meter_cluster_cpu_usage|meter_cluster_memory_usage`.").DataType("string").Required(false)).
                    Param(ws.QueryParameter("start", "Start time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1559347200. ").DataType("string").Required(false)).
                    Param(ws.QueryParameter("end", "End time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1561939200. ").DataType("string").Required(false)).
                    Param(ws.QueryParameter("step", "Time interval. Retrieve metric data at a fixed interval within the time range of start and end. It requires both **start** and **end** are provided. The format is [0-9]+[smhdwy]. Defaults to 10m (i.e. 10 min).").DataType("string").DefaultValue("10m").Required(false)).
                    Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are mutually exclusive.").DataType("string").Required(false)).
                    Metadata(restfulspec.KeyOpenAPITags, []string{constants.ClusterMetersTag}).
                    Writes(model.Metrics{}).
                    Returns(http.StatusOK, respOK, model.Metrics{})).
                    Produces(restful.MIME_JSON)
            ws.Route(ws.GET("/nodes").
                    To(h.HandleNodeMeterQuery).
                    Doc("Get node-level meter data of all nodes.").
                    Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
                    Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which meter data to return. For example, the following filter matches both node CPU usage and disk usage: `meter_node_cpu_usage|meter_node_memory_usage`.").DataType("string").Required(false)).
                    Param(ws.QueryParameter("resources_filter", "The node filter consists of a regexp pattern. It specifies which node data to return. For example, the following filter matches both node i-caojnter and i-cmu82ogj: `i-caojnter|i-cmu82ogj`.").DataType("string").Required(false)).
                    Param(ws.PathParameter("storageclass", "The name of the storageclass.").DataType("string").Required(false)).
                    Param(ws.QueryParameter("pvc_filter", "The PVCs filter consists of a regexp pattern. It specifies which PVC data to return.").DataType("string").Required(false)).
                    Param(ws.QueryParameter("start", "Start time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1559347200. ").DataType("string").Required(false)).
                    Param(ws.QueryParameter("end", "End time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1561939200. ").DataType("string").Required(false)).
                    Param(ws.QueryParameter("step", "Time interval. Retrieve metric data at a fixed interval within the time range of start and end. It requires both **start** and **end** are provided. The format is [0-9]+[smhdwy]. Defaults to 10m (i.e. 10 min).").DataType("string").DefaultValue("10m").Required(false)).
                    Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are mutually exclusive.").DataType("string").Required(false)).
                    Param(ws.QueryParameter("sort_metric", "Sort nodes by the specified metric. Not applicable if **start** and **end** are provided.").DataType("string").Required(false)).
                    Param(ws.QueryParameter("sort_type", "Sort order. One of asc, desc.").DefaultValue("desc.").DataType("string").Required(false)).
                    Param(ws.QueryParameter("page", "The page number. This field paginates result data of each metric, then returns a specific page. For example, setting **page** to 2 returns the second page. It only applies to sorted metric data.").DataType("integer").Required(false)).
        ...
    }
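
    The registration pattern itself is independent of go-restful: every API group exposes a function that adds its routes to a shared container and returns an error, and one install function wires all groups in, aborting on the first failure. Below is a minimal sketch of that shape using net/http's ServeMux in place of a go-restful container. The function names, routes and the source parameter are hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// must panics when registration fails, in the spirit of urlruntime.Must.
func must(err error) {
	if err != nil {
		panic(err)
	}
}

// addMetering registers a hypothetical metering API group on the mux.
func addMetering(mux *http.ServeMux, source string) error {
	if source == "" {
		return fmt.Errorf("metering: empty data source")
	}
	mux.HandleFunc("/metering/cluster", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "cluster meters from %s", source)
	})
	return nil
}

// addConfig registers a hypothetical config API group on the mux.
func addConfig(mux *http.ServeMux) error {
	mux.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "config")
	})
	return nil
}

// install is the single place where every group is wired in.
func install(source string) *http.ServeMux {
	mux := http.NewServeMux()
	must(addConfig(mux))
	must(addMetering(mux, source))
	return mux
}

// get sends one GET request through the mux without opening a socket.
func get(mux *http.ServeMux, path string) (int, string) {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := get(install("prometheus"), "/metering/cluster")
	fmt.Println(code, body) // 200 cluster meters from prometheus
}
```

    Each group receives only the dependencies it needs as arguments, which is the same reason the AddToContainer signatures in the real code differ from one package to the next.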
    
    

    2.4 Access control

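    Access control is injected in the code below. At view 1, the resource handles obtained earlier are wrapped by im.NewOperator, am.NewOperator and rbac.NewRBACAuthorizer, producing imOperator and amOperator, two wrapped handles that carry access control, plus the rbacAuthorizer object.
    When the interfaces are registered, these objects are passed into the callbacks so that each endpoint can enforce permissions.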

    
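    // Install all kubesphere api groups
    // Installation happens before all informers start to cache objects, so
    //   any attempt to list objects using listers will get empty results.
    func (s *APIServer) installKubeSphereAPIs() {
            // view 1: inject access control
            imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),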
                    user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
                            s.InformerFactory.KubernetesSharedInformerFactory()),
                    loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
                    s.Config.AuthenticationOptions)          
            amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
                    s.KubernetesClient.Kubernetes(),         
                    s.InformerFactory)                       
            rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)
                                                                               
            urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
            urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
            urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
            urlruntime.Must(meteringv1alpha1.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.RuntimeCache, s.Config.MeteringOptions, nil))
            urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
            urlruntime.Must(openpitrixv2alpha1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
            urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes()))
            urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory,
                    s.KubernetesClient.Master()))            
            urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),                                                                                      
                    s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient, amOperator, rbacAuthorizer, s.MonitoringClient, s.RuntimeCache, s.Config.MeteringOptions))
            urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), rbacAuthorizer, s.KubernetesClient.Config()))
            urlruntime.Must(clusterkapisv1alpha1.AddToContainer(s.container,
                    s.InformerFactory.KubernetesSharedInformerFactory(),
                    s.InformerFactory.KubeSphereSharedInformerFactory(),
                    s.Config.MultiClusterOptions.ProxyPublishService,
                    s.Config.MultiClusterOptions.ProxyPublishAddress,
                    s.Config.MultiClusterOptions.AgentImage))
            urlruntime.Must(iamapi.AddToContainer(s.container, imOperator, amOperator,
                    group.New(s.InformerFactory, s.KubernetesClient.KubeSphere(), s.KubernetesClient.Kubernetes()), 
                    rbacAuthorizer))  
    ...
    
    

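    For how access control is used inside an endpoint, see the namespace listing code below. Both t.authorizer.Authorize and t.am.ListRoleBindings retrieve permission information, and the code then decides from the result whether to cut the flow short.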

    func (t *tenantOperator) ListNamespaces(user user.Info, workspace string, queryParam *query.Query) (*api.ListResult, error) {
            nsScope := request.ClusterScope
            if workspace != "" {
                    nsScope = request.WorkspaceScope
                    // filter by workspace
                    queryParam.Filters[query.FieldLabel] = query.Value(fmt.Sprintf("%s=%s", tenantv1alpha1.WorkspaceLabel, workspace))
            }
            
            listNS := authorizer.AttributesRecord{
                    User:            user,
                    Verb:            "list",
                    Workspace:       workspace,
                    Resource:        "namespaces",
                    ResourceRequest: true,
                    ResourceScope:   nsScope,
            }
            
            decision, _, err := t.authorizer.Authorize(listNS)
            if err != nil {
                    klog.Error(err)
                    return nil, err
            }
            
            // allowed to list all namespaces in the specified scope
            if decision == authorizer.DecisionAllow {
                    result, err := t.resourceGetter.List("namespaces", "", queryParam)
                    if err != nil {
                            klog.Error(err)
                            return nil, err
                    }
                    return result, nil
            }
            
            // retrieving associated resources through role binding
            roleBindings, err := t.am.ListRoleBindings(user.GetName(), user.GetGroups(), "")
            if err != nil {                                                                                                                                                                                     
                    klog.Error(err)
                    return nil, err
            }
            
            namespaces := make([]runtime.Object, 0)
            for _, roleBinding := range roleBindings {
                    obj, err := t.resourceGetter.Get("namespaces", "", roleBinding.Namespace)
                    if err != nil {
                            klog.Error(err)
             ...
    }
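
    The control flow of ListNamespaces boils down to two steps: ask a scope-wide question first ("may this user list all namespaces?"), and if the answer is not an allow, fall back to the namespaces reachable through the user's role bindings. The sketch below models only that flow with in-memory maps. The tenant type, its fields and the sample users are hypothetical and much simpler than the real authorizer and am operator.

```go
package main

import (
	"fmt"
	"sort"
)

type decision int

const (
	decisionDeny decision = iota
	decisionAllow
)

// tenant is a hypothetical stand-in: admins may list everything, and
// roleBindings maps a user to the namespaces that user is bound in.
type tenant struct {
	admins       map[string]bool
	roleBindings map[string][]string
	namespaces   []string
}

// authorize answers the scope-wide question for one user.
func (t tenant) authorize(user string) decision {
	if t.admins[user] {
		return decisionAllow
	}
	return decisionDeny
}

// listNamespaces returns everything on an allow decision; otherwise it
// falls back to the namespaces named in the user's role bindings.
func (t tenant) listNamespaces(user string) []string {
	if t.authorize(user) == decisionAllow {
		return append([]string(nil), t.namespaces...)
	}
	seen := map[string]bool{}
	result := []string{}
	for _, ns := range t.roleBindings[user] {
		if !seen[ns] { // several bindings may point at the same namespace
			seen[ns] = true
			result = append(result, ns)
		}
	}
	sort.Strings(result)
	return result
}

func newDemoTenant() tenant {
	return tenant{
		admins:       map[string]bool{"admin": true},
		roleBindings: map[string][]string{"alice": {"dev", "dev", "test"}},
		namespaces:   []string{"default", "dev", "prod", "test"},
	}
}

func main() {
	t := newDemoTenant()
	fmt.Println(t.listNamespaces("admin"), t.listNamespaces("alice"), t.listNamespaces("bob"))
	// [default dev prod test] [dev test] []
}
```

    Note that a denied user does not get an error here, only a narrower result, which matches the behaviour visible in the excerpt up to the point where it is cut off.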
    

    2.5 Log injection

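    In the code below, view1 injects logging through go-restful: on every call, the registered filter logRequestAndResponse runs and tries to emit a log entry. view2 registers a second filter, monitorRequest, as a parameter-checking callback that inspects some required fields of the request and filters out unrelated external requests.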

    func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
            s.container = restful.NewContainer()     
            // view1
            s.container.Filter(logRequestAndResponse)
            s.container.Router(restful.CurlyRouter{})
            s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
                    logStackOnRecover(panicReason, httpWriter)                                                                                                                                                  
            })                              
                                            
            s.installKubeSphereAPIs()       
                                            
            s.installMetricsAPI()           
            //view2
            s.container.Filter(monitorRequest)       
                                            
            for _, ws := range s.container.RegisteredWebServices() {
                    klog.V(2).Infof("%s", ws.RootPath())
            }                               
                                            
            s.Server.Handler = s.container  
                                            
            s.buildHandlerChain(stopCh)     
                                            
            return nil                      
    }        
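
    A container-level filter is a function that wraps every request on its way to the registered handler. The same idea can be sketched with plain net/http middleware, which avoids depending on go-restful here. The filter type and the logging, counting and chain helpers are hypothetical names chosen for this sketch; they are not the logRequestAndResponse or monitorRequest implementations.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// filter wraps a handler, much as a container filter wraps the rest of the chain.
type filter func(http.Handler) http.Handler

// logging records one line per request after the inner handler has run.
func logging(log *[]string) filter {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			next.ServeHTTP(w, r)
			*log = append(*log, r.Method+" "+r.URL.Path)
		})
	}
}

// counting is a second filter; in this sketch it only counts requests.
func counting(n *int) filter {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			*n++
			next.ServeHTTP(w, r)
		})
	}
}

// chain applies the filters so that the first one listed is the outermost.
func chain(h http.Handler, filters ...filter) http.Handler {
	for i := len(filters) - 1; i >= 0; i-- {
		h = filters[i](h)
	}
	return h
}

// newDemoHandler builds a trivial API handler behind both filters.
func newDemoHandler(log *[]string, count *int) http.Handler {
	api := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})
	return chain(api, logging(log), counting(count))
}

// serve runs one GET request through the handler in memory.
func serve(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	var log []string
	var count int
	code := serve(newDemoHandler(&log, &count), "/kapis/demo")
	fmt.Println(code, log, count) // 200 [GET /kapis/demo] 1
}
```

    Because filters are attached to the container rather than to individual routes, cross-cutting concerns such as logging stay out of the per-endpoint code shown in the earlier sections.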
    

    Article link: https://www.haomeiwen.com/subject/ubzoultx.html