import datetime
import yaml
import time, arrow
from kubernetes import client, utils
from kubernetes.stream import stream
class CCK8sError(Exception):
    """Raised when an operation is requested for an unsupported object kind."""


class MyK8s:
    """Convenience wrapper around the Kubernetes Python client.

    Holds one API object per API group (core/v1, apps/v1, networking/v1 and
    custom objects) and dispatches generic ``*_namespaced_object`` calls to
    the right one based on a snake_case ``kind`` string such as
    ``"deployment"`` or ``"stateful_set"``.
    """

    # Coordinates of the Kruise Rollout custom resource used by *_rollout().
    _ROLLOUT_GROUP = "rollouts.kruise.io"
    _ROLLOUT_VERSION = "v1alpha1"
    _ROLLOUT_PLURAL = "rollouts"

    # kind -> name of the instance attribute holding the matching API object.
    _KIND_TO_API_ATTR = {
        'controller_revision': 'appv1',
        'daemon_set': 'appv1',
        'deployment': 'appv1',
        'replica_set': 'appv1',
        'stateful_set': 'appv1',
        'pod': 'corev1',
        'service': 'corev1',
        'ingress': 'netv1',
        'rollout': 'co_api',
    }

    def __init__(self, host, verify_ssl=True, ssl_ca_cert=None, cert_file=None, key_file=None, token=None):
        """Build API clients for a remote cluster.

        See https://github.com/kubernetes-client/python/blob/release-24.0/examples/remote_cluster.py

        Args:
            host: Base URL of the API server.
            verify_ssl: Whether to verify the server's TLS certificate.
            ssl_ca_cert: Optional path to the CA bundle used for verification.
            cert_file: Client certificate path (used only when no token is given).
            key_file: Client key path (used only when no token is given).
            token: Bearer token; when truthy it is used instead of the client
                certificate/key pair.
        """
        configuration = client.Configuration()
        configuration.host = host
        configuration.verify_ssl = verify_ssl
        # The CA bundle concerns server verification, not client
        # authentication, so it is applied for both auth modes.
        configuration.ssl_ca_cert = ssl_ca_cert
        if token:
            configuration.api_key = {"authorization": "Bearer " + token}
        else:
            configuration.cert_file = cert_file
            configuration.key_file = key_file

        api_client = client.ApiClient(configuration)
        self.corev1 = client.CoreV1Api(api_client)
        self.appv1 = client.AppsV1Api(api_client)
        self.netv1 = client.NetworkingV1Api(api_client)
        self.co_api = client.CustomObjectsApi(api_client)

    def _client_api(self, kind):
        """Return the API object responsible for ``kind``.

        Raises:
            CCK8sError: If ``kind`` is not one of the supported kinds.
        """
        try:
            attr = self._KIND_TO_API_ATTR[kind]
        except (KeyError, TypeError):
            # TypeError covers unhashable values, which are unsupported too.
            raise CCK8sError('不支持操作类型为{}的对象'.format(kind)) from None
        return getattr(self, attr)

    def get_namespaced_object(self, kind, namespace, name):
        """Read a single object via ``read_namespaced_<kind>(name, namespace)``."""
        func = getattr(self._client_api(kind), 'read_namespaced_{}'.format(kind))
        return func(name, namespace)

    def restart_deploy(self, namespace, name):
        """Trigger a rolling restart of a deployment.

        Stamps the pod template with the
        ``kubectl.kubernetes.io/restartedAt`` annotation and patches the
        deployment with the modified object.

        Returns:
            The response of the patch call.
        """
        deploy = self.get_namespaced_object("deployment", namespace, name)
        metadata = deploy.spec.template.metadata
        # A template without annotations carries None here, not an empty dict.
        if metadata.annotations is None:
            metadata.annotations = {}
        metadata.annotations["kubectl.kubernetes.io/restartedAt"] = arrow.now().format(arrow.FORMAT_ATOM)
        return self.patch_namespaced_object("deployment", namespace, name, deploy)

    def get_deployment_by_label(self, namespace, selector):
        """List deployments in ``namespace`` matching a label selector.

        Args:
            namespace: Namespace to search.
            selector: Either a ready-made selector string (``"app=web"``) or
                an iterable of ``key=value`` strings, which are joined with
                commas.

        Returns:
            The ``items`` of the list response.
        """
        if isinstance(selector, str):
            # Joining a plain string would split it into single characters.
            label_selector = selector
        else:
            label_selector = ",".join(selector)
        api = self._client_api("deployment")
        return api.list_namespaced_deployment(namespace, label_selector=label_selector).items

    def patch_namespaced_object(self, kind, namespace, name, obj):
        """Patch an object via ``patch_namespaced_<kind>(name, namespace, obj)``."""
        func = getattr(self._client_api(kind), 'patch_namespaced_{}'.format(kind))
        return func(name, namespace, obj)

    def get_namespaced_object_v2(self, kind, namespace, name):
        """Look an object up by name through the list API.

        Uses ``list_namespaced_<kind>`` with the field selector
        ``metadata.name=<name>`` and returns the ``items`` of the response
        (a list, not a single object).
        """
        selector = 'metadata.name={}'.format(name)
        func = getattr(self._client_api(kind), 'list_namespaced_{}'.format(kind))
        return func(namespace, field_selector=selector).items

    def delete_namespaced_object(self, kind, namespace, name):
        """Delete an object via ``delete_namespaced_<kind>(name, namespace)``.

        Returns:
            The response of the delete call.
        """
        func = getattr(self._client_api(kind), 'delete_namespaced_{}'.format(kind))
        return func(name, namespace)

    def get_rollout(self, namespace, name, **kw):
        """Fetch a Kruise Rollout custom object; ``kw`` is passed through."""
        api = self._client_api("rollout")
        return api.get_namespaced_custom_object(
            self._ROLLOUT_GROUP, self._ROLLOUT_VERSION, namespace, self._ROLLOUT_PLURAL, name, **kw
        )

    def patch_rollout(self, namespace, name, object, **kw):
        """Patch a Kruise Rollout custom object.

        Args:
            namespace: Namespace of the rollout.
            name: Name of the rollout.
            object: Patch body. (The parameter name shadows the builtin but
                is kept so keyword callers continue to work.)
            **kw: Passed through to ``patch_namespaced_custom_object``.
        """
        api = self._client_api("rollout")
        return api.patch_namespaced_custom_object(
            self._ROLLOUT_GROUP, self._ROLLOUT_VERSION, namespace, self._ROLLOUT_PLURAL, name, object, **kw
        )

    def create_rollout(self, namespace, json_data, **kw):
        """Create a Kruise Rollout custom object from ``json_data``."""
        api = self._client_api("rollout")
        return api.create_namespaced_custom_object(
            self._ROLLOUT_GROUP, self._ROLLOUT_VERSION, namespace, self._ROLLOUT_PLURAL, json_data, **kw
        )
# Ref: CustomObjectsApi (kubernetes Python client)
# Reader comments (residue from the original web page)