Management System: Design of a Celery Collector


Author: python菜鸟 | Published 2019-02-17 12:52

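    Motivation

    1. The customer runs several commercial O&M (operations and maintenance) systems, each completely separate from the others, so staff have to log in to every one of them daily to check for alarms.

    2. Commercial O&M systems are general-purpose and cannot match the customer's business requirements exactly, so they have to be extended in-house.

    3. Commercial O&M systems sometimes have bugs that leave a few devices unmonitored or uncollected.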

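    Collected data

    • Storage array data
      • Host WWNs, host groups, LUN groups, LUNs, port groups, ports, and so on
    • FC switch data
      • Switch hardware information, switch WWNs, switch port errors, performance data
    • VMware virtualization data
      • VMware virtual machines, their ESXi host machines, and storage
    • SNMP data
      • To be decided
    • SNMP trap reception
      • Receive alarms from the devices above through the trap service and write them into the alarm system
    • Distributed storage
      • Capacity, volume groups, LUNs, and so on
    • Other sources
      • Read directly from the commercial O&M systems' databases; the needed data is processed and then stored.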

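    Installing and configuring Celery

    Installing Celery

    1. Install Celery with pip install celery, then install Redis and set a password for it.

    2. Create the project directory and, inside it, a conf folder containing conf.py, which holds the configuration values.

    3. Create celery_run.py.

    4. Create one further directory per business system.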
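celery_run.py imports logger, redis_ip, redis_port, redis_pass, redis_broker and redis_backend from conf/conf.py, and the MongoDB module imports mong_ip, mong_user, mong_password and mong_app from it, but the article does not show the file. A minimal sketch, assuming plain module-level constants (every value is a placeholder) and a standard-library logger. The redis_url helper is hypothetical: it percent-escapes the password, because a raw password containing characters such as '/' or '#' would end the host part of the URL early; whether the Redis client unquotes it again is worth verifying for your Celery version.

```python
# conf/conf.py: hypothetical minimal version; all values are placeholders.
import logging
from urllib.parse import quote

# Redis, used as Celery broker and result backend (DB numbers are assumptions)
redis_ip = "127.0.0.1"
redis_port = 6379
redis_pass = "change-me"
redis_broker = 0   # Redis database used as the broker
redis_backend = 1  # Redis database used for task results

# MongoDB settings (mong_ip may include the port, e.g. "host:27017")
mong_ip = "127.0.0.1:27017"
mong_user = "collector"
mong_password = "change-me"
mong_app = "collect_manager"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger("collect_manager")


def redis_url(password, host, port, db):
    """Build a redis:// URL with the password percent-escaped (hypothetical helper)."""
    return "redis://:%s@%s:%s/%s" % (quote(password, safe=""), host, port, db)


if __name__ == "__main__":
    print(redis_url(redis_pass, redis_ip, redis_port, redis_broker))
```

With this helper, celery_run.py could pass redis_url(redis_pass, redis_ip, redis_port, redis_broker) as the broker instead of formatting the string inline.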

    from __future__ import absolute_import, unicode_literals
    from celery import Celery, platforms
    from celery.schedules import crontab
    from conf.conf import logger, redis_ip, redis_port, redis_pass, redis_broker, redis_backend

    # Allow the worker to run as root
    platforms.C_FORCE_ROOT = True

    # Use Redis as broker and result backend
    app = Celery(
        'manager',
        # Broker: where tasks are published
        broker='redis://:%s@%s:%s/%s' % (redis_pass, redis_ip, redis_port, redis_broker),
        # Backend: where task results are returned
        backend='redis://:%s@%s:%s/%s' % (redis_pass, redis_ip, redis_port, redis_backend),
        # Create a tasks.py in each application directory and list it here
        include=['Brocade.tasks', 'fst.tasks', 'alert_system.tasks'])

    # Celery configuration. Only the new lowercase setting names are used:
    # Celery 4+ rejects a config that mixes them with the old uppercase names.
    app.conf.update(
        # Keep task results for 1 hour
        result_expires=3600,
        # Serialization format (results too, so they match accept_content)
        task_serializer='pickle',
        result_serializer='pickle',
        accept_content=['pickle'],
        # Replace each worker child process after it has run 40 tasks;
        # the number of tasks running in parallel is set by worker_concurrency / -c
        worker_max_tasks_per_child=40,
        # Use local time; the periodic tasks below depend on it
        timezone='Asia/Shanghai',
        enable_utc=False,
    )

    # Periodic tasks (run by celery beat)
    app.conf.beat_schedule = {
        'multiply-at-some-time': {
            'task': 'alert_system.tasks.printtest',
            'schedule': crontab(hour=12, minute=0),   # once a day at 12:00 local time
            'args': (),                                # task arguments
        }
    }

    if __name__ == '__main__':
        try:
            app.start()
        except Exception as e:
            logger.warning("Startup failed: %s" % (e))
    
    

    Once everything is in place, test startup with celery -A celery_run worker -l info; the worker should start normally.
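celery_run.py schedules alert_system.tasks.printtest with a crontab at hour=12, and Celery beat interprets crontab hours in the app's configured timezone, so the timezone setting decides when the task really fires. A standard-library sketch of that arithmetic (not Celery's own scheduler code), assuming Asia/Shanghai can be modelled as a fixed UTC+8 offset since China observes no daylight saving time. next_daily_run is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

# Asia/Shanghai as a fixed UTC+8 offset (no daylight saving time)
SHANGHAI = timezone(timedelta(hours=8), "Asia/Shanghai")


def next_daily_run(now, hour, minute):
    """Return the next datetime >= now whose local time is hour:minute.

    `now` must be timezone-aware; the result keeps the same tzinfo.
    """
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate < now:
        # Today's slot has already passed, so the next run is tomorrow
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    now = datetime(2019, 2, 17, 12, 52, tzinfo=SHANGHAI)
    run = next_daily_run(now, hour=12, minute=0)
    print(run.isoformat())                           # local (UTC+8) time
    print(run.astimezone(timezone.utc).isoformat())  # the same instant in UTC
```

Run the other way round, the same arithmetic shows that 12:00 GMT is 20:00 in Shanghai, which is why a schedule meant for local time needs timezone='Asia/Shanghai' rather than GMT.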


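    MongoDB setup

    Create mongodb.py under conf and put all database operations in it, so that the other files can concentrate on business logic. Install MongoDB first, then run pip install pymongo.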

    # -*- coding: utf-8 -*-
    from __future__ import absolute_import, unicode_literals
    from urllib.parse import quote_plus
    import datetime

    from pymongo import MongoClient

    try:
        from collect_manager.conf.conf import mong_ip, mong_user, mong_password, mong_app
    except ImportError:
        from .conf.conf import mong_ip, mong_user, mong_password, mong_app

    # Writes log/alarm entries to the log_system collection.
    # insert_one() is used because Collection.insert() was removed in PyMongo 4.
    class Logsystem():
        def __init__(self):
            self.obj = monogdb().use_logsystem()
        def warn(self,info):
            data = {
                "grade":"warn",
                "messages":info,
                "datetime":datetime.datetime.now(),
                "confirm": False,
            }
            self.obj.insert_one(data)
        def error(self,info):
            data = {
                "grade":"error",
                "messages":info,
                "datetime":datetime.datetime.now(),
                "confirm":False,
            }
            self.obj.insert_one(data)
        def info(self,info):
            data = {
                "grade":"info",
                "messages":info,
                "datetime":datetime.datetime.now(),
                "confirm":True,
            }
            self.obj.insert_one(data)
        def fail(self,info):
            data = {
                "grade":"fail",
                "messages":info,
                "datetime":datetime.datetime.now(),
                "confirm":False,
            }
            self.obj.insert_one(data)


    # Logs in to MongoDB and hands out collections
    class monogdb():
        def __init__(self):
            """
            Set up the connection (user name and password are URL-escaped)
            """
            self.url = "mongodb://%s:%s@%s" % (quote_plus(mong_user),quote_plus(mong_password),mong_ip)
            self.client = MongoClient(self.url,connect=False)
            self.app = mong_app
        def Logindatabase(self):
            """
            Return the application database
            :return:
            """
            dataname = self.client[self.app]
            return dataname
        def use_bro_san_tables(self):
            """
            Use the bro_san collection
            :return:
            """
            obj = self.Logindatabase()
            return obj["bro_san"]
        def use_soft_log_tables(self):
            obj = self.Logindatabase()
            return obj["soft_log"]
        def use_test_tables(self):
            obj = self.Logindatabase()
            return obj["test"]
        def use_virtual_machine_tables(self):
            obj = self.Logindatabase()
            return obj["virtual_machine"]
        def use_vmwarestorage_tables(self):
            obj = self.Logindatabase()
            return obj["vmware_storage"]
        def use_vmwarehost_tables(self):
            obj = self.Logindatabase()
            return obj["vmware_host"]
        def use_logsystem(self):
            """
                Fields:
                messages    message text
                grade       level: debug, info, warn, error, fail
                confirm     whether the entry has been acknowledged
                is_delete   whether the entry has been deleted
                label       marks duplicate entries; stores the collected MySQL id
                datetime    timestamp
            :return: the log_system collection
            """
            obj = self.Logindatabase()
            return obj["log_system"]
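Logsystem's four methods differ only in the grade string and the initial confirm flag (only info entries start out confirmed). A table-driven sketch of the same behaviour, assuming any collection object with a PyMongo-style insert_one method; MemoryCollection and TableLogsystem are hypothetical names, and the in-memory stand-in lets the example run without a MongoDB server.

```python
import datetime

# grade -> initial value of the "confirm" field (mirrors Logsystem)
CONFIRM_BY_GRADE = {"info": True, "warn": False, "error": False, "fail": False}


class MemoryCollection:
    """Hypothetical stand-in for a PyMongo collection (insert_one only)."""

    def __init__(self):
        self.docs = []

    def insert_one(self, doc):
        self.docs.append(doc)


class TableLogsystem:
    """Produces the same documents as Logsystem from one generic method."""

    def __init__(self, collection):
        self.obj = collection

    def log(self, grade, info):
        if grade not in CONFIRM_BY_GRADE:
            raise ValueError("unknown grade: %s" % grade)
        self.obj.insert_one({
            "grade": grade,
            "messages": info,
            "datetime": datetime.datetime.now(),
            "confirm": CONFIRM_BY_GRADE[grade],
        })

    # Thin wrappers keep the original call style, e.g. logs.error("...")
    def warn(self, info):
        self.log("warn", info)

    def error(self, info):
        self.log("error", info)

    def info(self, info):
        self.log("info", info)

    def fail(self, info):
        self.log("fail", info)
```

In the collector itself the collection would be monogdb().use_logsystem(); adding a new level then means adding one entry to CONFIRM_BY_GRADE.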
    
    
    

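    SNMP trap receiver

    This service receives trap messages. Because there are too many device types, parsing the MIBs is not worth the effort; the sending device's IP is written straight into the log system.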

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-

    import sys

    sys.path.append(".")

    from pysnmp.proto import api
    from pyasn1.codec.ber import decoder
    from pysnmp.carrier.asynsock.dgram import udp, udp6
    from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher

    try:
        from monogdb_api import Logsystem
    except ImportError:
        from .monogdb_api import Logsystem

    logsystem = Logsystem()


    def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
        # One datagram may carry several SNMP messages; decode them in turn
        while wholeMsg:
            msgVer = int(api.decodeMessageVersion(wholeMsg))
            if msgVer in api.protoModules:
                pMod = api.protoModules[msgVer]
            else:
                print('Unsupported SNMP version %s' % msgVer)
                return
            reqMsg, wholeMsg = decoder.decode(
                wholeMsg, asn1Spec=pMod.Message(),
            )
            print('Notification message from %s:%s: ' % (
                transportDomain, transportAddress
            ))
            # The sending device's IP is the source address of the datagram
            ip_address = transportAddress[0]
            reqPDU = pMod.apiMessage.getPDU(reqMsg)
            if reqPDU.isSameTypeWith(pMod.TrapPDU()):
                if msgVer == api.protoVersion1:
                    try:
                        strlist = 'Enterprise: {}, Agent Address: {}, Generic Trap: {}, Specific Trap: {}, Uptime: {}'.format(
                            pMod.apiTrapPDU.getEnterprise(reqPDU).prettyPrint(),
                            pMod.apiTrapPDU.getAgentAddr(reqPDU).prettyPrint(),
                            pMod.apiTrapPDU.getGenericTrap(reqPDU).prettyPrint(),
                            pMod.apiTrapPDU.getSpecificTrap(reqPDU).prettyPrint(),
                            pMod.apiTrapPDU.getTimeStamp(reqPDU).prettyPrint()
                        )
                        print(strlist)
                    except Exception as e:
                        strlist = 'messages error, error({})'.format(e)
                else:
                    # SNMPv2c traps: var-binds are not decoded (no MIB parsing)
                    strlist = 'no trap details decoded'
                # Logged inside the trap branch so that strlist is always defined
                logsystem.error(u'ip ({}) received a trap, please check the device immediately.\n {}'.format(ip_address, strlist))
        return wholeMsg


    if __name__ == '__main__':
        transportDispatcher = AsynsockDispatcher()
        transportDispatcher.registerRecvCbFun(cbFun)

        # UDP/IPv4 on all interfaces (port 162 requires root)
        transportDispatcher.registerTransport(
            udp.domainName, udp.UdpSocketTransport().openServerMode(('0.0.0.0', 162))
        )

        # UDP/IPv6; '::1' is the loopback address only
        transportDispatcher.registerTransport(
            udp6.domainName, udp6.Udp6SocketTransport().openServerMode(('::1', 162))
        )

        transportDispatcher.jobStarted(1)

        try:
            # Dispatcher will never finish as job#1 never reaches zero
            transportDispatcher.runDispatcher()
        except:
            transportDispatcher.closeDispatcher()
            raise
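Because the receiver records only the sender's IP and skips the MIB contents, its core idea can be reproduced without pysnmp. A standard-library sketch, assuming an unprivileged port for experiments (binding to 162 normally needs root) and a plain callback in place of Logsystem.error; format_alarm and receive_one are hypothetical helpers. It performs no SNMP decoding at all, so it would log any UDP datagram, not only traps.

```python
import socket


def format_alarm(ip_address, payload):
    """Build the alarm text for the log system (no MIB parsing)."""
    return ("ip ({}) received a trap, please check the device immediately. "
            "({} bytes)").format(ip_address, len(payload))


def receive_one(sock, log, bufsize=65535):
    """Receive one UDP datagram, pass an alarm message to `log`, return the sender IP."""
    payload, (ip_address, _port) = sock.recvfrom(bufsize)
    log(format_alarm(ip_address, payload))
    return ip_address


if __name__ == "__main__":
    # Port 1162 instead of 162 so the sketch runs without root (assumption)
    server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server.bind(("0.0.0.0", 1162))
    while True:
        receive_one(server, print)
```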
    
    

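    Storage data collection

    Create an application directory named storage under the project directory, with one subdirectory per vendor (for example huawei for Huawei storage and fst for Fujitsu storage), and a tasks.py directly under storage. Storage data is collected on demand when a user clicks in the web interface.

    Vendor     Method   Notes
    Fujitsu    ssh      Collected over SSH
    Huawei     ssh      Collected over SSH
    Hitachi    web      Collected manually
    EMC        web      Collected manually

    The code for this is fairly simple: SSH to the device and collect the data. Some low-end devices are handled by logging in to their web interface instead.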
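The article does not say which SSH library it uses. A minimal sketch of the SSH step, assuming key-based login and the system OpenSSH client invoked through subprocess; build_ssh_command and run_remote are hypothetical helpers, and the host, user and command in any call are placeholders. Real Fujitsu or Huawei CLI commands must come from the vendor documentation, and some device shells may not accept a command passed this way.

```python
import subprocess


def build_ssh_command(host, user, remote_command, port=22, timeout=10):
    """Return the argv list for running one command through OpenSSH."""
    return [
        "ssh",
        "-p", str(port),
        "-o", "BatchMode=yes",                 # never prompt for a password
        "-o", "ConnectTimeout=%d" % timeout,   # give up on unreachable devices
        "%s@%s" % (user, host),
        remote_command,
    ]


def run_remote(host, user, remote_command, **kwargs):
    """Run the command on the device and return its standard output as text."""
    result = subprocess.run(
        build_ssh_command(host, user, remote_command, **kwargs),
        capture_output=True, text=True, check=True,
    )
    return result.stdout
```

A task in storage/tasks.py could call run_remote once per command and hand the raw text to a parser before storing it in MongoDB.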

    FC switch data collection

    Collected once a day over SSH; the data is cleaned and then stored.

    Vendor     Method   Notes
    Brocade    ssh      Collected over SSH
    Cisco      ssh      Collected over SSH
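The collected switch output is cleaned before storage. A sketch of one cleaning step, assuming the command prints a whitespace-separated table whose first non-empty line is the header; parse_table is a hypothetical helper, and the sample text and column names are invented for illustration, not real Brocade or Cisco output.

```python
def parse_table(text):
    """Turn a whitespace-separated table into a list of dicts.

    The first non-empty line is the header; lines whose column count
    differs from the header (banners, separator lines) are skipped.
    """
    lines = [line.split() for line in text.splitlines() if line.strip()]
    if not lines:
        return []
    header, rows = lines[0], lines[1:]
    return [dict(zip(header, row)) for row in rows if len(row) == len(header)]


# Invented sample output (not from a real switch)
SAMPLE = """
port  tx_frames  rx_frames  crc_err
0     1200       1180       0
1     900        899        3
--------------------------------
"""

if __name__ == "__main__":
    for record in parse_table(SAMPLE):
        print(record)
```

Values stay strings; converting counters to integers and attaching the switch name and collection date would be the next cleaning step before inserting into MongoDB.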

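    VMware virtualization data collection

    VMware data is collected and stored, then matched against the customer's backup system to check whether each virtual machine was backed up successfully.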

    # -*- coding: utf-8 -*-
    from __future__ import absolute_import, unicode_literals
    import atexit
    import requests
    from pyVmomi import vim
    from pyVim import connect
    import json
    from conf.conf import logger
    def sizeof_fmt(num):
        """
        Returns the human readable version of a file size
    
        :param num:
        :return:
        """
        for item in ['bytes', 'KB', 'MB', 'GB']:
            if num < 1024.0:
                return "%3.1f%s" % (num, item)
            num /= 1024.0
        return "%3.1f%s" % (num, 'TB')
    class vsphere:
        def __init__(self,host,user,password,port=443):
            """
            Initialize the instance.
            :param host: vCenter host name
            :param user: vCenter user
            :param password: vCenter password
            :param port: vCenter port, default 443
            """
            try:
                self.service_instance = connect.SmartConnectNoSSL(host=host,user = user,pwd = password,port = int(port))
                atexit.register(connect.Disconnect, self.service_instance)
                self.content = self.service_instance.RetrieveContent()
            except Exception as e:
                logger.error("connecting to vSphere failed - %s"%(e))
                # Re-raise: without self.content every other method would fail anyway
                raise
        #v1.0
        def getvm(self):
            """
            {'bootTime': None,
            'Bios UUID': '423f330a-a578-2c6e-3fb1-dc514a38184d',
            'Annotation': '',
            'Name': 'GZGL_10.239.37.57',
            'VMware-tools': 'toolsOld',
            'Template': False,
            'memorySizeMB': 16384,
            'numdisk': 2,
            'Path': '[X10_K01_HITACHI_Cluster03_LUN15] GZGL_10.239.37.57/GZGL_10.239.37.57.vmx',
            'IP': 'toolsOld',
            'Instance UUID':
            '503fd0c6-1379-f1d0-c2ce-2a6ca446b34c',
            'Guest': 'Red Hat Enterprise Linux 6 (64-bit)',
            'State': 'poweredOn',
            'numCpu': 4}
            :return: list[dict]
            """
            container = self.content.rootFolder  # starting point to look into
            viewType = [vim.VirtualMachine]  # object types to look for
            recursive = True  # whether we should look into it recursively
            containerView = self.content.viewManager.CreateContainerView(
                container, viewType, recursive)
            children = containerView.view
            data = []
            for child in children:
                summary = child.summary
                tempdata = {
                    "Name": summary.config.name,
                    "vm_tag": str(summary.vm),
                    "Template": summary.config.template,
                    "Path": summary.config.vmPathName,
                    "Guest": summary.config.guestFullName,
                    "Instance UUID": summary.config.instanceUuid,
                    "Bios UUID": summary.config.uuid,
                    "Annotation": summary.config.annotation,
                    "State": summary.runtime.powerState,
                    "VMware-tools": summary.guest.toolsStatus,
                    "IP": summary.guest.ipAddress,
                    "memorySizeMB": summary.config.memorySizeMB,
                    "numCpu": summary.config.numCpu,
                    "numdisk": summary.config.numVirtualDisks,
                    "bootTime": summary.runtime.bootTime,
                    "exsi_tag": str(summary.runtime.host),
                    "exsi_ip": summary.runtime.host.name,
                    "quickStats_status": summary.quickStats.guestHeartbeatStatus,
                    "quickStats_uptimeSeconds": summary.quickStats.uptimeSeconds,
                    "quickStats_hostMemoryUsage": summary.quickStats.hostMemoryUsage,
                }
                # Storage usage may be unavailable for some VMs; leave those fields out then
                try:
                    storage = {
                        "storage_committed": summary.storage.committed,
                        "storage_uncommitted": summary.storage.uncommitted,
                        "storage_unshared": summary.storage.unshared,
                    }
                except Exception:
                    storage = {}
                tempdata.update(storage)
                data.append(tempdata)
            return data
    
        def getvm_uuid(self):
            container = self.content.rootFolder  # starting point to look into
            viewType = [vim.VirtualMachine]  # object types to look for
            recursive = True  # whether we should look into it recursively
            containerView = self.content.viewManager.CreateContainerView(
                container, viewType, recursive)
            children = containerView.view
            data = []
            for child in children:
                summary = child.summary
                tempdata = {
                    "Instance UUID":summary.config.instanceUuid,
                }
                data.append(tempdata)
            return data
        # Deprecated
        def getexsihost_storage(self):
            """
            List the VMFS volumes mounted on every ESXi host.
            Each entry: uuid, capacity, vmfs_version, local, ssd,
            extents (disk names), storagename, exsi_host.
            :return: list[dict]
            """
            objview = self.content.viewManager.CreateContainerView(self.content.rootFolder,
                                                              [vim.HostSystem],
                                                              True)
            esxi_hosts = objview.view
            objview.Destroy()
            datastores = []
            for esxi_host in esxi_hosts:
                # All file systems on the ESXi host
                storage_system = esxi_host.configManager.storageSystem
                host_file_sys_vol_mount_info = storage_system.fileSystemVolumeInfo.mountInfo
                for host_mount_info in host_file_sys_vol_mount_info:
                    volume = host_mount_info.volume
                    # Keep only VMFS volumes
                    if volume.type != "VMFS":
                        continue
                    datastore_details = {
                        # uuid may be missing; fall back to an empty string
                        'uuid': getattr(volume, 'uuid', None) or "",
                        'capacity': volume.capacity,
                        'vmfs_version': volume.version,
                        'local': volume.local,
                        'ssd': volume.ssd,
                        'extents': [extent.diskName for extent in volume.extent],
                        'storagename': volume.name,
                        'exsi_host': esxi_host.name,
                    }
                    # One entry per VMFS volume
                    datastores.append(datastore_details)
            return datastores
        def getstorage01(self):
            """
            Example entry:
              "Name": "datastore1 (25)",
              "URL": "ds:///vmfs/volumes/5b126d7e-fbee7582-8d66-5c546d5798c3/",
              "Capacity": "1.1TB",
              "free cap": "1.1TB",
              "hostNum": "1",
              "VmNum": "0",
              "esxiip": [...], "vmip": [...]
            :return: [{}]
            """
            ds_obj_list = self.content.viewManager.CreateContainerView(self.content.rootFolder,
                                                              [vim.Datastore],
                                                              True)
            datastore = []
            for ds in ds_obj_list.view:
                summary = ds.summary
                ds_capacity = summary.capacity
                ds_freespace = summary.freeSpace
                # ipdata = []
                # vmdata = []
                esxiip = []
                vmip = []
                for i in ds.host:
                    # ipdata.append({str(i.key):i.key.name})
                    esxiip.append({
                        "esxiip_tag":str(i.key),
                        "esxiip":i.key.name
                    })
                for i in ds.vm:
                    # vmdata.append({str(i):i.name})
                    vmip.append(
                        {
                            "vmip_tag":str(i),
                            "vmipname":i.name,
                        }
                    )
                datadict  = {
                    "Name":format(summary.name),
                    "URL":format(summary.url),
                    "Capacity":format(sizeof_fmt(ds_capacity)),
                    "free cap":format(sizeof_fmt(ds_freespace)),
                    "hostNum":format(len(ds.host)),
                    "VmNum":format(len(ds.vm)),
                    "esxiip":esxiip,
                    "vmip":vmip
                }
                if datadict:
                    datastore.append(datadict)
            return datastore
        #v1.0
        def gethost(self):
            """
            :return:list
            """
            objview = self.content.viewManager.CreateContainerView(self.content.rootFolder,
                                                              [vim.HostSystem],
                                                              True)
            esxi_hosts = objview.view
            objview.Destroy()
            datastores = []
            for esxi_host in esxi_hosts:
                systemInfo = esxi_host.hardware.systemInfo
                cpuInfo = esxi_host.hardware.cpuInfo
                storage_system = esxi_host.configManager.storageSystem
                host_file_sys_vol_mount_info = storage_system.fileSystemVolumeInfo.mountInfo
                data = {
                "esxi_host":esxi_host.name
                ,"esxi_tag":str(esxi_host)
                ,"vendor":systemInfo.vendor
                ,"model":systemInfo.model
                ,"host_uuid":systemInfo.uuid
                ,"numCpuPackages":cpuInfo.numCpuPackages
                ,"numCpuCores":cpuInfo.numCpuCores
                ,"numCpuThreads":cpuInfo.numCpuThreads
                ,"hz":cpuInfo.hz
                ,"memorySize":esxi_host.hardware.memorySize
                }
                datastores.append(data)
            return datastores
        #v1.0
        def getcluster(self):
            """
            Return the ESXi hosts of each cluster, keyed by cluster name
            :return: dict
            """
            objview = self.content.viewManager.CreateContainerView(self.content.rootFolder,
                                                              [vim.ClusterComputeResource],
                                                              True)
            datavcenter = {}
            for i in objview.view:
                datahost = []
                for k in i.host:
                    datahost.append({str(k):k.name})
                datavcenter[i.name] = datahost
            # print(json.dumps(datavcenter,indent=1))
            return datavcenter
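The point of collecting VM data is to check it against the customer's backup system. A sketch of that comparison, assuming the backup system can export the Instance UUIDs it has backed up (the export format is hypothetical) and using the list[dict] shape returned by getvm_uuid() or getvm(); find_unbacked_vms is a hypothetical helper, and UUIDs are compared case-insensitively as an assumption.

```python
def find_unbacked_vms(vm_records, backed_up_uuids):
    """Return the VM records whose "Instance UUID" has no backup entry.

    vm_records: list of dicts with an "Instance UUID" key, as returned by
    vsphere.getvm_uuid() or vsphere.getvm().
    backed_up_uuids: iterable of UUID strings exported from the backup system.
    """
    backed_up = {uuid.lower() for uuid in backed_up_uuids}
    return [
        vm for vm in vm_records
        # A missing UUID counts as "not backed up" so it gets reviewed
        if (vm.get("Instance UUID") or "").lower() not in backed_up
    ]
```

In practice vm_records would come from vsphere(host, user, password).getvm(), and the result could be written to the log system as warnings.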
    

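    Process management with supervisor

    Installing supervisor

    1. Install it with apt-get install supervisor.
    2. Generate a configuration file: echo_supervisord_conf > /etc/supervisord.conf

    3. Configure: add the following at the bottom of the configuration file (create the /etc/supervisor directory if it does not exist):

        [include]
        files=/etc/supervisor/*.conf


    4. Start on boot

    Register supervisord as a boot-time service (this assumes an init script is installed at /etc/init.d/supervisord):

        chmod +x /etc/init.d/supervisord
        # Debian-based:
        sudo update-rc.d supervisord defaults
        # Red Hat-based:
        chkconfig --add supervisord
        chkconfig supervisord on              # enable at boot
        chkconfig --list | grep supervisord   # check that it is enabled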
    
    5. Configure the programs: create three .conf files in /etc/supervisor containing the following three sections.
    [program:sanmanager]
    directory=/home/collect_manager
    command=celery -A celery_run worker -l info
    stdout_logfile=/home/collect_manager/run.log
    user = root
    autostart=true
    autorestart=true
    startsecs=60
    stopasgroup=true
    killasgroup=true
    startretries=1
    redirect_stderr=true

    [program:sanmanagerbeat]
    directory=/home/collect_manager
    command=celery -A celery_run beat -l info
    stdout_logfile=/home/collect_manager/beat.log
    user = root
    autostart=true
    autorestart=true
    startsecs=60
    stopasgroup=true
    killasgroup=true
    startretries=1
    redirect_stderr=true

    [program:trapserver]
    directory=/home/collect_manager
    command=python3 trapserver.py
    stdout_logfile=/home/collect_manager/trapserver.log
    user = root
    autostart=true
    autorestart=true
    startsecs=60
    stopasgroup=true
    killasgroup=true
    startretries=1
    redirect_stderr=true
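The three program sections differ only in name, command and log file. A hypothetical helper that renders them with the standard configparser module keeps the shared options (including killasgroup, which supervisor only recognises when spelled exactly so) identical everywhere. The PROGRAMS list is an assumption: it uses the worker and beat commands with the module named celery_run, matching the celery -A celery_run startup test earlier in the article.

```python
import configparser
import io

BASE_DIR = "/home/collect_manager"

# (program name, command, log file): assumed to mirror the sections above
PROGRAMS = [
    ("sanmanager", "celery -A celery_run worker -l info", "run.log"),
    ("sanmanagerbeat", "celery -A celery_run beat -l info", "beat.log"),
    ("trapserver", "python3 trapserver.py", "trapserver.log"),
]

# Options shared by every program
COMMON = {
    "user": "root",
    "autostart": "true",
    "autorestart": "true",
    "startsecs": "60",
    "stopasgroup": "true",
    "killasgroup": "true",
    "startretries": "1",
    "redirect_stderr": "true",
}


def render_program(name, command, logfile, base_dir=BASE_DIR):
    """Return the text of one [program:NAME] section."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.optionxform = str  # keep option names exactly as written
    parser["program:" + name] = {
        "directory": base_dir,
        "command": command,
        "stdout_logfile": base_dir + "/" + logfile,
        **COMMON,
    }
    buf = io.StringIO()
    parser.write(buf, space_around_delimiters=False)
    return buf.getvalue()


if __name__ == "__main__":
    for name, command, logfile in PROGRAMS:
        print(render_program(name, command, logfile))
```

Each rendered section could be written to its own file in /etc/supervisor and then picked up with supervisorctl update.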
    
    

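    6. Usage

    supervisorctl usage:
    supervisord: start supervisord
    supervisorctl reload: restart supervisord after editing the configuration files
    supervisorctl status: show the status of the supervised processes
    supervisorctl start <name>: start a process
    supervisorctl stop <name>: stop a process
    supervisorctl stop all: stop all processes. Note: start, restart and stop do not load the latest configuration file.
    supervisorctl update: apply the latest configuration, starting new or changed programs; programs whose configuration has not changed are left running and are not restarted.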
    
    


          Source: https://www.haomeiwen.com/subject/edrgeqtx.html