Clientset

作者: 程序员札记 | 来源:发表于2022-11-05 19:19 被阅读0次

1、介绍
Clientset 是调用 Kubernetes 资源对象最常用的客户端,可以操作所有的资源对象。
那么在 Clientset 中使如何用这些资源的呢?
因为在 staging/src/k8s.io/api 下面定义了各种类型资源的规范,然后将这些规范注册到了全局的 Scheme 中。这样就可以在Clientset中使用这些资源了。
2、示例
首先我们来看下如何通过 Clientset 来获取资源对象,我们这里来创建一个 Clientset 对象,然后通过该对象来获取默认命名空间之下的 Deployments 列表,代码如下所示:

package main

import (
    "flag"
    "fmt"
    "os"
    "path/filepath"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    var err error
    var config *rest.Config
    var kubeconfig *string

    if home := homeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    // 使用 ServiceAccount 创建集群配置(InCluster模式)
    if config, err = rest.InClusterConfig(); err != nil {
        // 使用 KubeConfig 文件创建集群配置
        if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
            panic(err.Error())
        }
    }

    // 创建 clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    // 使用 clientsent 获取 Deployments
    deployments, err := clientset.AppsV1().Deployments("default").List(metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    for idx, deploy := range deployments.Items {
        fmt.Printf("%d -> %s\n", idx+1, deploy.Name)
    }

}

func homeDir() string {
    if h := os.Getenv("HOME"); h != "" {
        return h
    }
    return os.Getenv("USERPROFILE") // windows
}

这是一个非常典型的访问 Kubernetes 集群资源的方式,通过 client-go 提供的 Clientset 对象来获取资源数据,主要有以下三个步骤:

  • 使用 kubeconfig 文件或者 ServiceAccount(InCluster 模式)来创建访问 Kubernetes API 的 Restful 配置参数,也就是代码中的 rest.Config 对象
  • 使用 rest.Config 参数创建 Clientset 对象,这一步非常简单,直接调用 kubernetes.NewForConfig(config) 即可初始化
  • 然后是 Clientset 对象的方法去获取各个 Group 下面的对应资源对象进行 CRUD 操作
    3、Clientset 对象
    上面我们了解了如何使用 Clientset 对象来获取集群资源,接下来我们来分析下 Clientset 对象的实现。

上面我们使用的 Clientset 实际上是对各种资源类型的 Clientset 的一次封装:

// staging/src/k8s.io/client-go/kubernetes/clientset.go

// NewForConfig 通过给定的 config 创建一个新的 Clientset
func NewForConfig(c *rest.Config) (*Clientset, error) {
    configShallowCopy := *c

    if configShallowCopy.UserAgent == "" {
        configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent()
    }

    // share the transport between all clients
    httpClient, err := rest.HTTPClientFor(&configShallowCopy)
    if err != nil {
        return nil, err
    }

    return NewForConfigAndClient(&configShallowCopy, httpClient)
}


func NewForConfigAndClient(c *rest.Config, httpClient *http.Client) (*Clientset, error) {
    configShallowCopy := *c
    if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
        if configShallowCopy.Burst <= 0 {
            return nil, fmt.Errorf("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0")
        }
        configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
    }

    var cs Clientset
    var err error
  
        // 将其他 Group 和版本的资源的 RESTClient 封装到全局的 Clientset 对象中
    cs.admissionregistrationV1, err = admissionregistrationv1.NewForConfigAndClient(&configShallowCopy, httpClient)
    if err != nil {
        return nil, err
    }
 
  cs.appsV1, err = appsv1.NewForConfigAndClient(&configShallowCopy, httpClient)
    if err != nil {
        return nil, err
    }
  ......
  return &cs, nil
}

上面的 NewForConfig 函数最后调用了NewForConfigAndClient函数,NewForConfigAndClient里面就是将其他的各种资源的 RESTClient 封装到了全局的 Clientset 中,这样当我们需要访问某个资源的时候只需要使用 Clientset 里面包装的属性即可,比如 clientset.CoreV1() 就是访问 Core 这个 Group 下面 v1 这个版本的 RESTClient。这些局部的 RESTClient 都定义在 staging/src/k8s.io/client-go/typed///_client.go 文件中,比如 staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/apps_client.go 这个文件中就是定义的 apps 这个 Group 下面的 v1 版本的 RESTClient,这里同样以 Deployment 为例:

// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/apps_client.go

func NewForConfig(c *rest.Config) (*AppsV1Client, error) {
    config := *c
    if err := setConfigDefaults(&config); err != nil {
        return nil, err
    }
    httpClient, err := rest.HTTPClientFor(&config)
    if err != nil {
        return nil, err
    }
  
     // 最后还是通过调用 NewForConfigAndClient 函数,返回AppsV1这个资源对象的Clientset
    return NewForConfigAndClient(&config, httpClient)
}

func setConfigDefaults(config *rest.Config) error {
    // 资源对象的GroupVersion
    gv := v1.SchemeGroupVersion
    config.GroupVersion = &gv
    // 资源对象的根目录
    config.APIPath = "/apis"
    // 使用注册的资源类型 Scheme 对请求和响应进行编解码,Scheme 就是资源类型的规范
    config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()

    if config.UserAgent == "" {
        config.UserAgent = rest.DefaultKubernetesUserAgent()
    }

    return nil
}

func (c *AppsV1Client) Deployments(namespace string) DeploymentInterface {
    return newDeployments(c, namespace)
}


// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/deployment.go

// deployments implements DeploymentInterface
// deployments 实现了 DeploymentInterface 接口
type deployments struct {
    client rest.Interface
    ns     string
}

// newDeployments returns a Deployments
// newDeployments 实例化 deployments 对象
func newDeployments(c *AppsV1Client, namespace string) *deployments {
    return &deployments{
        client: c.RESTClient(),
        ns:     namespace,
    }
}
通过上面代码我们就可以很清晰的知道可以通过 clientset.AppsV1().Deployments("default")来获取一个 deployments 对象,然后该对象下面定义了 deployments 对象的 CRUD 操作,比如我们调用的 List() 函数:

// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/deployment.go

// List takes label and field selectors, and returns the list of Deployments that match those selectors.
func (c *deployments) List(ctx context.Context, opts metav1.ListOptions) (result *v1.DeploymentList, err error) {
    var timeout time.Duration
    if opts.TimeoutSeconds != nil {
        timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
    }
    result = &v1.DeploymentList{}
    err = c.client.Get().
        Namespace(c.ns).
        Resource("deployments").
        VersionedParams(&opts, scheme.ParameterCodec).
        Timeout(timeout).
        Do(ctx).
        Into(result)
    return
}

从上面代码可以看出最终是通过 c.client 去发起的请求,也就是局部的 restClient 初始化的函数中通过 rest.RESTClientFor(&config) 创建的对象,也就是将 rest.Config 对象转换成一个 Restful 的 Client 对象用于网络操作:

// staging/src/k8s.io/client-go/rest/config.go


// RESTClientFor返回一个满足客户端Config对象上所要求属性的RESTClient。
// 请注意,在初始化客户端时,一个RESTClient可能需要一些可选的字段。
// 由该方法创建的RESTClient是通用的
// 它期望在遵循Kubernetes惯例的API上操作,但可能不是Kubernetes的API。
// RESTClientFor等同于调用RESTClientForConfigAndClient(config, httpClient)
// 其中httpClient是用HTTPClientFor(config)生成的。
func RESTClientFor(config *Config) (*RESTClient, error) {
    if config.GroupVersion == nil {
        return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
    }
    if config.NegotiatedSerializer == nil {
        return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
    }

    // Validate config.Host before constructing the transport/client so we can fail fast.
    // ServerURL will be obtained later in RESTClientForConfigAndClient()
    _, _, err := defaultServerUrlFor(config)
    if err != nil {
        return nil, err
    }

    httpClient, err := HTTPClientFor(config)
    if err != nil {
        return nil, err
    }

    return RESTClientForConfigAndClient(config, httpClient)
}

到这里我们就知道了 Clientset 是基于 RESTClient 的,RESTClient 是底层的用于网络请求的对象,可以直接通过 RESTClient 提供的 RESTful 方法如 Get()、Put()、Post()、Delete() 等和 APIServer 进行交互。

同时支持JSON和protobuf两种序列化方式,支持所有原生资源。

当主食化RESTClient过后,就可以发起网络请求了,比如对于Deployment的List操作:

// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/deployment.go

func (c *deployments) List(ctx context.Context, opts metav1.ListOptions) (result *v1.DeploymentList, err error) {
    var timeout time.Duration
    if opts.TimeoutSeconds != nil {
        timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
    }
    result = &v1.DeploymentList{}
    err = c.client.Get().
        Namespace(c.ns).
        Resource("deployments").
        VersionedParams(&opts, scheme.ParameterCodec).
        Timeout(timeout).
        Do(ctx).
        Into(result)
    return
}

上面通过调用 RestClient 发起网络请求,真正发起网络请求的代码如下所示:

// staging/src/k8s.io/client-go/rest/request.go

// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It will invoke
// fn at most once. It will return an error if a problem occurred prior to connecting to the
// server - the provided function is responsible for handling server errors.
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
    //Metrics for total request latency
    start := time.Now()
    defer func() {
        metrics.RequestLatency.Observe(ctx, r.verb, *r.URL(), time.Since(start))
    }()

    if r.err != nil {
        klog.V(4).Infof("Error in request: %v", r.err)
        return r.err
    }

    if err := r.requestPreflightCheck(); err != nil {
        return err
    }

    // 初始化网络客户端
    client := r.c.Client
    if client == nil {
        client = http.DefaultClient
    }

    // Throttle the first try before setting up the timeout configured on the
    // client. We don't want a throttled client to return timeouts to callers
    // before it makes a single request.
    if err := r.tryThrottle(ctx); err != nil {
        return err
    }

     // 超时处理
    if r.timeout > 0 {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, r.timeout)
        defer cancel()
    }

    isErrRetryableFunc := func(req *http.Request, err error) bool {
        // "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
        // Thus in case of "GET" operations, we simply retry it.
        // We are not automatically retrying "write" operations, as they are not idempotent.
        if req.Method != "GET" {
            return false
        }
        // For connection errors and apiserver shutdown errors retry.
        if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
            return true
        }
        return false
    }

    // Right now we make about ten retry attempts if we get a Retry-After response.
    // 重试机制
    retry := r.retryFn(r.maxRetries)
    for {
        if err := retry.Before(ctx, r); err != nil {
            return retry.WrapPreviousError(err)
        }
    
         // 构造请求对象
        req, err := r.newHTTPRequest(ctx)
        if err != nil {
            return err
        }
    
            // 发起网络请求
        resp, err := client.Do(req)
        updateURLMetrics(ctx, r, resp, err)
        // The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
        // https://pkg.go.dev/net/http#Request
        if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
            metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
        }
        retry.After(ctx, r, resp, err)

        done := func() bool {
            defer readAndCloseResponseBody(resp)

            // if the the server returns an error in err, the response will be nil.
            f := func(req *http.Request, resp *http.Response) {
                if resp == nil {
                    return
                }
                fn(req, resp)
            }

            if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
                return false
            }

            f(req, resp)
            return true
        }()
        if done {
            return retry.WrapPreviousError(err)
        }
    }
}

到这里就完成了一次完整的网络请求。
其实 Clientset 对象也就是将 rest.Config 封装成了一个 http.Client 对象而已,最终还是利用 golang 中的 http 库来执行一个正常的网络请求而已。

相关文章

  • Clientset

    1、介绍Clientset 是调用 Kubernetes 资源对象最常用的客户端,可以操作所有的资源对象。那么在 ...

  • Client-go客户端源码解析--Controller总体流程

    Client-go目录结构 ClientSet客户端 ClientSet在RESTClient的基础上封装了对re...

  • code-generator

    代码生成器 介绍 client-go为每种k8s内置资源提供了对应的clientset和informer。那么我们...

网友评论

      本文标题:Clientset

      本文链接:https://www.haomeiwen.com/subject/ojyvtdtx.html