目标:对rabbitmq的队列进行消息累积监控。
整体思路:
- 根据rabbitmq提供的API,获取要监控的队列信息。通过http请求获取某个queue信息:http://192.168.xx.xx:xx/api/queues/queue_name(如果需要获取全部的队列信息则使用http://192.168.xx.xx:xx/api/queues/)
从返回中解析到对列名字及对列对应的未读消息数。 - 进行消息累积逻辑判断。首先判断获取的未读消息数是否超过累积阈值,再针对超过阈值的队列,判断其消息是否正在被消费(10秒内每一秒获取一次消费者消费的速度deliver_get_rate参数,如果一次也不大于零,则认为是当前没有消费者消费),最后将消息超过阈值且消息未正在消费的队列信息存入一个列表中,并计数。
- 判断累积消息的队列是否大于0,即如果有累积消息的队列,则发送邮件通知相关人员。
- 通过定时任务,持续监测rabbitmq队列消息。
相关实现:
- 定时任务:每隔一段时间执行一次。具体实现:
1). 引入模块:import schedule
2). 执行:
schedule.every(intervalTime).seconds.do(job_custom)
while True:
schedule.run_pending()
在while True死循环中,schedule.run_pending()是保持schedule一直运行
- 配置文件。
1). 配置文件名:monitorConfig.ini,文件内容例如:
**[all_config]**
# qa or online
environment = online
# all or custom
all_or_custom = custom
2). 读取配置文件:
os.chdir("/Users/xxx/Downloads/qianyi/")
# 生成一个实例
cf = configparser.ConfigParser()
filename = cf.read("monitorConfig.ini")
# 获取环境配置,qa或者pub
environment = str(cf.get("all_config", "environment"))
# 获取部分队列未读消息还是所有队列未读消息
all_or_custom = str(cf.get('all_config', 'all_or_custom'))
- 日志:引入模块import logging.handlers
# 设置logging
logger = logging.getLogger('rabbitMqLogger')
logger.setLevel(logging.DEBUG)
rf_handler = logging.handlers.TimedRotatingFileHandler(filename='rabbitMq.log', when='D', interval=1, backupCount=20)
rf_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"))
# 在控制台打印日志
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger.addHandler(rf_handler)
logger.addHandler(handler)
TimedRotatingFileHandler的构造函数定义如下:
TimedRotatingFileHandler(filename [,when [,interval [,backupCount]]])
filename 是输出日志文件名的前缀,比如log/myapp.log
when 是一个字符串的定义如下:
“S”: Seconds
“M”: Minutes
“H”: Hours
“D”: Days
“W”: Week day (0=Monday)
“midnight”: Roll over at midnight
interval 是指等待多少个单位when的时间后,Logger会自动重建文件,当然,这个文件的创建
取决于filename+suffix,若这个文件跟之前的文件有重名,则会自动覆盖掉以前的文件,所以
有些情况suffix要定义的不能因为when而重复。
backupCount 是保留日志个数。默认的0是不会自动删除掉日志。若设3,则在文件的创建过程中
库会判断是否有超过这个3,若超过,则会从最先创建的开始删除。
- 异常处理:使用try: except:进行异常处理,防止代码遇到异常停止运行。
- 多线程。如果请求的队列较多,每个队列都要进行未读消息数及消费数量的判断,加起来会比较耗时,考虑使用多线程,可以加快程序运行速度。
def job_custom():
list_all_que = [] # 所有消息数超处阈值的队列名及消息数量
print '开始执行时间:',(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
class MyThread(threading.Thread): # 继承父类threading.Thread
def __init__(self, thread_id, name, que1):
threading.Thread.__init__(self)
self.thread_id = thread_id
self.name = name
self.que1 = que1
def run(self): # 把要执行的代码写到run函数里面 线程在创建后会直接运行run函数
print "Starting " + self.name
test(self.que1)
print "Exiting " + self.name
def test(que1):
try:
global count
# print thread_name
url_custom = url + que1
result = requests.get(url_custom, auth=(username, password), timeout=5) # 发送获取RabbitMq服务上某个队列的请求
queue = json.loads(result.content) # json.loads()函数是将json格式数据转换为字典
list_que = [] # 存放某个queue的信息,如名字及未消费消息数
queue_name = queue.get('name') # 队列名
print '队列名:', queue_name
mess_num = queue.get('messages') # 待处理队列数
print '消息数:', mess_num
if mess_num > threshold:
flag = False # False指10s内10次deliver_get_rate的值都为0
time_count = 0
while time_count < 10:
time_count += 1
try:
deliver_get_rate = queue.get('message_stats').get('deliver_get_details').get('rate')
print 'deliver_get_details:' + str(deliver_get_rate)
except:
print "获取deliver_get_rate参数失败"
break
if deliver_get_rate > 0:
flag = True
if flag is True:
break
time.sleep(1)
if flag is False:
lock.acquire()
count += 1
print "超出阈值且不再消费消息的队列数:" + str(count)
lock.release()
print "队列:" + queue_name + "消息数累积太多,大于" + str(threshold) + "条"
logger.info('队列' + queue_name + '消息累积太多')
list_que.append(queue_name)
list_que.append(mess_num)
lock.acquire()
list_all_que.append(list_que)
print "list_all" + str(list_all_que)
lock.release()
except urllib2.HTTPError:
logger.error('HTTP请求出错出现异常')
except AttributeError:
logger.error('字典取值异常')
except MemoryError:
logger.error('内存错误')
except IOError:
logger.error('IO失败')
except Exception:
logger.error('出现异常')
# 定义一个thread_list,将每个que都放进这个list
thread_list = []
for que in queueList:
i = queueList.index(que)
thread = MyThread(i, "Thread-" + str(i), que)
thread_list.append(thread)
# 开始执行每个线程
for t in thread_list:
t.setDaemon(True)
t.start()
for t in thread_list:
t.join(15)
# 等待每个线程执行完成后再进行后面的操作
print "active_count:" + str(threading.activeCount())
if count > 0:
send_mail(count, list_all_que)
else:
print "成功"
思考中,判断逻辑?是否需要判断低于阈值但是不正在消费的队列消息。count的计数是否可以用队列的长度代替。
日志使用:https://blog.csdn.net/lizhe_dashuju/article/details/72579705
配置文件使用:https://www.jianshu.com/p/4f50ce352b2f
网友评论