美文网首页
storm capacity性能状态和kafka分区状态检查监控

storm capacity性能状态和kafka分区状态检查监控

作者: 平凡的运维之路 | 来源:发表于2023-04-24 15:41 被阅读0次

storm 性能状态检查监控

代码说明

  • 代码说明
#!/usr/bin/python3
# -*- coding:utf-8 -*-
# NOTE: per PEP 263 the encoding declaration only takes effect on line 1 or 2;
# the original file had an import between the shebang and the cookie, which
# silently disabled it.
import os
import sys

# Detect the RHEL/CentOS major version so the matching config-parser module
# can be imported (Python 2 style on EL4/5/6, Python 3 otherwise).
VersionShell = "  cat /etc/redhat-release |awk -F'release ' '{print $2}'|cut -d. -f1 |  cut -b 1"
OSVersion = int([Errorinfo for Errorinfo in os.popen(VersionShell)][0].split("\n")[0])
if OSVersion == 5 or OSVersion == 4 or OSVersion == 6:
    # Python 2 environment: ConfigParser (capitalized) plus the old
    # default-encoding workaround for the Chinese alarm strings.
    import time, logging.config, ConfigParser, requests, os
    from decimal import Decimal
    reload(sys)
    sys.setdefaultencoding('utf8')
else:
    # Python 3 environment.
    import time, logging.config, configparser, requests, os
    from decimal import Decimal


def get_component_info(storm_id):
    """Query the Storm UI REST API for one topology and log an alarm for any
    bolt whose capacity exceeds the configured threshold.

    Args:
        storm_id: topology id as returned by get_topology_id(),
                  e.g. "CallDetailTopology-6-1626660465".

    Relies on module-level globals set in __main__: storm_ui_curl,
    Capacity_threshold, Timeout and logger.
    """
    user_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'Accept-Encoding': 'gzip,deflate',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Upgrade-Insecure-Requests': '1'}

    cloudRouteRsyncURL = storm_ui_curl + "/api/v1/topology/" + storm_id
    logger.info("Requests Api Url: " + cloudRouteRsyncURL)
    # Guard the HTTP call (the original crashed when the Storm UI was down)
    # and use the same configured timeout as get_topology_id for consistency.
    try:
        RetrunData = requests.get(cloudRouteRsyncURL, headers=user_headers, timeout=int(Timeout))
        boltsMsg = RetrunData.json().get("bolts") or []
    except Exception as Error:
        logger.error("Calling interface Error Return: " + str(Error) + "  exit!!!")
        return
    # Loop variable renamed from "msg": the original shadowed it with the
    # alarm-message string built inside the loop body.
    for bolt in boltsMsg:
        capacity = bolt.get("capacity")
        boltId = bolt.get("boltId")
        if capacity is None or boltId is None:
            continue
        if Decimal(capacity) >= Decimal(Capacity_threshold):
            if "MongoBolt" in boltId:
                msg = storm_id.split("-")[0] + "拓扑中的入库bolt," + boltId + "比Capacity性能指标设置阈值" + Capacity_threshold + "大,当前是:" + capacity + ",请检查storm服务状态或mongodb入库延迟问题及缓存数据清理!!!"
                logger.error(msg)
            elif "ComputeBolt" in boltId:
                msg = storm_id.split("-")[0] + "拓扑中的计算bolt," + boltId + "比Capacity性能指标设置阈值" + Capacity_threshold + "大,当前是:" + capacity + ",请检查storm服务状态或mongodb入库延迟问题及缓存数据清理!!!"
                logger.error(msg)

# Fetch the ids of all running topologies from the Storm UI summary API.
def get_topology_id():
    """Return the list of running topology ids; empty list on any API error.

    Relies on module-level globals set in __main__: storm_ui_curl,
    Timeout and logger.
    """
    id_list = []
    user_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'Accept-Encoding': 'gzip,deflate',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Upgrade-Insecure-Requests': '1'}
    get_id_api = storm_ui_curl + "/api/v1/topology/summary"
    logger.info("Requests  Api Url: " + get_id_api + " get topology id ")
    try:
        RetrunRsyncData = requests.get(get_id_api, headers=user_headers, timeout=int(Timeout))
    except Exception as Error:
        logger.error("Calling interface Error Return: " + str(Error) + "  exit!!!")
        # Bug fix: the original fell through here with RetrunRsyncData
        # undefined, producing a masked NameError in the next try block.
        return id_list
    try:
        result_msg = RetrunRsyncData.json()
        for line in result_msg.get("topologies"):
            id_list.append(dict(line).get("id"))
        logger.info("get storm id list: " + str(id_list))
    except Exception as ErrorMsg:
        logger.error("Calling interface Error Return: " + str(ErrorMsg) + "  exit!!!")
    return id_list

# Forward an alarm line to the Apex NMS via rsyslog.
def SendApex(ApexInfo):
    """Send ApexInfo to the network manager through /bin/logger at
    local0.crit when the IsSendApex config switch is "True"; otherwise
    only note locally that nothing was sent."""
    if IsSendApex == "True":
        os.system("/bin/logger -p local0.crit \"ccod: result=ERROR " + ApexInfo + " \"")
    else:
        logger.info("Not Send Rsyslog Info data to Apex ")

# Rate-limit duplicate alarms: the same message is recorded at most 3 times.
def RepeatSendMsg(Msg):
    """Return True if Msg has been seen fewer than 3 times (and record it),
    False otherwise. The record file is truncated once a day at 00:10.

    Bug fix: the original shelled out to grep/echo with the message pasted
    unquoted into the command line, which broke (or injected shell syntax)
    whenever Msg contained quotes, spaces or regex metacharacters. Plain
    Python file I/O avoids the shell entirely.

    Relies on module-level globals set in __main__: WirteMsgfilename, logger.
    """
    # At 00:10 clear the dedup file so suppressed alarms can fire again.
    now = time.localtime()
    if now.tm_hour == 0 and now.tm_min == 10:
        open(WirteMsgfilename, "w").close()
    # Count previous occurrences of this message (missing file counts as 0).
    try:
        with open(WirteMsgfilename) as fp:
            Countnum = sum(1 for line in fp if Msg in line)
    except IOError:
        Countnum = 0
    logger.info("Repeat check count " + str(Countnum) + " for: " + Msg)
    if Countnum < 3:
        with open(WirteMsgfilename, "a") as fp:
            fp.write(Msg + "\n")
        return True
    else:
        return False

def Check_kafka_Leader_status():
    """Run kafka-topics.sh --describe against zookeeper and raise an Apex
    alarm for every partition whose Leader is -1 (no leader elected).

    Relies on module-level globals set in __main__: Is_open_sudo,
    kafka_run_user, kafka_comm_path, zookeerp_ip_port and logger.
    """
    # Build the describe command; optionally run it via sudo as the
    # configured kafka user when the script itself runs as someone else.
    if "True" == Is_open_sudo:
        shell = "sudo -i -u  " + kafka_run_user + "  /bin/sh -c  ' source ~/.bash_profile ; " + kafka_comm_path + " --describe --zookeeper  " + zookeerp_ip_port + " |grep  Leader | grep -v offset'"
    else:
        shell = "/bin/sh -c  ' source ~/.bash_profile ; " + kafka_comm_path + " --describe --zookeeper  " + zookeerp_ip_port + " |grep  Leader  | grep -v offset '"

    shell_msg = os.popen(shell)
    logger.info("run shell command info: " + shell)

    msg_list = [msg for msg in shell_msg]
    logger.info("run shell msg return: " + str(msg_list) + " Partition num count: " + str(len(msg_list)))

    if len(msg_list) != 0:
        for Partition_info in msg_list:
            # Each line looks like:
            # "\tTopic: name\tPartition: 0\tLeader: 1\tReplicas: ...\tIsr: ...\n"
            fields = Partition_info.split("\t")
            Topic = fields[1].split()[1]
            Partition = fields[2].split()[1]
            Leader = fields[3].split()[1]
            if Leader == "-1":
                alarm_msg = "kafka node Topic " + Topic + " Partition " + Partition + " Leader " + Leader + " No leader found for partition 0"
                logger.error(alarm_msg)
                SendApex(alarm_msg)
            else:
                alarm_msg = "kafka Node Topic " + Topic + " Partition " + Partition + " Leader " + Leader + " status ok"
                logger.info(alarm_msg)
    else:
        # Bug fix: the original referenced an undefined global "topics" here,
        # raising NameError whenever the kafka command produced no output.
        logger.error("Not get Topic info ")

if __name__ == "__main__":
    # Locate the config file relative to the current working directory.
    # (os.getcwd() replaces the original shell-out to `pwd`.)
    dirpath = os.getcwd()
    cfgpath = os.path.join(dirpath, "cfg/config.ini")
    # Pick the parser class matching the interpreter detected at import time.
    if OSVersion == 5 or OSVersion == 4 or OSVersion == 6:
        conf = ConfigParser.ConfigParser()
    else:
        conf = configparser.ConfigParser()
    conf.read(cfgpath)
    # Load logging configuration; these globals are read by every function.
    logging.config.fileConfig("./cfg/logger.conf")
    logger = logging.getLogger("rotatfile")
    logger.setLevel(logging.INFO)
    storm_ui_curl = conf.get("main", "storm_ui_curl")
    Timeout = conf.get("main", "api_timeout")
    Capacity_threshold = conf.get("main", "Capacity_threshold")
    IsSendApex = conf.get("main", "IsSendApex")
    WirteMsgfilename = conf.get("main", "WirteMsgfilename")
    zookeerp_ip_port = conf.get("main", "zookeerp_ip_port")
    kafka_comm_path = conf.get("main", "kafka_comm_path")
    kafka_run_user = conf.get("main", "kafka_run_user")
    Is_open_sudo = conf.get("main", "Is_open_sudo")
    logger.info("Check Storm Capacity num Status ")
    topology_id_list = get_topology_id()
    if len(topology_id_list) != 0:
        # Renamed loop variable from "id", which shadowed the builtin.
        for topology_id in topology_id_list:
            get_component_info(topology_id)

    logger.info("====================================")
    logger.info("Check kafka Partition Leader Status ")
    Check_kafka_Leader_status()

配置文件说明

  • 配置说明
[main]
#storm ui地址
storm_ui_curl = http://172.16.100.90:8080
#超时时间
api_timeout = 2
#storm检查对应性能指标数,该数值大于1.0则基本上入库就有延迟
Capacity_threshold  = 1.5
IsSendApex  = True
#storm性能检查重复记录文件(注意:值不要加引号,configparser 会把引号当作文件名的一部分)
WirteMsgfilename = ./log/DuplicateInfo.txt
#zookeerp地址和端口
zookeerp_ip_port =   192.168.128.1:2181
#kafka脚本路径
kafka_comm_path = /data/zks/kafka_2.11-0.8.2.2/bin/kafka-topics.sh
#是否开启sudo权限,不开启sudo,则部署在kafka安装的用户下
Is_open_sudo = True
#kafka运行用户sudo时,需要指定用户
kafka_run_user = root

部署说明

  • 部署方式

    # 连接公司VPN后,从以下地址下载部署包:http://xxxx.xxx.xxxx/Deploymentpackage/CheckStormStatus.tar.gz
    [ccodsupport@xxx ] cd apex
    [ccodsupport@xxx apex ]  tar xvf CheckStormStatus.tar.gz
    [ccodsupport@xxx  cfg]$ more config.ini 
    [main]
    #storm ui地址
    storm_ui_curl = http://192.168.127.2:8080
    #超时时间
    api_timeout = 2
    #storm检查对应性能指标数,该数值大于1.0则基本上入库就有延迟
    Capacity_threshold  = 1.5
    #Capacity 阈值说明
    #Capacity:计算公式为Capacity = Spout 或者 Bolt 调用 execute 方法处理的消息数量 × 消息平均执行时间/时间区间。如果这个值越接近1,说明Spout或者Bolt基本一直在调用 execute 方法,因此并行度不够,需要扩展这个组件的 Executor数量
    #
    IsSendApex  = True
    #storm性能检查重复记录文件(注意:值不要加引号,configparser 会把引号当作文件名的一部分)
    WirteMsgfilename = ./log/DuplicateInfo.txt
    #zookeerp地址和端口
    zookeerp_ip_port =   192.168.127.2:2181
    #kafka脚本路径
    kafka_comm_path = /data/zks/kafka_2.11-0.8.2.2/bin/kafka-topics.sh
    #是否开启sudo权限
    Is_open_sudo = True
    #kafka运行用户sudo时,需要指定用户
    kafka_run_user = root
    
  • 修改好配置文件,启动运行,查看是否有异常。

[ccodsupport@TK_MDB_1 CheckStormStatus]$ cd  /home/ccodsupport/apex/CheckStormStatus && ./StormStatus
运行结果:
2023-03-16 15:09:29 139657615288064 StormStatus.py:151 INFO Check Storm Capacity num Status 
2023-03-16 15:09:29 139657615288064 StormStatus.py:57 INFO Requests  Api Url: http://192.168.127.2:8080/api/v1/topology/summary get topology id 
2023-03-16 15:09:29 139657615288064 StormStatus.py:67 INFO get storm id list: [u'IVRMESSAGETopology-2-1666188471', u'TSrvappraiseTopology-4-1666188482', u'RAGSETopology-3-1666188476', u'CallDetailTopology-1-1666188466', u'BxRecordTopology-5-1666188486', u'SrRecordTopology-7-1666188497', u'FastdfsUrlTopology-6-1666188492', u'SdCallResultTopology-8-1666188503']
2023-03-16 15:09:29 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/IVRMESSAGETopology-2-1666188471
2023-03-16 15:09:31 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/TSrvappraiseTopology-4-1666188482
2023-03-16 15:09:34 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/RAGSETopology-3-1666188476
2023-03-16 15:09:43 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/CallDetailTopology-1-1666188466
2023-03-16 15:09:44 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/BxRecordTopology-5-1666188486
2023-03-16 15:09:44 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/SrRecordTopology-7-1666188497
2023-03-16 15:09:44 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/FastdfsUrlTopology-6-1666188492
2023-03-16 15:09:45 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/SdCallResultTopology-8-1666188503
2023-03-16 15:09:45 139657615288064 StormStatus.py:159 INFO ====================================
2023-03-16 15:09:45 139657615288064 StormStatus.py:160 INFO Check kafka Partition Leader Status 
2023-03-16 15:09:45 139657615288064 StormStatus.py:107 INFO run shell command info: sudo -i -u  kafka  /bin/sh -c  ' source ~/.bash_profile ; /home/kafka/kafka_2.11-0.8.2.2/bin/kafka-topics.sh --describe --zookeeper  172.16.100.18:2181 |grep  Leader | grep -v offset'
2023-03-16 15:09:48 139657615288064 StormStatus.py:110 INFO run shell msg return: ['\tTopic: SmartDialerTest\tPartition: 0\tLeader: 1\tReplicas: 1\tIsr: 1\n', '\tTopic: agentProxy\tPartition: 0\tLeader: 2\tReplicas: 2\tIsr: 2\n', '\tTopic: agentStateDetail\tPartition: 0\tLeader: 2\tReplicas: 2,0,1\tIsr: 0,1,2\n', '\tTopic: call_detail\tPartition: 0\tLeader: 1\tReplicas: 1,0,2\tIsr: 0,1,2\n', '\tTopic: chatLog\tPartition: 0\tLeader: 2\tReplicas: 2,0,1\tIsr: 0,1,2\n', '\tTopic: ent_record_bx_table\tPartition: 0\tLeader: 1\tReplicas: 1,2,0\tIsr: 0,1,2\n', '\tTopic: ent_record_fastdfs_url\tPartition: 0\tLeader: 1\tReplicas: 1,0,2\tIsr: 0,1,2\n', '\tTopic: ent_record_sr_table\tPartition: 0\tLeader: 1\tReplicas: 1,0,2\tIsr: 0,1,2\n', '\tTopic: ivr_message\tPartition: 0\tLeader: 0\tReplicas: 0,1,2\tIsr: 0,1,2\n', '\tTopic: new_r_ags_e\tPartition: 0\tLeader: 2\tReplicas: 2,0,1\tIsr: 2,0,1\n', '\tTopic: sd-monitor\tPartition: 0\tLeader: 2\tReplicas: 2\tIsr: 2\n', '\tTopic: sd2slee-instruct-ip\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n', '\tTopic: sd_call_result\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n', '\tTopic: session_detail\tPartition: 0\tLeader: 1\tReplicas: 1\tIsr: 1\n', '\tTopic: t_srvappraise\tPartition: 0\tLeader: 2\tReplicas: 2,1,0\tIsr: 0,1,2\n', '\tTopic: traffic_agent\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n', '\tTopic: universal\tPartition: 0\tLeader: 2\tReplicas: 2\tIsr: 2\n'] Partition num count: 17
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic SmartDialerTest Partition 0 Leader 1 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic agentProxy Partition 0 Leader 2 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic agentStateDetail Partition 0 Leader 2 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic call_detail Partition 0 Leader 1 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic chatLog Partition 0 Leader 2 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic ent_record_bx_table Partition 0 Leader 1 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic ent_record_fastdfs_url Partition 0 Leader 1 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic ent_record_sr_table Partition 0 Leader 1 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic ivr_message Partition 0 Leader 0 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic new_r_ags_e Partition 0 Leader 2 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic sd-monitor Partition 0 Leader 2 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic sd2slee-instruct-ip Partition 0 Leader 0 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic sd_call_result Partition 0 Leader 0 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic session_detail Partition 0 Leader 1 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic t_srvappraise Partition 0 Leader 2 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic traffic_agent Partition 0 Leader 0 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic universal Partition 0 Leader 2 status ok
  • 添加定时任务
0 */2 * * * cd /home/ccodsupport/apex/DetailedContrast && ./StormStatus &>/dev/null

有需要-请自行编译成二进制运行

相关文章

网友评论

      本文标题:storm capacity性能状态和kafka分区状态检查监控

      本文链接:https://www.haomeiwen.com/subject/rlbujdtx.html