美文网首页
K8S Python API 调用 kruise-rollout

K8S Python API 调用 kruise-rollout

作者: 一只老梨花 | 来源:发表于2023-04-09 19:17 被阅读0次
    import datetime
    import yaml
    import time, arrow
    from kubernetes import client, utils
    from kubernetes.stream import stream
    
    
    class CCK8sError(Exception):
        """Raised when an operation is requested for an unsupported object kind."""


    class MyK8s:
        """Small convenience wrapper around the Kubernetes Python client.

        Bundles the typed API clients (core, apps, networking) and the
        custom-objects client behind generic ``<verb>_namespaced_<kind>``
        helpers, plus helpers for the ``rollouts.kruise.io`` custom resource.
        """

        # API coordinates of the Rollout custom resource used by the
        # *_rollout helpers below.
        ROLLOUT_GROUP = "rollouts.kruise.io"
        ROLLOUT_VERSION = "v1alpha1"
        ROLLOUT_PLURAL = "rollouts"

        # Object kind -> name of the instance attribute holding the API client
        # that serves it.
        _KIND_TO_API_ATTR = {
            'controller_revision': 'appv1',
            'daemon_set': 'appv1',
            'deployment': 'appv1',
            'replica_set': 'appv1',
            'stateful_set': 'appv1',
            'pod': 'corev1',
            'service': 'corev1',
            'ingress': 'netv1',
            'rollout': 'co_api',
        }

        def __init__(self, host, verify_ssl=True, ssl_ca_cert=None, cert_file=None, key_file=None, token=None):
            """Build the API clients for a remote cluster.

            See https://github.com/kubernetes-client/python/blob/release-24.0/examples/remote_cluster.py

            Args:
                host: API server URL, stored as the configuration's ``host``.
                verify_ssl: Stored as the configuration's ``verify_ssl``.
                ssl_ca_cert: CA bundle path; only applied when no token is given.
                cert_file: Client certificate path; only applied when no token
                    is given.
                key_file: Client key path; only applied when no token is given.
                token: Bearer token. When truthy it is sent as the
                    ``authorization`` API key and the certificate settings
                    above are ignored.
            """
            aConfiguration = client.Configuration()
            aConfiguration.host = host
            aConfiguration.verify_ssl = verify_ssl

            if not token:
                aConfiguration.ssl_ca_cert = ssl_ca_cert
                aConfiguration.cert_file = cert_file
                aConfiguration.key_file = key_file
            else:
                # NOTE(review): ssl_ca_cert is not applied on this path, so a
                # custom CA passed together with a token is ignored - confirm
                # this is intended.
                aConfiguration.api_key = {"authorization": "Bearer " + token}
            aApiClient = client.ApiClient(aConfiguration)
            self.corev1 = client.CoreV1Api(aApiClient)
            self.appv1 = client.AppsV1Api(aApiClient)
            self.netv1 = client.NetworkingV1Api(aApiClient)
            self.co_api = client.CustomObjectsApi(aApiClient)

        def _client_api(self, kind):
            """Return the API client object that serves ``kind``.

            Args:
                kind: Snake-case object kind, e.g. ``'deployment'`` or ``'pod'``.

            Raises:
                CCK8sError: If ``kind`` is not one of the supported kinds.
            """
            attr = self._KIND_TO_API_ATTR.get(kind)
            if attr is None:
                raise CCK8sError('不支持操作类型为{}的对象'.format(kind))
            return getattr(self, attr)

        def restart_deploy(self, namespace, name):
            """Trigger a rolling restart of a deployment.

            Stamps the pod template with the
            ``kubectl.kubernetes.io/restartedAt`` annotation set to the current
            time and patches the deployment with the modified object.

            Returns:
                Whatever the underlying patch call returns.
            """
            deploy = self.get_namespaced_object("deployment", namespace, name)
            template_meta = deploy.spec.template.metadata
            # A pod template without annotations reports None here; start from
            # an empty mapping so the assignment below cannot fail.
            if template_meta.annotations is None:
                template_meta.annotations = {}
            template_meta.annotations["kubectl.kubernetes.io/restartedAt"] = arrow.now().format(arrow.FORMAT_ATOM)
            return self.patch_namespaced_object("deployment", namespace, name, deploy)

        def get_deployment_by_label(self, namespace, selector):
            """List deployments in ``namespace`` matching every label selector.

            Args:
                namespace: Namespace to search.
                selector: Iterable of selector strings; they are joined with
                    ``","`` into a single ``label_selector``.

            Returns:
                The ``items`` of the list response.
            """
            return self._client_api("deployment").list_namespaced_deployment(namespace, label_selector=",".join(selector)).items

        def get_namespaced_object(self, kind, namespace, name):
            """Read a single namespaced object by name.

            Calls ``read_namespaced_<kind>(name, namespace)`` on the API client
            that serves ``kind`` and returns its result.

            Raises:
                CCK8sError: If ``kind`` is not supported.
            """
            func = getattr(self._client_api(kind), 'read_namespaced_{}'.format(kind))
            return func(name, namespace)

        def patch_namespaced_object(self, kind, namespace, name, obj):
            """Patch the named object of ``kind`` with ``obj``.

            Calls ``patch_namespaced_<kind>(name, namespace, obj)`` and returns
            its result.
            """
            func = getattr(self._client_api(kind), 'patch_namespaced_{}'.format(kind))
            return func(name, namespace, obj)

        def get_namespaced_object_v2(self, kind, namespace, name):
            """Look up objects of ``kind`` by name via a list call.

            Uses ``list_namespaced_<kind>`` with the field selector
            ``metadata.name=<name>``.

            Returns:
                The ``items`` of the list response.
            """
            selector = 'metadata.name={}'.format(name)
            func = getattr(self._client_api(kind), 'list_namespaced_{}'.format(kind))
            items = func(namespace, field_selector=selector).items
            return items

        def delete_namespaced_object(self, kind, namespace, name):
            """Delete the named object of ``kind``.

            Calls ``delete_namespaced_<kind>(name, namespace)``; the result is
            discarded.
            """
            func = getattr(self._client_api(kind), 'delete_namespaced_{}'.format(kind))
            func(name, namespace)

        def get_rollout(self, namespace, name, **kw):
            """Fetch the Rollout custom object ``name`` in ``namespace``.

            Extra keyword arguments are forwarded to
            ``get_namespaced_custom_object``.
            """
            api = self._client_api("rollout")
            return api.get_namespaced_custom_object(
                self.ROLLOUT_GROUP, self.ROLLOUT_VERSION, namespace, self.ROLLOUT_PLURAL, name, **kw)

        def patch_rollout(self, namespace, name, object, **kw):
            """Patch the Rollout custom object ``name`` in ``namespace``.

            Args:
                namespace: Namespace of the rollout.
                name: Name of the rollout.
                object: Patch body. (The parameter name shadows the builtin but
                    is kept so existing keyword callers keep working.)
                **kw: Forwarded to ``patch_namespaced_custom_object``. An
                    explicit ``body=`` here takes precedence over ``object``.
            """
            # The patch body was previously dropped; forward it as ``body``
            # unless the caller already supplied one by keyword.
            kw.setdefault("body", object)
            api = self._client_api("rollout")
            return api.patch_namespaced_custom_object(
                self.ROLLOUT_GROUP, self.ROLLOUT_VERSION, namespace, self.ROLLOUT_PLURAL, name, **kw)

        def create_rollout(self, namespace, json_data, **kw):
            """Create a Rollout custom object in ``namespace`` from ``json_data``.

            Extra keyword arguments are forwarded to
            ``create_namespaced_custom_object``.
            """
            api = self._client_api("rollout")
            return api.create_namespaced_custom_object(
                self.ROLLOUT_GROUP, self.ROLLOUT_VERSION, namespace, self.ROLLOUT_PLURAL, json_data, **kw)
    
    

    Ref: CustomObjectsApi (kubernetes-client/python API documentation)

    相关文章

      网友评论

          本文标题:K8S Python API 调用 kruise-rollout

          本文链接:https://www.haomeiwen.com/subject/hlobddtx.html