相关:
Celery-详解
Celery+Redis实现异步任务(1)
Celery+Redis实现异步任务(2)
Celery+Redis实现异步任务(3)
使用配置文件和队列
创建文件夹apps
创建/apps/celery_conf.py
from kombu import Queue
BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
CELERY_IMPORTS = (
'apps.task1',
'apps.task2'
)
# 队列
CELERY_QUEUES = (
Queue('task_add', routing_key='task_add'),
Queue('task_subs', routing_key='task_subs')
)
# 给任务绑定队列
CELERY_ROUTES = {
'apps.task1.add':{'queue':'task_add','routing_key':'task_add'},
'apps.task2.subs':{'queue':'task_subs','routing_key':'task_subs'}
}
创建 /apps/__init__.py
from celery import Celery
app = Celery('test_task')
app.config_from_object('apps.celery_conf')
创建 /apps/task1.py
import celery
from apps import app
@app.task()
def add(x, y):
return x+y
创建/apps/task2.py
from apps import app
@app.task()
def subs(x, y):
return x-y
在于apps同级的目录下面创建hello.py
from apps.task1 import add
from apps.task2 import subs
if __name__ == "__main__":
add.delay(1, 2)
# add.apply_async(args=[1, 2])
subs.delay(1, 2)
# subs.apply_async(args=[1, 2])
启动命令:
celery -A apps worker -l INFO -n worker1@%h -Q task_add
# 另开一个窗口
celery -A apps worker -l INFO -n worker1@%h -Q task_subs
运行 hello.py
python hello.py
代码说明:
- task.delay():这是apply_async()方法的别名,接受的参数较为简单。
- task.apply_async():可以接受复杂参数。task.apply_async(args=[1,8], kwargs={"name":"123"})
配置多个队列的方法:
- apply_async(queue=队列名)
- app.task(queue=队列名)
- 配置文件的方式
CELERY_QUEUES = (
Queue('apps_task1', exchange=Exchange('apps_task1'), routing_key='apps_task1'),
Queue('apps_task2', exchange=Exchange('apps_task2'), routing_key='apps_task2'),
)
CELERY_ROUTES = {
'apps.task1.add': {'queue': 'apps_task1', 'routing_key': 'apps_task1'},
'apps.task2.subs': {'queue': 'apps_task2', 'routing_key': 'apps_task2'},
}
注意:
-
CELERY_ROUTES
的作用是,给任务分配queue
和routing_key
,然后worker
根据分配的queue
值执行相应的任务。 -
CELERY_ROUTES
中key
得到指定的方法名。 -
exchange
可以不用,这是rabbitmq
必须的,redis
可以不用。 -
queue
和routing_key
这两个值的名字不需要保持一致,那么为了方便使用和检查,最好保持一致。 -
CELERY_ROUTES
中其实可以不用指定routing_key
,可以在apply_async
方法传入
网友评论