美文网首页
k8s自定义监控

k8s自定义监控

作者: 路破格 | 来源:发表于2020-06-23 11:44 被阅读0次

    需求

    镜像:centos:8
    应用:nginx
    软件:crontabs
    python库:requests

    service+endpoint监控

    1 nginx配置

    server {
            listen       80 default_server;
            server_name  localhost;
            rewrite ^/metrics$ /metrics/ break;
        
            location /metrics/ {
                root   /data/www/html;
                index  metrics.txt;
            }
        }
    

    2 python脚本

    vi /data/scripts/check_k8s_self.py

    import re
    import time
    import socket
    import requests
    import threading
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    
    def check_socket(ip = "", port = 80):
        for retry_count in range(3):
            try:
                cs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                address = (ip, port)
                cs.settimeout(3)
                status = cs.connect_ex((address))
    
                if status == 0:
                    return True
                else:
                    return False
            except Exception as e:
                print(e)
                time.sleep(2)
    
        return False
    
    def http_get(request_url = ""):
        result = {
            'http': 0,
            'code': 0
        }
    
        try:
            req_rs = requests.get(
                url = request_url,
                timeout = 5
            )
            result['http'] = 1
            result['code'] = req_rs.status_code
            req_rs.close()
        except:
            pass
    
        return result
    
    def get_token():
        token_file = open(
            "/run/secrets/kubernetes.io/serviceaccount/token", 
            "r"
        )
        token = token_file.readline()
        token_file.close()
    
        return token
    
    def read_k8s_api(request_url = "/", request_token = ""):
        http_headers = {
            'Content-Type': 'application/json',
            'authorization': 'Bearer ' + request_token
        }
    
        rs = requests.get(
            url = "https://kubernetes.default.svc" + request_url,
            headers = http_headers,
            verify = False,
            timeout = 10
        )
    
        if rs.status_code == 200:
            return rs.json()
        else:
            return False
    
    def check_service(check_type = "", check_info = {}):
        check_result = ""
        check_item = '{%s="%s",namespace="%s",ip="%s",port="%d",protocol="%s"}' % (
            check_type,
            check_info[check_type],
            check_info['namespace'],
            check_info['ip'],
            check_info['port'],
            check_info['protocol']
        )
    
        if check_socket(check_info['ip'], check_info['port']):
            check_result += "kube_%s_selfcheck_socket%s 1\n" % (check_type, check_item)
            check_http = http_get("http://%s:%d" % (check_info['ip'], check_info['port']))
            check_result += "kube_%s_selfcheck_http%s %d\n" % (check_type, check_item, check_http['http'])
            check_result += "kube_%s_selfcheck_http_code%s %d\n" % (check_type, check_item, check_http['code'])
        else:
            check_result += "kube_%s_selfcheck_socket%s 0\n" % (check_type, check_item)
    
        global lock_thread, metrics
    
        lock_thread.acquire()
        metrics += check_result
        lock_thread.release()
    
    def get_k8s_service():
        global re_ip, token
    
        services = read_k8s_api(
            request_url = "/api/v1/services", 
            request_token = token
        )
    
        for service in services['items']:
            ip = service['spec']['clusterIP']
    
            if ip and re_ip.match(ip):
                ports = service['spec']['ports']
    
                for port in ports:
                    check_info = {
                        'service': service['metadata']['name'],
                        'namespace': service['metadata']['namespace'],
                        'ip': ip,
                        'port': port['port'],
                        'protocol': port['protocol']
                    }
        
                    threading.Thread(
                        target = check_service, 
                        args = ('service', check_info,)
                    ).start()
    
    def get_k8s_endpoint():
        global re_ip, token
    
        endpoints = read_k8s_api(
            request_url = "/api/v1/endpoints",
            request_token = token
        )
    
        for endpoint in endpoints['items']:
            if 'subsets' not in endpoint:
                continue
    
            for subnet in endpoint['subsets']:
                if 'addresses' not in subnet or 'ports' not in subnet:
                    continue
    
                for address in subnet['addresses']:
                    ip = address['ip']
    
                    if ip and re_ip.match(ip):
                        ports = subnet['ports']
            
                        for port in ports:
                            check_info = {
                                'endpoint': endpoint['metadata']['name'],
                                'namespace': endpoint['metadata']['namespace'],
                                'ip': ip,
                                'port': port['port'],
                                'protocol': port['protocol']
                            }
                
                            threading.Thread(
                                target = check_service, 
                                args = ("endpoint", check_info,)
                            ).start()
    
    if __name__ == "__main__":
        global lock_thread, metrics, re_ip, token
        
        lock_thread = threading.Lock()
        metrics = ""
        re_ip = re.compile("[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}")
        token = get_token()
    
        get_k8s_service()
        get_k8s_endpoint()
    
        while len(threading.enumerate()) > 1:
            time.sleep(1)
    
        f = open("/data/www/html/metrics/metrics.txt", "w")
        f.write(metrics)
        f.close()
    

    3 crontab

    echo '* * * * * root /usr/bin/python3 /data/scripts/check_k8s_self.py > /dev/null 2>&1' >> /etc/crontab

    4 自启动脚本

    echo 'nginx
    /usr/sbin/crond -n' >> /etc/rc.d/rc.local

    5 打包镜像

    将1-4步骤生成的容器通过docker commit生成镜像并推送到私有仓库

    6 部署到k8s

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      labels:
        name: self-monitor
      name: self-monitor
      namespace: kube-system
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: self-monitor
          usage: monitoring
      strategy:
        type: RollingUpdate
        rollingUpdate:
            maxSurge: 1
            maxUnavailable: 1
      template:
        metadata:
          labels:
            app: self-monitor
            usage: monitoring
          annotations:
            prometheus.io/port: "80"
            prometheus.io/scrape: "true"
            promethues.io/path: "/metrics"
        spec:
          # 私有仓库密钥
          imagePullSecrets:
            - name: ops-harbor
          containers:
          # 镜像路径
          - image: xxx/self-monitor:1
            imagePullPolicy: IfNotPresent
            name: self-monitor
            securityContext:
              privileged: true
            command:
              - "bash"
              - "-c"
              - "/etc/rc.d/rc.local"
            ports:
            - containerPort: 80
              protocol: TCP
            resources:
              limits:
                cpu: "1"
                memory: 1000Mi
              requests:
                cpu: 200m
                memory: 200Mi
          # 需要有cluster_role的只读权限,可以直接使用prometheus的sa账号权限
          serviceAccountName: prometheus-ecs-sa
    

    相关文章

      网友评论

          本文标题:k8s自定义监控

          本文链接:https://www.haomeiwen.com/subject/dngcfktx.html