美文网首页
kubernetes源码阅读——client-go 客户端

kubernetes源码阅读——client-go 客户端

作者: 睡不醒的大橘 | 来源:发表于2021-08-17 22:57 被阅读0次

    client-go

    • client-go是Kubernetes的Go语言的官方编程式交互客户端库,提供对Kubernetes API Server服务的交互访问。
    • client-go支持3种Client客户端:
    1. RESTClient是最基础的客户端。它对HTTP Request进行了封装。ClientSet、DynamicClient及DiscoveryClient客户端都是基于RESTClient实现的。
    2. ClientSet基于RESTClient,是kubernetes所有内置资源客户端的集合。每一个 Resource和Version都对应一个函数。ClientSet开发者对Kubernetes进行二次开发时最常使用的。
    3. DynamicClient是一种动态的 client,它能访问kubernetes 所有的资源,即内置资源和CRD自定义资源。

    clientset

    • clientset示例:
    // staging\src\k8s.io\client-go\examples\create-update-delete-deployment\main.go
    func main() {
        var kubeconfig *string
        if home := homedir.HomeDir(); home != "" {
            // kubeconfig默认存放在$HOME/.kube/config路径下
            kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
        } else {
            kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
        }
        flag.Parse()
        
        // 读取kubeconfig配置信息并实例化rest.Config对象。配置信息包括集群、用户、命名空间和身份验证等,
        config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
        if err != nil {
            panic(err)
        }
        // 通过kubeconfig配置信息实例化clientset对象
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
            panic(err)
        }
        // 请求v1版本下的deployment资源。
        deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)
    
        deployment := &appsv1.Deployment{
            ObjectMeta: metav1.ObjectMeta{
                Name: "demo-deployment",
            },
            Spec: appsv1.DeploymentSpec{
                Replicas: int32Ptr(2),
                Selector: &metav1.LabelSelector{
                    MatchLabels: map[string]string{
                        "app": "demo",
                    },
                },
                Template: apiv1.PodTemplateSpec{
                    ObjectMeta: metav1.ObjectMeta{
                        Labels: map[string]string{
                            "app": "demo",
                        },
                    },
                    Spec: apiv1.PodSpec{
                        Containers: []apiv1.Container{
                            {
                                Name:  "web",
                                Image: "nginx:1.12",
                                Ports: []apiv1.ContainerPort{
                                    {
                                        Name:          "http",
                                        Protocol:      apiv1.ProtocolTCP,
                                        ContainerPort: 80,
                                    },
                                },
                            },
                        },
                    },
                },
            },
        }
    
        // Create Deployment
        fmt.Println("Creating deployment...")
        result, err := deploymentsClient.Create(context.TODO(), deployment, metav1.CreateOptions{})
        if err != nil {
            panic(err)
        }
        fmt.Printf("Created deployment %q.\n", result.GetObjectMeta().GetName())
    
        // Update Deployment
        prompt()
        fmt.Println("Updating deployment...")
        //    You have two options to Update() this Deployment:
        //
        //    1. Modify the "deployment" variable and call: Update(deployment).
        //       This works like the "kubectl replace" command and it overwrites/loses changes
        //       made by other clients between you Create() and Update() the object.
        //    2. Modify the "result" returned by Get() and retry Update(result) until
        //       you no longer get a conflict error. This way, you can preserve changes made
        //       by other clients between Create() and Update(). This is implemented below
        //           using the retry utility package included with client-go. (RECOMMENDED)
        //
        // More Info:
        // https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
    
        retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
            // Retrieve the latest version of Deployment before attempting update
            // RetryOnConflict uses exponential backoff to avoid exhausting the apiserver
            result, getErr := deploymentsClient.Get(context.TODO(), "demo-deployment", metav1.GetOptions{})
            if getErr != nil {
                panic(fmt.Errorf("Failed to get latest version of Deployment: %v", getErr))
            }
    
            result.Spec.Replicas = int32Ptr(1)                           // reduce replica count
            result.Spec.Template.Spec.Containers[0].Image = "nginx:1.13" // change nginx version
            _, updateErr := deploymentsClient.Update(context.TODO(), result, metav1.UpdateOptions{})
            return updateErr
        })
        if retryErr != nil {
            panic(fmt.Errorf("Update failed: %v", retryErr))
        }
        fmt.Println("Updated deployment...")
    
        // List Deployments
        prompt()
        fmt.Printf("Listing deployments in namespace %q:\n", apiv1.NamespaceDefault)
        list, err := deploymentsClient.List(context.TODO(), metav1.ListOptions{})
        if err != nil {
            panic(err)
        }
        for _, d := range list.Items {
            fmt.Printf(" * %s (%d replicas)\n", d.Name, *d.Spec.Replicas)
        }
    
        // Delete Deployment
        prompt()
        fmt.Println("Deleting deployment...")
        deletePolicy := metav1.DeletePropagationForeground
        if err := deploymentsClient.Delete(context.TODO(), "demo-deployment", metav1.DeleteOptions{
            PropagationPolicy: &deletePolicy,
        }); err != nil {
            panic(err)
        }
        fmt.Println("Deleted deployment.")
    }
    
    • Deployments函数是一个资源接口对象,用于Deployment资源对象的管理,对包含Create、Update、Delete、Get、List、Watch、Patch等方法
    // staging\src\k8s.io\client-go\kubernetes\typed\apps\v1\deployment.go
    // DeploymentInterface has methods to work with Deployment resources.
    type DeploymentInterface interface {
        Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (*v1.Deployment, error)
        Update(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
        UpdateStatus(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
        Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
        DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
        Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Deployment, error)
        List(ctx context.Context, opts metav1.ListOptions) (*v1.DeploymentList, error)
        Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
        Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Deployment, err error)
        Apply(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
        ApplyStatus(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
        GetScale(ctx context.Context, deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
        UpdateScale(ctx context.Context, deploymentName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error)
    
        DeploymentExpansion
    }
    
    // deployments implements DeploymentInterface
    type deployments struct {
        client rest.Interface
        ns     string
    }
    
    // Create takes the representation of a deployment and creates it.  Returns the server's representation of the deployment, and an error, if there is any.
    func (c *deployments) Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (result *v1.Deployment, err error) {
        result = &v1.Deployment{}
        err = c.client.Post().
            Namespace(c.ns).
            Resource("deployments").
            VersionedParams(&opts, scheme.ParameterCodec).
            Body(deployment).
            Do(ctx).
            Into(result)
        return
    }
    

    dynamicClient

    • ClientSet需要预先实现每种Resource和Version的操作,其内部的数据都是结构化数据。而DynamicClient的内部数据是通过map[string]interface{}转换的非结构化数据,因此DynamicClient能够处理CRD自定义资源。
    • dynamicClient示例:
    // staging\src\k8s.io\client-go\examples\dynamic-create-update-delete-deployment\main.go
    func main() {
        var kubeconfig *string
        if home := homedir.HomeDir(); home != "" {
            kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
        } else {
            kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
        }
        flag.Parse()
    
        namespace := "default"
    
        config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
        if err != nil {
            panic(err)
        }
        // 通过kubeconfig配置信息实例化dynamicClient对象
        client, err := dynamic.NewForConfig(config)
        if err != nil {
            panic(err)
        }
    
        deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
    
        deployment := &unstructured.Unstructured{
            Object: map[string]interface{}{
                "apiVersion": "apps/v1",
                "kind":       "Deployment",
                "metadata": map[string]interface{}{
                    "name": "demo-deployment",
                },
                "spec": map[string]interface{}{
                    "replicas": 2,
                    "selector": map[string]interface{}{
                        "matchLabels": map[string]interface{}{
                            "app": "demo",
                        },
                    },
                    "template": map[string]interface{}{
                        "metadata": map[string]interface{}{
                            "labels": map[string]interface{}{
                                "app": "demo",
                            },
                        },
    
                        "spec": map[string]interface{}{
                            "containers": []map[string]interface{}{
                                {
                                    "name":  "web",
                                    "image": "nginx:1.12",
                                    "ports": []map[string]interface{}{
                                        {
                                            "name":          "http",
                                            "protocol":      "TCP",
                                            "containerPort": 80,
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
            },
        }
    
        // Create Deployment
        fmt.Println("Creating deployment...")
        result, err := client.Resource(deploymentRes).Namespace(namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
        if err != nil {
            panic(err)
        }
        fmt.Printf("Created deployment %q.\n", result.GetName())
    
        // Update Deployment
        prompt()
        fmt.Println("Updating deployment...")
        //    You have two options to Update() this Deployment:
        //
        //    1. Modify the "deployment" variable and call: Update(deployment).
        //       This works like the "kubectl replace" command and it overwrites/loses changes
        //       made by other clients between you Create() and Update() the object.
        //    2. Modify the "result" returned by Get() and retry Update(result) until
        //       you no longer get a conflict error. This way, you can preserve changes made
        //       by other clients between Create() and Update(). This is implemented below
        //           using the retry utility package included with client-go. (RECOMMENDED)
        //
        // More Info:
        // https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
    
        retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
            // Retrieve the latest version of Deployment before attempting update
            // RetryOnConflict uses exponential backoff to avoid exhausting the apiserver
            result, getErr := client.Resource(deploymentRes).Namespace(namespace).Get(context.TODO(), "demo-deployment", metav1.GetOptions{})
            if getErr != nil {
                panic(fmt.Errorf("failed to get latest version of Deployment: %v", getErr))
            }
    
            // update replicas to 1
            if err := unstructured.SetNestedField(result.Object, int64(1), "spec", "replicas"); err != nil {
                panic(fmt.Errorf("failed to set replica value: %v", err))
            }
    
            // extract spec containers
            containers, found, err := unstructured.NestedSlice(result.Object, "spec", "template", "spec", "containers")
            if err != nil || !found || containers == nil {
                panic(fmt.Errorf("deployment containers not found or error in spec: %v", err))
            }
    
            // update container[0] image
            if err := unstructured.SetNestedField(containers[0].(map[string]interface{}), "nginx:1.13", "image"); err != nil {
                panic(err)
            }
            if err := unstructured.SetNestedField(result.Object, containers, "spec", "template", "spec", "containers"); err != nil {
                panic(err)
            }
    
            _, updateErr := client.Resource(deploymentRes).Namespace(namespace).Update(context.TODO(), result, metav1.UpdateOptions{})
            return updateErr
        })
        if retryErr != nil {
            panic(fmt.Errorf("update failed: %v", retryErr))
        }
        fmt.Println("Updated deployment...")
    
        // List Deployments
        prompt()
        fmt.Printf("Listing deployments in namespace %q:\n", apiv1.NamespaceDefault)
        list, err := client.Resource(deploymentRes).Namespace(namespace).List(context.TODO(), metav1.ListOptions{})
        if err != nil {
            panic(err)
        }
        for _, d := range list.Items {
            replicas, found, err := unstructured.NestedInt64(d.Object, "spec", "replicas")
            if err != nil || !found {
                fmt.Printf("Replicas not found for deployment %s: error=%s", d.GetName(), err)
                continue
            }
            fmt.Printf(" * %s (%d replicas)\n", d.GetName(), replicas)
        }
    
        // Delete Deployment
        prompt()
        fmt.Println("Deleting deployment...")
        deletePolicy := metav1.DeletePropagationForeground
        deleteOptions := metav1.DeleteOptions{
            PropagationPolicy: &deletePolicy,
        }
        if err := client.Resource(deploymentRes).Namespace(namespace).Delete(context.TODO(), "demo-deployment", deleteOptions); err != nil {
            panic(err)
        }
    
        fmt.Println("Deleted deployment.")
    }
    
    func prompt() {
        fmt.Printf("-> Press Return key to continue.")
        scanner := bufio.NewScanner(os.Stdin)
        for scanner.Scan() {
            break
        }
        if err := scanner.Err(); err != nil {
            panic(err)
        }
        fmt.Println()
    }
    
    // staging\src\k8s.io\apimachinery\pkg\apis\meta\v1\unstructured\unstructured.go
    // Unstructured allows objects that do not have Golang structs registered to be manipulated
    // generically. This can be used to deal with the API objects from a plug-in. Unstructured
    // objects still have functioning TypeMeta features-- kind, version, etc.
    //
    // WARNING: This object has accessors for the v1 standard metadata. You *MUST NOT* use this
    // type if you are dealing with objects that are not in the server meta v1 schema.
    //
    // TODO: make the serialization part of this type distinct from the field accessors.
    // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
    // +k8s:deepcopy-gen=true
    type Unstructured struct {
        // Object is a JSON compatible map with string, float, int, bool, []interface{}, or
        // map[string]interface{}
        // children.
        Object map[string]interface{}
    }
    
    • dynamicResourceClient管理对Resource的Create、Update、Delete、Get、List、Watch、Patch等操作。
    // staging\src\k8s.io\client-go\dynamic\simple.go
    type dynamicClient struct {
        client *rest.RESTClient
    }
    
    var _ Interface = &dynamicClient{}
    
    type dynamicResourceClient struct {
        client    *dynamicClient
        namespace string
        resource  schema.GroupVersionResource
    }
    
    func (c *dynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface {
        return &dynamicResourceClient{client: c, resource: resource}
    }
    
    func (c *dynamicResourceClient) Namespace(ns string) ResourceInterface {
        ret := *c
        ret.namespace = ns
        return &ret
    }
    
    func (c *dynamicResourceClient) Create(ctx context.Context, obj *unstructured.Unstructured, opts metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) {
        outBytes, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
        if err != nil {
            return nil, err
        }
        name := ""
        if len(subresources) > 0 {
            accessor, err := meta.Accessor(obj)
            if err != nil {
                return nil, err
            }
            name = accessor.GetName()
            if len(name) == 0 {
                return nil, fmt.Errorf("name is required")
            }
        }
    
        result := c.client.client.
            Post().
            AbsPath(append(c.makeURLSegments(name), subresources...)...).
            Body(outBytes).
            SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
            Do(ctx)
        if err := result.Error(); err != nil {
            return nil, err
        }
    
        retBytes, err := result.Raw()
        if err != nil {
            return nil, err
        }
        uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
        if err != nil {
            return nil, err
        }
        return uncastObj.(*unstructured.Unstructured), nil
    }
    
    func (c *dynamicResourceClient) makeURLSegments(name string) []string {
        url := []string{}
        if len(c.resource.Group) == 0 {
            url = append(url, "api")
        } else {
            url = append(url, "apis", c.resource.Group)
        }
        url = append(url, c.resource.Version)
    
        if len(c.namespace) > 0 {
            url = append(url, "namespaces", c.namespace)
        }
        url = append(url, c.resource.Resource)
    
        if len(name) > 0 {
            url = append(url, name)
        }
    
        return url
    }
    

    相关文章

      网友评论

          本文标题:kubernetes源码阅读——client-go 客户端

          本文链接:https://www.haomeiwen.com/subject/ifdwbltx.html