美文网首页
storm capacity性能状态和kafka分区状态检查监控

storm capacity性能状态和kafka分区状态检查监控

作者: 平凡的运维之路 | 来源:发表于2023-04-24 15:41 被阅读0次

    storm 性能状态检查监控

    代码说明

    • 代码说明
    #!/usr/bin/python3
    # -*- coding:utf-8 -*-
    # The encoding declaration above must stay on line 1 or 2 of the file
    # (PEP 263); further down it is ignored and Python 2 rejects the
    # non-ASCII string literals used in the alarm messages.
    import os,sys
    import time, logging.config, requests
    from decimal import Decimal

    # Major release number of the Red Hat style host, read from
    # /etc/redhat-release.  Falls back to 0 when the file is missing or its
    # content cannot be parsed, instead of aborting at import time.
    VersionShell ="  cat /etc/redhat-release |awk -F'release ' '{print $2}'|cut -d. -f1 |  cut -b 1"
    try:
        OSVersion = int([ Errorinfo for Errorinfo  in os.popen(VersionShell) ][0].split("\n")[0])
    except (IndexError, ValueError):
        OSVersion = 0

    # Choose the modules by the running interpreter rather than by the OS
    # release.  Both spellings of the config parser module are bound so that
    # code selecting one of them by OSVersion works on either interpreter.
    if sys.version_info[0] == 2:
        import ConfigParser
        configparser = ConfigParser
        # Python 2 only: make utf8 the implicit str/unicode conversion codec
        reload(sys)
        sys.setdefaultencoding('utf8')
    else:
        import configparser
        ConfigParser = configparser
    
    
    def get_component_info(storm_id):
        """Log an error for each bolt of a topology whose capacity reaches the threshold.

        Requests ``<storm_ui_curl>/api/v1/topology/<storm_id>`` and compares the
        ``capacity`` of every entry under ``bolts`` with the module-level
        ``Capacity_threshold``.  Only bolts whose id contains ``MongoBolt`` or
        ``ComputeBolt`` are reported.  Request and decoding failures are logged
        and end the check for this topology without raising.

        :param storm_id: topology id, e.g. ``CallDetailTopology-1-1666188466``.
        :return: None; findings are written to the log only.
        """
        from decimal import InvalidOperation

        user_headers = {
            'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
            'Accept' : 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'Accept-Encoding' : 'gzip,deflate',
            'Accept-Language' : 'zh-CN,zh;q=0.9',
            'Upgrade-Insecure-Requests' : '1'}

        cloudRouteRsyncURL = storm_ui_curl + "/api/v1/topology/" +  storm_id
        logger.info("Requests Api Url: " + cloudRouteRsyncURL)
        # NOTE(review): unlike get_topology_id no timeout is passed here, so a
        # stalled Storm UI blocks the run; confirm how slow detail responses may
        # be before adding one.
        try:
            RetrunData = requests.get(cloudRouteRsyncURL, headers = user_headers)
            RetrunData_json = RetrunData.json()
        except Exception as Error:
            # One unreachable topology must not abort the remaining checks
            logger.error("Calling interface Error Return: " + str(Error))
            return

        threshold = Decimal(Capacity_threshold)
        for bolt in RetrunData_json.get("bolts") or []:
            bolt = dict(bolt)
            boltId = bolt.get("boltId")
            capacity = bolt.get("capacity")
            if boltId is None or capacity is None:
                continue
            # Normalise to text: it is compared as Decimal and concatenated below
            capacity = str(capacity)
            try:
                overloaded = Decimal(capacity) >= threshold
            except InvalidOperation:
                logger.error(storm_id + " " + boltId + " invalid capacity value: " + capacity)
                continue
            if not overloaded:
                continue

            if "MongoBolt" in boltId:
                role = "入库"
            elif "ComputeBolt" in boltId:
                role = "计算"
            else:
                continue
            alarm = storm_id.split("-")[0] +  "拓扑中的" + role + "bolt," + boltId + "比Capacity性能指标设置阈值" +Capacity_threshold +"大,当前是:" +  capacity  +  ",请检查storm服务状态或mongodb入库延迟问题及缓存数据清理!!!"
            logger.error(alarm)
    
    # Fetch the ids of all topologies
    def get_topology_id():
        """Return the ids of the topologies listed by the Storm UI summary API.

        Requests ``<storm_ui_curl>/api/v1/topology/summary`` with the configured
        ``Timeout`` and collects the ``id`` of every entry under ``topologies``.
        Failures are logged, never raised.

        :return: list of topology ids; empty when the request or decoding fails.
        """
        id_list = []
        user_headers = {
            'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
            'Accept' : 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'Accept-Encoding' : 'gzip,deflate',
            'Accept-Language' : 'zh-CN,zh;q=0.9',
            'Upgrade-Insecure-Requests' : '1'}
        get_id_api = storm_ui_curl + "/api/v1/topology/summary"
        logger.info("Requests  Api Url: " + get_id_api + " get topology id ")
        try:
            RetrunRsyncData = requests.get(get_id_api, headers = user_headers,timeout=int(Timeout))
        except Exception as Error:
            logger.error("Calling interface Error Return: " + str(Error)  + "  exit!!!")
            # Without a response there is nothing to decode; stop here rather
            # than failing again on the unbound response variable.
            return id_list

        try:
            result_msg = RetrunRsyncData.json()
            for line in result_msg.get("topologies"):
                id_list.append(dict(line).get("id"))
            logger.info("get storm id list: "  + str(id_list))
        except Exception as ErrorMsg:
            # Covers a non-JSON body as well as a reply without "topologies"
            logger.error("Calling interface Error Return: " + str(ErrorMsg)  + "  exit!!!")
        return id_list
    
    # Send an alarm to the network management system
    def SendApex(ApexInfo):
        """Emit an alarm through ``/bin/logger`` at priority ``local0.crit``.

        The alarm is sent only when the module-level ``IsSendApex`` equals the
        string ``"True"``; otherwise an info line is logged instead.

        :param ApexInfo: alarm text appended to ``ccod: result=ERROR``.
        :return: None.
        """
        import subprocess

        if "True"  == IsSendApex:
            # Pass the text as a single argument without a shell, so quotes, "$"
            # or backticks inside the message cannot break or alter the command.
            try:
                subprocess.call(["/bin/logger", "-p", "local0.crit", "ccod: result=ERROR "+  ApexInfo  + " "])
            except OSError as Error:
                logger.error("Send Rsyslog Info data to Apex failed: " + str(Error))
        else:
            logger.info("Not Send Rsyslog Info data to Apex ")
    
    # Stop repeating an alarm once it has been recorded three times
    def RepeatSendMsg(Msg):
        """Tell whether an alarm text may still be sent and record the attempt.

        Accepted messages are appended to the record file named by the
        module-level ``WirteMsgfilename``.  A message is accepted while fewer
        than three recorded lines contain it.  The record file is emptied when
        the function runs at 00:10 local time.

        :param Msg: alarm text.
        :return: True when the message was recorded fewer than three times (it is
            then recorded once more), otherwise False.
        """
        # The configured value may be wrapped in quotes; the shell commands used
        # previously stripped them implicitly.
        record_path = WirteMsgfilename.strip().strip("\"'")
        # Work on bytes so the result does not depend on the locale encoding
        needle = Msg if isinstance(Msg, bytes) else Msg.encode("utf-8")

        now = time.localtime()
        if now.tm_hour == 0 and now.tm_min == 10:
            try:
                open(record_path, "wb").close()
            except (IOError, OSError) as Error:
                logger.error("Can not clear " + record_path + ": " + str(Error))

        try:
            with open(record_path, "rb") as record_file:
                # Plain substring match: the text is not treated as a pattern
                Countnum = sum(1 for line in record_file if needle in line)
        except (IOError, OSError):
            # No record file yet means the message was never recorded
            Countnum = 0
        logger.info("Duplicate alarm count: " + str(Countnum))

        if Countnum >= 3:
            return False
        try:
            with open(record_path, "ab") as record_file:
                record_file.write(needle + b"\n")
        except (IOError, OSError) as Error:
            logger.error("Can not write " + record_path + ": " + str(Error))
        return True
    
    def Check_kafka_Leader_status():
        """Check every Kafka partition for a leader and alarm on leaderless ones.

        Runs ``<kafka_comm_path> --describe --zookeeper <zookeerp_ip_port>``
        (through ``sudo -i -u <kafka_run_user>`` when ``Is_open_sudo`` equals
        ``"True"``) and keeps the output lines that contain ``Leader``.  A
        partition whose leader is ``-1`` is logged as an error and passed to
        ``SendApex``; every other partition is logged as ok.

        :return: None.
        """
        if "True" == Is_open_sudo:
            shell = "sudo -i -u  " + kafka_run_user + "  /bin/sh -c  ' source ~/.bash_profile ; " + kafka_comm_path + " --describe --zookeeper  " + zookeerp_ip_port  + " |grep  Leader | grep -v offset'"
        else:
            shell = "/bin/sh -c  ' source ~/.bash_profile ; " + kafka_comm_path + " --describe --zookeeper  " + zookeerp_ip_port + " |grep  Leader  | grep -v offset '"

        logger.info("run shell command info: " + shell)
        shell_msg = os.popen(shell)
        try:
            msg_list = [msg for msg in shell_msg ]
        finally:
            shell_msg.close()
        logger.info("run shell msg return: " + str(msg_list) + " Partition num count: " + str(len(msg_list)))

        if not msg_list:
            # No topic filter is applied, so report the queried zookeeper address
            logger.error(zookeerp_ip_port +  " Not get Topic info " )
            return

        for Partition_info in msg_list:
            # A line looks like "\tTopic: x\tPartition: 0\tLeader: 1\tReplicas: ..."
            # Parse it by field name so that an unexpected line is skipped
            # instead of raising IndexError.
            fields = {}
            for field in Partition_info.strip().split("\t"):
                key, _, value = field.partition(":")
                fields[key.strip()] = value.strip()
            Topic = fields.get("Topic")
            Partition = fields.get("Partition")
            Leader = fields.get("Leader")
            if not (Topic and Partition and Leader):
                logger.warning("skip unexpected kafka describe line: " + Partition_info.strip())
                continue

            if Leader  == "-1":
                alarm_msg = "kafka node Topic " + Topic  + " Partition " + Partition  + " Leader " + Leader + " No leader found for partition 0"
                logger.error(alarm_msg)
                SendApex(alarm_msg)
            else:
                alarm_msg = "kafka Node Topic " + Topic  + " Partition " + Partition  + " Leader " + Leader + " status ok"
                logger.info(alarm_msg)
    
    if __name__ == "__main__":
        # All paths are resolved against the current working directory, so the
        # script has to be started from its installation directory.
        dirpath = os.getcwd()
        cfgpath = os.path.join(dirpath, "cfg/config.ini")
        if OSVersion in (4, 5, 6):
            conf = ConfigParser.ConfigParser()
        else:
            conf = configparser.ConfigParser()
        # read() returns the list of files it parsed; an empty list means the
        # config file is missing or unreadable.
        if not conf.read(cfgpath):
            sys.exit("Can not read config file: " + cfgpath)

        # Load the logging configuration and the settings used by the checks
        logging.config.fileConfig("./cfg/logger.conf")
        logger = logging.getLogger("rotatfile")
        logger.setLevel(logging.INFO)
        storm_ui_curl = conf.get("main", "storm_ui_curl")
        Timeout = conf.get("main", "api_timeout")
        Capacity_threshold = conf.get("main", "Capacity_threshold")
        IsSendApex = conf.get("main", "IsSendApex")
        WirteMsgfilename = conf.get("main", "WirteMsgfilename")
        zookeerp_ip_port = conf.get("main", "zookeerp_ip_port")
        kafka_comm_path = conf.get("main", "kafka_comm_path")
        kafka_run_user = conf.get("main", "kafka_run_user")
        Is_open_sudo = conf.get("main", "Is_open_sudo")

        logger.info("Check Storm Capacity num Status ")
        for topology_id in get_topology_id():
            get_component_info(topology_id)

        logger.info("====================================")
        logger.info("Check kafka Partition Leader Status ")
        Check_kafka_Leader_status()
    

    配置文件说明

    • 配置说明
    [main]
    #storm ui地址
    storm_ui_curl = http://172.16.100.90:8080
    #超时时间
    api_timeout = 2
    #storm检查对应性能指标数,该数值大于1.0则基本上入库就有延迟
    Capacity_threshold  = 1.5
    IsSendApex  = True
    #storm性能检查重复记录文件
    WirteMsgfilename = "./log/DuplicateInfo.txt"
    #zookeeper地址和端口
    zookeerp_ip_port =   192.168.128.1:2181
    #kafka脚本路径
    kafka_comm_path = /data/zks/kafka_2.11-0.8.2.2/bin/kafka-topics.sh
    #是否开启sudo权限,不开启sudo,则部署在kafka安装的用户下
    Is_open_sudo = True
    #kafka运行用户sudo时,需要指定用户
    kafka_run_user = root
    

    部署说明

    • 部署方式

      http://xxxx.xxx.xxxx/Deploymentpackage/CheckStormStatus.tar.gz 通过连接公司vpn进行下载
      [ccodsupport@xxx ] cd apex
      [ccodsupport@xxx apex ]  tar xvf CheckStormStatus.tar.gz
      [ccodsupport@xxx  cfg]$ more config.ini 
      [main]
      #storm ui地址
      storm_ui_curl = http://192.168.127.2:8080
      #超时时间
      api_timeout = 2
      #storm检查对应性能指标数,该数值大于1.0则基本上入库就有延迟
      Capacity_threshold  = 1.5
      #Capacity 阈值说明
      #Capacity:计算公式为Capacity = Spout 或者 Bolt 调用 execute 方法处理的消息数量 × 消息平均执行时间/时间区间。这个值越接近1,说明Spout或者Bolt基本一直在调用 execute 方法,即并行度不够,需要增加这个组件的 Executor数量
      #
      IsSendApex  = True
      #storm性能检查重复记录文件
      WirteMsgfilename = "./log/DuplicateInfo.txt"
      #zookeeper地址和端口
      zookeerp_ip_port =   192.168.127.2:2181
      #kafka脚本路径
      kafka_comm_path = /data/zks/kafka_2.11-0.8.2.2/bin/kafka-topics.sh
      #是否开启sudo权限
      Is_open_sudo = True
      #kafka运行用户sudo时,需要指定用户
      kafka_run_user = root
      
    • 修改好配置文件,启动运行,查看是否有异常。

    [ccodsupport@TK_MDB_1 CheckStormStatus]$ cd  /home/ccodsupport/apex/CheckStormStatus && ./StormStatus
    运行结果:
    2023-03-16 15:09:29 139657615288064 StormStatus.py:151 INFO Check Storm Capacity num Status 
    2023-03-16 15:09:29 139657615288064 StormStatus.py:57 INFO Requests  Api Url: http://192.168.127.2:8080/api/v1/topology/summary get topology id 
    2023-03-16 15:09:29 139657615288064 StormStatus.py:67 INFO get storm id list: [u'IVRMESSAGETopology-2-1666188471', u'TSrvappraiseTopology-4-1666188482', u'RAGSETopology-3-1666188476', u'CallDetailTopology-1-1666188466', u'BxRecordTopology-5-1666188486', u'SrRecordTopology-7-1666188497', u'FastdfsUrlTopology-6-1666188492', u'SdCallResultTopology-8-1666188503']
    2023-03-16 15:09:29 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/IVRMESSAGETopology-2-1666188471
    2023-03-16 15:09:31 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/TSrvappraiseTopology-4-1666188482
    2023-03-16 15:09:34 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/RAGSETopology-3-1666188476
    2023-03-16 15:09:43 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/CallDetailTopology-1-1666188466
    2023-03-16 15:09:44 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/BxRecordTopology-5-1666188486
    2023-03-16 15:09:44 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/SrRecordTopology-7-1666188497
    2023-03-16 15:09:44 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/FastdfsUrlTopology-6-1666188492
    2023-03-16 15:09:45 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/SdCallResultTopology-8-1666188503
    2023-03-16 15:09:45 139657615288064 StormStatus.py:159 INFO ====================================
    2023-03-16 15:09:45 139657615288064 StormStatus.py:160 INFO Check kafka Partition Leader Status 
    2023-03-16 15:09:45 139657615288064 StormStatus.py:107 INFO run shell command info: sudo -i -u  kafka  /bin/sh -c  ' source ~/.bash_profile ; /home/kafka/kafka_2.11-0.8.2.2/bin/kafka-topics.sh --describe --zookeeper  172.16.100.18:2181 |grep  Leader | grep -v offset'
    2023-03-16 15:09:48 139657615288064 StormStatus.py:110 INFO run shell msg return: ['\tTopic: SmartDialerTest\tPartition: 0\tLeader: 1\tReplicas: 1\tIsr: 1\n', '\tTopic: agentProxy\tPartition: 0\tLeader: 2\tReplicas: 2\tIsr: 2\n', '\tTopic: agentStateDetail\tPartition: 0\tLeader: 2\tReplicas: 2,0,1\tIsr: 0,1,2\n', '\tTopic: call_detail\tPartition: 0\tLeader: 1\tReplicas: 1,0,2\tIsr: 0,1,2\n', '\tTopic: chatLog\tPartition: 0\tLeader: 2\tReplicas: 2,0,1\tIsr: 0,1,2\n', '\tTopic: ent_record_bx_table\tPartition: 0\tLeader: 1\tReplicas: 1,2,0\tIsr: 0,1,2\n', '\tTopic: ent_record_fastdfs_url\tPartition: 0\tLeader: 1\tReplicas: 1,0,2\tIsr: 0,1,2\n', '\tTopic: ent_record_sr_table\tPartition: 0\tLeader: 1\tReplicas: 1,0,2\tIsr: 0,1,2\n', '\tTopic: ivr_message\tPartition: 0\tLeader: 0\tReplicas: 0,1,2\tIsr: 0,1,2\n', '\tTopic: new_r_ags_e\tPartition: 0\tLeader: 2\tReplicas: 2,0,1\tIsr: 2,0,1\n', '\tTopic: sd-monitor\tPartition: 0\tLeader: 2\tReplicas: 2\tIsr: 2\n', '\tTopic: sd2slee-instruct-ip\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n', '\tTopic: sd_call_result\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n', '\tTopic: session_detail\tPartition: 0\tLeader: 1\tReplicas: 1\tIsr: 1\n', '\tTopic: t_srvappraise\tPartition: 0\tLeader: 2\tReplicas: 2,1,0\tIsr: 0,1,2\n', '\tTopic: traffic_agent\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n', '\tTopic: universal\tPartition: 0\tLeader: 2\tReplicas: 2\tIsr: 2\n'] Partition num count: 17
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic SmartDialerTest Partition 0 Leader 1 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic agentProxy Partition 0 Leader 2 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic agentStateDetail Partition 0 Leader 2 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic call_detail Partition 0 Leader 1 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic chatLog Partition 0 Leader 2 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic ent_record_bx_table Partition 0 Leader 1 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic ent_record_fastdfs_url Partition 0 Leader 1 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic ent_record_sr_table Partition 0 Leader 1 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic ivr_message Partition 0 Leader 0 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic new_r_ags_e Partition 0 Leader 2 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic sd-monitor Partition 0 Leader 2 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic sd2slee-instruct-ip Partition 0 Leader 0 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic sd_call_result Partition 0 Leader 0 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic session_detail Partition 0 Leader 1 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic t_srvappraise Partition 0 Leader 2 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic traffic_agent Partition 0 Leader 0 status ok
    2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic universal Partition 0 Leader 2 status ok
    
    • 添加定时任务
    0 */2 * * * cd /home/ccodsupport/apex/CheckStormStatus && ./StormStatus &>/dev/null
    

    如有需要,请自行编译成二进制运行

    相关文章

      网友评论

          本文标题:storm capacity性能状态和kafka分区状态检查监控

          本文链接:https://www.haomeiwen.com/subject/rlbujdtx.html