1.在一些复杂的处理中,使用队列
在settings.py中添加
CELERY_QUEUES = {
"default": { # 这是上面指定的默认队列
"exchange": "default",
"exchange_type": "direct",
"routing_key": "default"
},
"web_tasks_1": {
"routing_key": "demoapp.tasks.get_data_tencent",
"exchange": "topic_exchange",
"exchange_type": "topic",
},
"web_tasks_2": {
"routing_key": "demoapp.tasks.get_alipay_data",
"exchange": "topic_exchange",
"exchange_type": "topic",
},
}
class QueRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task =='demoapp.tasks.get_data_tencent':
return {
'queue': 'web_tasks_1',
}
# 我的dongwm.tasks文件里面有2个任务都是test开头
elif task == 'demoapp.tasks.get_alipay_data':
return {
"queue": "web_tasks_2",
}
# 剩下的其实就会被放到默认队列
else:
return {
"queue": "default",
}
# CELERY_ROUTES本来也可以用一个大的含有多个字典的字典,但是不如直接对它做一个名称统配
CELERY_ROUTES = (QueRouter(), )
2.启动特定队列处理任务
celery -A proj.celery:app worker -l info -n worker.%h -Q default
celery -A proj.celery:app worker -l info -n worker.%h -Q web_tasks_2
3.定时任务
settings.py
定时任务内容
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'add-every-3-seconds': {
'task': 'demoapp.tasks.add',
# 'schedule': crontab(minute=u'40', hour=u'17',),
'schedule': timedelta(seconds=3),
'args': (16, 16)
},
}
tasks.py
定时任务
from __future__ import absolute_import
from celery import shared_task
@shared_task
def add(x, y):
return x + y
效果图
![](https://img.haomeiwen.com/i3004516/824dd84cf960afa0.png)
![](https://img.haomeiwen.com/i3004516/4508470b3aec8c3d.png)
网友评论