celery 是基于python实现的一个分布式任务框架。
在celery 3.2.x 以下的版本和celery 4.x 版本涉及到的配置方式以及使用方式有点不同。
文章中的内容是介绍celery 4.0 在django中如何配置,以及简单的运用。
celery 4.x 仅支持django 1.8以上版本。
未来的celery 5.x 以上仅支持python 3.5或者更新的版本。
1:配置应用环境
使用虚拟环境去配置环境
mkvirtualenv celery_demo
pip install django==1.8.10
pip install celery==4.0.2
django-admin start startproject celerydemo #创建django项目
cd ./celery_demo
python manage.py startapp demo #创建django app
在完成上述的项目创建后,那么需要的是进行一些配置。
对celery.py的配置
/celery_demo/celery_demo/celery.py
# coding:utf-8
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celerydemo.settings')
app = Celery('celerydemo')
# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
搞好celery.py 文件后,需要在/celery_demo/celery_demo/init.py 进行一些配置
# coding:utf-8
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app']
这个配置文件的作用是确保app的 task总会被django的引用和支持。
当完成上述的步骤后,可以尝试下执行celery 程序
celery worker -A celery_demo -l info
当使用上述命令的时候,会看到Error
[2017-01-09 17:31:52,299: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 2.00 seconds...
这个时候得在/celery_demo/celery_demo/settings.py 中配置好CELERY_BROKER_URL 的参数。
我使用的是redis作为broker
配置如下:
#/celery_demo/celery_demo/settings.py
CELERY_BROKER_URL = 'redis://127.0.0.1/2'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
还需要install redis
pip install redis
这个时候好了,可以执行
celery worker -A celerydemo -l info
会显示celery已经在运行了。
那么可以使用下面的去测试代码或者项目。
python manage.py shell
#在命令行脚本中
from celerydemo.celery import debug_task
debug_task.delay()
会到看终端中出现log
[2017-01-09 17:37:29,279: WARNING/PoolWorker-2] Request: <Context: {u'origin': u'gen78371@youyoudeMacBook-Pro.local', u'args': [], u'chain': None, u'root_id': u'384bb5a3-f041-48e2-a4c6-f19c730f4eb7', u'expires': None, u'is_eager': False, u'correlation_id': u'384bb5a3-f041-48e2-a4c6-f19c730f4eb7', u'chord': None, u'reply_to': u'6254263f-0c98-371e-b58e-3ca94fab880e', u'id': u'384bb5a3-f041-48e2-a4c6-f19c730f4eb7', u'kwargsrepr': u'{}', u'lang': u'py', u'retries': 0, u'task': u'celerydemo.celery.debug_task', u'group': None, u'timelimit': [None, None], u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': u'celery', u'exchange': u''}, u'hostname': u'celery@youyoudeMacBook-Pro.local', 'called_directly': False, u'parent_id': None, u'argsrepr': u'()', u'errbacks': None, u'callbacks': None, u'kwargs': {}, u'eta': None, '_protected': 1}>
[2017-01-09 17:37:29,280: INFO/PoolWorker-2] Task celerydemo.celery.debug_task[384bb5a3-f041-48e2-a4c6-f19c730f4eb7] succeeded in 0.00210009199509s: None
那么大体上celery 4.x 已经在django中配置好,甚至可以执行部分的异步任务。
下面的是拓展
1:如何处理定时任务
a:
pip install django-celery-beat
b: 添加 django_celery_beat 模块到 项目的settings.py 的INSTALLED_APPS 中
INSTALLED_APPS = ( ..., 'django_celery_beat', )
c:应用模块中的表单到数据库中
python manage.py migrate
d:使用django 的任务心跳调度器
celery beat -A celerydemo -l info
e:配置定时任务
python manage.py runserver
# :8080/admin/django_celery_beat/ 中配置
2:定时任务的返回结果处理
以往的数据中,返回的结果是可以不需要或者忽略的。
a:配置上面很简单
pip install django-celery-results
b:添加 django_celery_results 到 INSTALLED_APPS
c:创建应用django_celery_results 所需的表单
python manage.py migrate django_celery_results
d:在配置文件中使用 django_celery_results 作为celery的使用
#settings.py
CELERY_RESULT_BACKEND = 'django-db'
or
CELERY_RESULT_BACKEND = 'django-cache'
网友评论
Traceback (most recent call last):
File "c:\python35\lib\site-packages\billiard\pool.py", line 358, in workloop
result = (True, prepare_result(fun(*args, **kwargs)))
File "c:\python35\lib\site-packages\celery\app\trace.py", line 525, in _fast_trace_task
tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
我的报这个错