使用celery结合pika处理异步任务,并将处理任务结果发布到其他队列中时遇到错误
获取 pika 连接
class RabbitMQSingle:
    """Singleton wrapper around a single pika ``BlockingConnection``.

    The first instantiation opens the connection and caches both the
    connection and the instance as class attributes; every later
    instantiation returns that cached instance without reconnecting.
    """

    def __new__(cls, *args, **kwargs):
        """Return the cached instance, building it on the first call only.

        Any positional or keyword arguments are accepted and ignored.
        """
        # Fast path: the singleton already exists.
        if hasattr(cls, "_instance"):
            return cls._instance

        logger.debug("INIT RabbitMQ")
        # The connection is stored before the instance, so a failed connect
        # leaves no ``_instance`` behind and the next call tries again.
        cls._conn = pika.BlockingConnection(
            pika.URLParameters("amqp://guest:guest@localhost:5672/%2F?heartbeat=5")
        )
        cls._instance = super().__new__(cls)
        return cls._instance

    def get_conn(self):
        """Return the connection that was opened when the singleton was built."""
        return self._conn
# Fetch the shared connection from the singleton, then open a channel on it.
_mq = RabbitMQSingle()
conn = _mq.get_conn()
channel = conn.channel()
过一段时间就会出现下面错误
[2019-03-13 18:49:00,399: ERROR/ForkPoolWorker-2] Read empty data, calling disconnect
[2019-03-13 18:49:00,399: INFO/ForkPoolWorker-2] Disconnected from RabbitMQ at localhost:5672 (-1): EOF
[2019-03-13 18:49:00,400: ERROR/ForkPoolWorker-2] Connection close detected; result=BlockingConnection__OnClosedArgs(connection=<SelectConnection CLOSED socket=None params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>, reason_code=-1, reason_text='EOF')
[2019-03-13 18:49:00,403: ERROR/ForkPoolWorker-2] Task services.tasks.rong360.analysis_supplement_info[149cd386-91a2-4f1f-b614-c22219bad5f3] raised unexpected: The AMQP connection was closed (-1) EOF
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 382, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 641, in __protected_call__
return self.run(*args, **kwargs)
File "/data/ace_club/services/tasks/rong360.py", line 78, in analysis_supplement_info
basic_publish(Risk.RISK_CHECK, json.dumps({'com_id': order.com_id, 'order_id': order.id}))
File "/data/ace_club/services/tasks/celery.py", line 13, in basic_publish
channel = conn.channel()
File "/usr/local/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 800, in channel
channel._flush_output(opened_args.is_ready)
File "/usr/local/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 1292, in _flush_output
*waiters)
File "/usr/local/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output
result.reason_text)
pika.exceptions.ConnectionClosed: (-1, 'EOF')
原因是heartbeat参数没有设置正确
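heartbeat 是客户端与 server 协商的心跳超时时间(单位:秒):如果超过这个时间间隔客户端既不发心跳、也不给 server 任何信息,server 就会断开连接。pika 的 BlockingConnection 不会在后台自动发送心跳,只有在连接执行 I/O(例如调用 process_data_events())时才会处理心跳,所以 celery 任务之间空闲超过这个间隔后,缓存的连接就会被 server 关闭。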
# Setting heartbeat to 0 is enough
parameters = pika.URLParameters("amqp://guest:guest@localhost:5672/%2F?heartbeat=0")
网友评论