美文网首页
k8s 源码的一些记录

k8s 源码的一些记录

作者: wwq2020 | 来源:发表于2020-07-19 12:30 被阅读0次

资源创建关键代码

旧的接口(core/v1,legacy API)

pkg/registry/core/rest/storage_core.go中

// NewLegacyRESTStorage builds the REST storage for the legacy API group (the
// group whose name is the empty string) and registers it under version "v1"
// of the returned APIGroupInfo.
//
// It creates one store per resource from restOptionsGetter, sets up the
// service cluster-IP and node-port allocators, and records the registries
// backing those allocators on the returned LegacyRESTStorage.
//
// The first failure aborts construction: the error is returned together with
// zero-valued LegacyRESTStorage and APIGroupInfo.
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
    apiGroupInfo := genericapiserver.APIGroupInfo{
        PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
        VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
        Scheme:                       legacyscheme.Scheme,
        ParameterCodec:               legacyscheme.ParameterCodec,
        NegotiatedSerializer:         legacyscheme.Codecs,
    }

    // The PodDisruptionBudget client is only created when policy/v1beta1 is
    // registered in the scheme; otherwise it stays nil and is handed to the
    // pod storage as-is.
    var podDisruptionClient policyclient.PodDisruptionBudgetsGetter
    if policyGroupVersion := (schema.GroupVersion{Group: "policy", Version: "v1beta1"}); legacyscheme.Scheme.IsVersionRegistered(policyGroupVersion) {
        var err error
        podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    }
    restStorage := LegacyRESTStorage{}

    // Stores that only need restOptionsGetter (events additionally take the
    // configured TTL converted to an unsigned number of seconds).
    podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    resourceQuotaStorage, resourceQuotaStatusStorage, err := resourcequotastore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    secretStorage, err := secretstore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    configMapStorage, err := configmapstore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage, err := namespacestore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    // Node storage must exist before pod storage: the pod storage is given
    // nodeStorage.KubeletConnectionInfo below.
    nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    podStorage, err := podstore.NewStorage(
        restOptionsGetter,
        nodeStorage.KubeletConnectionInfo,
        c.ProxyTransport,
        podDisruptionClient,
    )
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    // Service account storage receives the issuer, audiences, expiration and
    // the pod/secret stores only when an issuer is configured and the
    // TokenRequest feature gate is enabled; otherwise it is built with
    // nil/zero values for all of those arguments.
    var serviceAccountStorage *serviceaccountstore.REST
    if c.ServiceAccountIssuer != nil && utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) {
        serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, c.ServiceAccountIssuer, c.APIAudiences, c.ServiceAccountMaxExpiration, podStorage.Pod.Store, secretStorage.Store, c.ExtendExpiration)
    } else {
        serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, nil, nil, 0, nil, nil, false)
    }
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    // serviceClusterIPRegistry is assigned from inside the allocator factory
    // callback below so the etcd-backed registry can be exposed on restStorage.
    var serviceClusterIPRegistry rangeallocation.RangeRegistry
    serviceClusterIPRange := c.ServiceIPRange
    // A primary service IP range is mandatory.
    if serviceClusterIPRange.IP == nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("service clusterIPRange is missing")
    }

    // All three allocators below share the storage config of the "services" resource.
    serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    serviceClusterIPAllocator, err := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) (allocator.Interface, error) {
        mem := allocator.NewAllocationMap(max, rangeSpec)
        // TODO etcdallocator package to return a storage interface via the storageFactory
        etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
        if err != nil {
            return nil, err
        }
        serviceClusterIPRegistry = etcd
        return etcd, nil
    })
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
    }
    restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry

    // allocator for secondary service ip range
    // It is only created when the IPv6DualStack feature gate is enabled and a
    // secondary range is configured; otherwise the allocator stays nil, and
    // that nil-ness is what is passed to servicestore.NewGenericREST below.
    var secondaryServiceClusterIPAllocator ipallocator.Interface
    if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && c.SecondaryServiceIPRange.IP != nil {
        var secondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
        secondaryServiceClusterIPAllocator, err = ipallocator.NewAllocatorCIDRRange(&c.SecondaryServiceIPRange, func(max int, rangeSpec string) (allocator.Interface, error) {
            mem := allocator.NewAllocationMap(max, rangeSpec)
            // TODO etcdallocator package to return a storage interface via the storageFactory
            etcd, err := serviceallocator.NewEtcd(mem, "/ranges/secondaryserviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
            if err != nil {
                return nil, err
            }
            secondaryServiceClusterIPRegistry = etcd
            return etcd, nil
        })
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
        }
        restStorage.SecondaryServiceClusterIPAllocator = secondaryServiceClusterIPRegistry
    }

    // Node-port allocator, built the same way as the IP allocators: the
    // callback captures the etcd-backed registry for restStorage.
    var serviceNodePortRegistry rangeallocation.RangeRegistry
    serviceNodePortAllocator, err := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) {
        mem := allocator.NewAllocationMap(max, rangeSpec)
        // TODO etcdallocator package to return a storage interface via the storageFactory
        etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
        if err != nil {
            return nil, err
        }
        serviceNodePortRegistry = etcd
        return etcd, nil
    })
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err)
    }
    restStorage.ServiceNodePortAllocator = serviceNodePortRegistry

    controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    // The third argument is true only when a secondary IP allocator was created above.
    serviceRESTStorage, serviceStatusStorage, err := servicestore.NewGenericREST(restOptionsGetter, serviceClusterIPRange, secondaryServiceClusterIPAllocator != nil)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    // The service REST handler wraps the generic service store and is wired to
    // the endpoints and pod stores plus all allocators created above.
    serviceRest, serviceRestProxy := servicestore.NewREST(serviceRESTStorage,
        endpointsStorage,
        podStorage.Pod,
        serviceClusterIPAllocator,
        secondaryServiceClusterIPAllocator,
        serviceNodePortAllocator,
        c.ProxyTransport)

    // Resource (and "resource/subresource") name -> storage, for everything
    // that is registered unconditionally.
    restStorageMap := map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
        "pods/status":      podStorage.Status,
        "pods/log":         podStorage.Log,
        "pods/exec":        podStorage.Exec,
        "pods/portforward": podStorage.PortForward,
        "pods/proxy":       podStorage.Proxy,
        "pods/binding":     podStorage.Binding,
        "bindings":         podStorage.LegacyBinding,

        "podTemplates": podTemplateStorage,

        "replicationControllers":        controllerStorage.Controller,
        "replicationControllers/status": controllerStorage.Status,

        "services":        serviceRest,
        "services/proxy":  serviceRestProxy,
        "services/status": serviceStatusStorage,

        "endpoints": endpointsStorage,

        "nodes":        nodeStorage.Node,
        "nodes/status": nodeStorage.Status,
        "nodes/proxy":  nodeStorage.Proxy,

        "events": eventStorage,

        "limitRanges":                   limitRangeStorage,
        "resourceQuotas":                resourceQuotaStorage,
        "resourceQuotas/status":         resourceQuotaStatusStorage,
        "namespaces":                    namespaceStorage,
        "namespaces/status":             namespaceStatusStorage,
        "namespaces/finalize":           namespaceFinalizeStorage,
        "secrets":                       secretStorage,
        "serviceAccounts":               serviceAccountStorage,
        "persistentVolumes":             persistentVolumeStorage,
        "persistentVolumes/status":      persistentVolumeStatusStorage,
        "persistentVolumeClaims":        persistentVolumeClaimStorage,
        "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
        "configMaps":                    configMapStorage,

        "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
    }
    // Subresources that are only exposed when the API version, storage
    // capability or feature gate they depend on is present.
    if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
        restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
    }
    if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
        restStorageMap["pods/eviction"] = podStorage.Eviction
    }
    if serviceAccountStorage.Token != nil {
        restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
    }
    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
        restStorageMap["pods/ephemeralcontainers"] = podStorage.EphemeralContainers
    }
    apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap

    return restStorage, apiGroupInfo, nil
}

新接口

pkg/master/master.go中

    // restStorageProviders lists one storage provider per API group package.
    // Most are zero-valued; a few are configured from c.GenericConfig or
    // c.ExtraConfig. Element order is significant for at least apps vs.
    // extensions (see the comment above appsrest below).
    restStorageProviders := []RESTStorageProvider{
        authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
        authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
        autoscalingrest.RESTStorageProvider{},
        batchrest.RESTStorageProvider{},
        certificatesrest.RESTStorageProvider{},
        coordinationrest.RESTStorageProvider{},
        discoveryrest.StorageProvider{},
        extensionsrest.RESTStorageProvider{},
        networkingrest.RESTStorageProvider{},
        noderest.RESTStorageProvider{},
        policyrest.RESTStorageProvider{},
        rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
        schedulingrest.RESTStorageProvider{},
        settingsrest.RESTStorageProvider{},
        storagerest.RESTStorageProvider{},
        flowcontrolrest.RESTStorageProvider{},
        // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
        // See https://github.com/kubernetes/kubernetes/issues/42392
        appsrest.StorageProvider{},
        admissionregistrationrest.RESTStorageProvider{},
        eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
    }

deployment、statefulset、daemonset、replicaset、controllerrevision

pkg/registry/apps/rest/storage_apps.go中

// v1Storage assembles the name -> storage map for the v1 resources handled by
// this provider: deployments, statefulsets, daemonsets, replicasets (each with
// their subresources) and controllerrevisions.
//
// Stores are created in that order from restOptionsGetter. If one of them
// fails, the error is returned together with the map as filled in so far.
// apiResourceConfigSource is not consulted here.
func (p StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
    resources := make(map[string]rest.Storage)

    // Deployment with its status and scale subresources.
    deployments, err := deploymentstore.NewStorage(restOptionsGetter)
    if err != nil {
        return resources, err
    }
    resources["deployments"] = deployments.Deployment
    resources["deployments/status"] = deployments.Status
    resources["deployments/scale"] = deployments.Scale

    // StatefulSet with its status and scale subresources.
    statefulSets, err := statefulsetstore.NewStorage(restOptionsGetter)
    if err != nil {
        return resources, err
    }
    resources["statefulsets"] = statefulSets.StatefulSet
    resources["statefulsets/status"] = statefulSets.Status
    resources["statefulsets/scale"] = statefulSets.Scale

    // DaemonSet and its status subresource come back as two separate stores.
    daemonSets, daemonSetStatus, err := daemonsetstore.NewREST(restOptionsGetter)
    if err != nil {
        return resources, err
    }
    resources["daemonsets"] = daemonSets
    resources["daemonsets/status"] = daemonSetStatus

    // ReplicaSet with its status and scale subresources.
    replicaSets, err := replicasetstore.NewStorage(restOptionsGetter)
    if err != nil {
        return resources, err
    }
    resources["replicasets"] = replicaSets.ReplicaSet
    resources["replicasets/status"] = replicaSets.Status
    resources["replicasets/scale"] = replicaSets.Scale

    // ControllerRevision has no subresources registered here.
    revisions, err := controllerrevisionsstore.NewREST(restOptionsGetter)
    if err != nil {
        return resources, err
    }
    resources["controllerrevisions"] = revisions

    return resources, nil
}

batch

pkg/registry/batch/rest/storage_batch.go中

// v1Storage returns the name -> storage map for the v1 resources of this
// provider: "jobs" and its "jobs/status" subresource.
//
// If the job store cannot be created, the error is returned together with the
// (still empty) map. apiResourceConfigSource is not consulted here.
func (p RESTStorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
    resources := make(map[string]rest.Storage)

    // A single constructor yields both the job store and its status store.
    jobs, jobStatus, err := jobstore.NewREST(restOptionsGetter)
    if err != nil {
        return resources, err
    }
    resources["jobs"] = jobs
    resources["jobs/status"] = jobStatus

    // err is known to be nil at this point.
    return resources, nil
}

// v1beta1Storage returns the name -> storage map for the v1beta1 resources of
// this provider: "cronjobs" and its "cronjobs/status" subresource.
//
// If the cron job store cannot be created, the error is returned together with
// the (still empty) map. apiResourceConfigSource is not consulted here.
func (p RESTStorageProvider) v1beta1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
    resources := make(map[string]rest.Storage)

    // A single constructor yields both the cron job store and its status store.
    cronJobs, cronJobStatus, err := cronjobstore.NewREST(restOptionsGetter)
    if err != nil {
        return resources, err
    }
    resources["cronjobs"] = cronJobs
    resources["cronjobs/status"] = cronJobStatus

    // err is known to be nil at this point.
    return resources, nil
}


hpa

pkg/registry/autoscaling/rest/storage_autoscaling.go中

// v1Storage returns the name -> storage map for the v1 resources of this
// provider: "horizontalpodautoscalers" and its status subresource.
//
// If the autoscaler store cannot be created, the error is returned together
// with the (still empty) map. apiResourceConfigSource is not consulted here.
func (p RESTStorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
    resources := make(map[string]rest.Storage)

    // A single constructor yields both the autoscaler store and its status store.
    autoscalers, autoscalerStatus, err := horizontalpodautoscalerstore.NewREST(restOptionsGetter)
    if err != nil {
        return resources, err
    }
    resources["horizontalpodautoscalers"] = autoscalers
    resources["horizontalpodautoscalers/status"] = autoscalerStatus

    // err is known to be nil at this point.
    return resources, nil
}

相关文章

网友评论

      本文标题:k8s 源码的一些记录

      本文链接:https://www.haomeiwen.com/subject/xdvlkktx.html