美文网首页
rabbitmq的队列消息累积监控总结

rabbitmq的队列消息累积监控总结

作者: 海的那一边 | 来源:发表于2019-05-20 18:12 被阅读0次

    目标:对rabbitmq的队列进行消息累积监控。
    整体思路:

    1. 根据rabbitmq提供的API,获取要监控的队列信息。通过http请求获取某个queue信息:http://192.168.xx.xx:xx/api/queues/queue_name(如果需要获取全部的队列信息则使用http://192.168.xx.xx:xx/api/queues/
      从返回中解析到对列名字及对列对应的未读消息数。
    2. 进行消息累积逻辑判断。首先判断获取的未读消息数是否超过累积阈值,再针对超过阈值的队列,判断其消息是否正在被消费(10秒内每一秒获取一次消费者消费的速度deliver_get_rate参数,如果一次也不大于零,则认为是当前没有消费者消费),最后将消息超过阈值且消息未正在消费的队列信息存入一个列表中,并计数。
    3. 判断累积消息的队列是否大于0,即如果有累积消息的队列,则发送邮件通知相关人员。
    4. 通过定时任务,持续监测rabbitmq队列消息。

    相关实现:

    1. 定时任务:每隔一段时间执行一次。具体实现:
      1). 引入模块:import schedule
      2). 执行:
    schedule.every(intervalTime).seconds.do(job_custom)
    
    while True:
    
     schedule.run_pending()
    

    在while True死循环中,schedule.run_pending()是保持schedule一直运行

    1. 配置文件。
      1). 配置文件名:monitorConfig.ini,文件内容例如:
    **[all_config]**
    
    # qa or online
    
    environment = online
    
    # all or custom
    
    all_or_custom = custom
    

    2). 读取配置文件:

    
    os.chdir("/Users/xxx/Downloads/qianyi/")
    
    # 生成一个实例
    
    cf = configparser.ConfigParser()
    
    filename = cf.read("monitorConfig.ini")
    
    # 获取环境配置,qa或者pub
    
    environment = str(cf.get("all_config", "environment"))
    
    # 获取部分队列未读消息还是所有队列未读消息
    
    all_or_custom = str(cf.get('all_config', 'all_or_custom'))
    
    
    1. 日志:引入模块import logging.handlers
    
    # 设置logging
    
    logger = logging.getLogger('rabbitMqLogger')
    
    logger.setLevel(logging.DEBUG)
    
    rf_handler = logging.handlers.TimedRotatingFileHandler(filename='rabbitMq.log', when='D', interval=1, backupCount=20)
    
    rf_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"))
    
    # 在控制台打印日志
    
    handler = logging.StreamHandler()
    
    handler.setLevel(logging.DEBUG)
    
    handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
    
    logger.addHandler(rf_handler)
    
    logger.addHandler(handler)
    

    TimedRotatingFileHandler的构造函数定义如下:
    TimedRotatingFileHandler(filename [,when [,interval [,backupCount]]])
    filename 是输出日志文件名的前缀,比如log/myapp.log
    when 是一个字符串的定义如下:
    “S”: Seconds
    “M”: Minutes
    “H”: Hours
    “D”: Days
    “W”: Week day (0=Monday)
    “midnight”: Roll over at midnight
    interval 是指等待多少个单位when的时间后,Logger会自动重建文件,当然,这个文件的创建
    取决于filename+suffix,若这个文件跟之前的文件有重名,则会自动覆盖掉以前的文件,所以
    有些情况suffix要定义的不能因为when而重复。
    backupCount 是保留日志个数。默认的0是不会自动删除掉日志。若设3,则在文件的创建过程中
    库会判断是否有超过这个3,若超过,则会从最先创建的开始删除。

    1. 异常处理:使用try: except:进行异常处理,防止代码遇到异常停止运行。
    2. 多线程。如果请求的队列较多,每个队列都要进行未读消息数及消费数量的判断,加起来会比较耗时,考虑使用多线程,可以加快程序运行速度。
    def job_custom():
        list_all_que = []  # 所有消息数超处阈值的队列名及消息数量
        print '开始执行时间:',(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    
        class MyThread(threading.Thread):  # 继承父类threading.Thread
            def __init__(self, thread_id, name, que1):
                threading.Thread.__init__(self)
                self.thread_id = thread_id
                self.name = name
                self.que1 = que1
    
            def run(self):  # 把要执行的代码写到run函数里面 线程在创建后会直接运行run函数
                print "Starting " + self.name
                test(self.que1)
                print "Exiting " + self.name
    
        def test(que1):
            try:
                global count
                # print thread_name
                url_custom = url + que1
    
                result = requests.get(url_custom, auth=(username, password), timeout=5)  # 发送获取RabbitMq服务上某个队列的请求
    
                queue = json.loads(result.content)  # json.loads()函数是将json格式数据转换为字典
    
                list_que = []  # 存放某个queue的信息,如名字及未消费消息数
    
                queue_name = queue.get('name')  # 队列名
                print '队列名:', queue_name
                mess_num = queue.get('messages')  # 待处理队列数
                print '消息数:', mess_num
    
                if mess_num > threshold:
                    flag = False  # False指10s内10次deliver_get_rate的值都为0
                    time_count = 0
                    while time_count < 10:
                        time_count += 1
                        try:
                            deliver_get_rate = queue.get('message_stats').get('deliver_get_details').get('rate')
                            print 'deliver_get_details:' + str(deliver_get_rate)
                        except:
                            print "获取deliver_get_rate参数失败"
                            break
                        if deliver_get_rate > 0:
                            flag = True
                        if flag is True:
                            break
                        time.sleep(1)
    
                    if flag is False:
                        lock.acquire()
                        count += 1
                        print "超出阈值且不再消费消息的队列数:" + str(count)
                        lock.release()
                        print "队列:" + queue_name + "消息数累积太多,大于" + str(threshold) + "条"
                        logger.info('队列' + queue_name + '消息累积太多')
                        list_que.append(queue_name)
                        list_que.append(mess_num)
                        lock.acquire()
                        list_all_que.append(list_que)
                        print "list_all" + str(list_all_que)
                        lock.release()
            except urllib2.HTTPError:
                logger.error('HTTP请求出错出现异常')
            except AttributeError:
                logger.error('字典取值异常')
            except MemoryError:
                logger.error('内存错误')
            except IOError:
                logger.error('IO失败')
            except Exception:
                logger.error('出现异常')
    
        # 定义一个thread_list,将每个que都放进这个list
        thread_list = []
        for que in queueList:
            i = queueList.index(que)
            thread = MyThread(i, "Thread-" + str(i), que)
            thread_list.append(thread)
        # 开始执行每个线程
        for t in thread_list:
            t.setDaemon(True)
            t.start()
        for t in thread_list:
            t.join(15)
        # 等待每个线程执行完成后再进行后面的操作
        print "active_count:" + str(threading.activeCount())
    
        if count > 0:
            send_mail(count, list_all_que)
        else:
            print "成功"
    

    思考中,判断逻辑?是否需要判断低于阈值但是不正在消费的队列消息。count的计数是否可以用队列的长度代替。

    日志使用:https://blog.csdn.net/lizhe_dashuju/article/details/72579705
    配置文件使用:https://www.jianshu.com/p/4f50ce352b2f

    相关文章

      网友评论

          本文标题:rabbitmq的队列消息累积监控总结

          本文链接:https://www.haomeiwen.com/subject/hmqlzqtx.html