美文网首页
rabbitmq的队列消息累积监控总结

rabbitmq的队列消息累积监控总结

作者: 海的那一边 | 来源:发表于2019-05-20 18:12 被阅读0次

目标:对rabbitmq的队列进行消息累积监控。
整体思路:

  1. 根据rabbitmq提供的API,获取要监控的队列信息。通过http请求获取某个queue信息:http://192.168.xx.xx:xx/api/queues/queue_name(如果需要获取全部的队列信息则使用http://192.168.xx.xx:xx/api/queues/
    从返回中解析到对列名字及对列对应的未读消息数。
  2. 进行消息累积逻辑判断。首先判断获取的未读消息数是否超过累积阈值,再针对超过阈值的队列,判断其消息是否正在被消费(10秒内每一秒获取一次消费者消费的速度deliver_get_rate参数,如果一次也不大于零,则认为是当前没有消费者消费),最后将消息超过阈值且消息未正在消费的队列信息存入一个列表中,并计数。
  3. 判断累积消息的队列是否大于0,即如果有累积消息的队列,则发送邮件通知相关人员。
  4. 通过定时任务,持续监测rabbitmq队列消息。

相关实现:

  1. 定时任务:每隔一段时间执行一次。具体实现:
    1). 引入模块:import schedule
    2). 执行:
schedule.every(intervalTime).seconds.do(job_custom)

while True:

 schedule.run_pending()

在while True死循环中,schedule.run_pending()是保持schedule一直运行

  1. 配置文件。
    1). 配置文件名:monitorConfig.ini,文件内容例如:
**[all_config]**

# qa or online

environment = online

# all or custom

all_or_custom = custom

2). 读取配置文件:


os.chdir("/Users/xxx/Downloads/qianyi/")

# 生成一个实例

cf = configparser.ConfigParser()

filename = cf.read("monitorConfig.ini")

# 获取环境配置,qa或者pub

environment = str(cf.get("all_config", "environment"))

# 获取部分队列未读消息还是所有队列未读消息

all_or_custom = str(cf.get('all_config', 'all_or_custom'))

  1. 日志:引入模块import logging.handlers

# 设置logging

logger = logging.getLogger('rabbitMqLogger')

logger.setLevel(logging.DEBUG)

rf_handler = logging.handlers.TimedRotatingFileHandler(filename='rabbitMq.log', when='D', interval=1, backupCount=20)

rf_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"))

# 在控制台打印日志

handler = logging.StreamHandler()

handler.setLevel(logging.DEBUG)

handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))

logger.addHandler(rf_handler)

logger.addHandler(handler)

TimedRotatingFileHandler的构造函数定义如下:
TimedRotatingFileHandler(filename [,when [,interval [,backupCount]]])
filename 是输出日志文件名的前缀,比如log/myapp.log
when 是一个字符串的定义如下:
“S”: Seconds
“M”: Minutes
“H”: Hours
“D”: Days
“W”: Week day (0=Monday)
“midnight”: Roll over at midnight
interval 是指等待多少个单位when的时间后,Logger会自动重建文件,当然,这个文件的创建
取决于filename+suffix,若这个文件跟之前的文件有重名,则会自动覆盖掉以前的文件,所以
有些情况suffix要定义的不能因为when而重复。
backupCount 是保留日志个数。默认的0是不会自动删除掉日志。若设3,则在文件的创建过程中
库会判断是否有超过这个3,若超过,则会从最先创建的开始删除。

  1. 异常处理:使用try: except:进行异常处理,防止代码遇到异常停止运行。
  2. 多线程。如果请求的队列较多,每个队列都要进行未读消息数及消费数量的判断,加起来会比较耗时,考虑使用多线程,可以加快程序运行速度。
def job_custom():
    list_all_que = []  # 所有消息数超处阈值的队列名及消息数量
    print '开始执行时间:',(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))

    class MyThread(threading.Thread):  # 继承父类threading.Thread
        def __init__(self, thread_id, name, que1):
            threading.Thread.__init__(self)
            self.thread_id = thread_id
            self.name = name
            self.que1 = que1

        def run(self):  # 把要执行的代码写到run函数里面 线程在创建后会直接运行run函数
            print "Starting " + self.name
            test(self.que1)
            print "Exiting " + self.name

    def test(que1):
        try:
            global count
            # print thread_name
            url_custom = url + que1

            result = requests.get(url_custom, auth=(username, password), timeout=5)  # 发送获取RabbitMq服务上某个队列的请求

            queue = json.loads(result.content)  # json.loads()函数是将json格式数据转换为字典

            list_que = []  # 存放某个queue的信息,如名字及未消费消息数

            queue_name = queue.get('name')  # 队列名
            print '队列名:', queue_name
            mess_num = queue.get('messages')  # 待处理队列数
            print '消息数:', mess_num

            if mess_num > threshold:
                flag = False  # False指10s内10次deliver_get_rate的值都为0
                time_count = 0
                while time_count < 10:
                    time_count += 1
                    try:
                        deliver_get_rate = queue.get('message_stats').get('deliver_get_details').get('rate')
                        print 'deliver_get_details:' + str(deliver_get_rate)
                    except:
                        print "获取deliver_get_rate参数失败"
                        break
                    if deliver_get_rate > 0:
                        flag = True
                    if flag is True:
                        break
                    time.sleep(1)

                if flag is False:
                    lock.acquire()
                    count += 1
                    print "超出阈值且不再消费消息的队列数:" + str(count)
                    lock.release()
                    print "队列:" + queue_name + "消息数累积太多,大于" + str(threshold) + "条"
                    logger.info('队列' + queue_name + '消息累积太多')
                    list_que.append(queue_name)
                    list_que.append(mess_num)
                    lock.acquire()
                    list_all_que.append(list_que)
                    print "list_all" + str(list_all_que)
                    lock.release()
        except urllib2.HTTPError:
            logger.error('HTTP请求出错出现异常')
        except AttributeError:
            logger.error('字典取值异常')
        except MemoryError:
            logger.error('内存错误')
        except IOError:
            logger.error('IO失败')
        except Exception:
            logger.error('出现异常')

    # 定义一个thread_list,将每个que都放进这个list
    thread_list = []
    for que in queueList:
        i = queueList.index(que)
        thread = MyThread(i, "Thread-" + str(i), que)
        thread_list.append(thread)
    # 开始执行每个线程
    for t in thread_list:
        t.setDaemon(True)
        t.start()
    for t in thread_list:
        t.join(15)
    # 等待每个线程执行完成后再进行后面的操作
    print "active_count:" + str(threading.activeCount())

    if count > 0:
        send_mail(count, list_all_que)
    else:
        print "成功"

思考中,判断逻辑?是否需要判断低于阈值但是不正在消费的队列消息。count的计数是否可以用队列的长度代替。

日志使用:https://blog.csdn.net/lizhe_dashuju/article/details/72579705
配置文件使用:https://www.jianshu.com/p/4f50ce352b2f

相关文章

网友评论

      本文标题:rabbitmq的队列消息累积监控总结

      本文链接:https://www.haomeiwen.com/subject/hmqlzqtx.html