背景
负责的自定义查询系统,使用了rabbitMQ,来实现异步查询。
即web端查询,先提交到消息队列,同时发起轮询请求拉取查询结果;
多进程消费者从消息队列中取消息,触发查询引擎查询;
查询引擎响应后,产出文件到服务端;此时web端轮询请求,获取到响应,完成本次查询。
问题
系统运行过程中,性能很差,偶发卡顿现象;需要人为干预,杀死查询process才能保证系统正常。
定位过程
(1)以为是系统用户,看到不出结果,反复提交相同sql;
(2)sql查询处,加了查询态不允许提交的前端限制,仍然有重复sql提交到gp;
(3)后来发现,平台提交了一次,查询引擎处有多条sql执行;
(4)然后锁定了问题范围,“web服务多次提交到mq”或者“mq消息多次消费”;
(5)分析消费者代码和日志,发现mq连接隔一段时间,会断开连接;
#伪日志
Pid: 29354, Start Watching Rabbitmq
Pid: 29354, receive msg[body:xxx][userName:xxx]
Pid: 29354, receive msg[body:xxx][userName:xxx]
Pid: 29354, Rabbitmq connection closed, reconnecting...
Pid: 29354, Start Watching Rabbitmq
...
(6)然后定位到rabbitmq-server因为连接没有心跳,主动断开了和消费者的连接。
#/var/log/rabbitmq处mq-server运行日志
2020-03-06 17:43:01.762 [error] <0.14871.1> closing AMQP connection <0.14871.1> (10.xxx.x.xxx:port1 -> 10.xxx.x.xxx:port2):
missed heartbeats from client, timeout: 60s
解决
从网上找了一些资料,之所以出现这种情况,是使用 python pika方式获取mq连接(pika.BlockingConnection),消费时间过长导致的。
主要有两种思路解决这个问题:
(1)加heartbeat_interval参数,防止mq-server因为没有心跳而主动断开连接;
(2)调整代码逻辑,消费者主线程定时发心跳交互(connection.process_data_events()),耗时较长的消息消费,在子线程中执行。
我们采用了第一种方式,加heartbeat参数,最终解决了这个问题。
在获取mq连接过程中,加了不检测心跳的参数。后续自动连接断开问题消除。
pika.BlockingConnection(pika.ConnectionParameters(host=mqhost))
#改成
pika.BlockingConnection(pika.ConnectionParameters(host=mqhost,heartbeat_interval=0))
相关资料
主要参考了这篇,写的乱了一些,但是能用
https://blog.csdn.net/xc_zhou/article/details/81021802
网友评论