想了解更多的同学,推荐去看官方文档,写的很清晰:celery官方文档
这里我们使用redis作为Broker
一、 redis作为broker
1. 安装
使用redis作为broker需要安装额外的依赖, 下面这句就可以安装celery及其依赖。
pip install -U "celery[redis]"
然后你需要启动redis服务。
如果你已经配置好redis,就直接安装celery就好了。
pip install celery
2. 配置
只需要配置你的redis链接地址就可以了.
app.conf.broker_url = 'redis://localhost:6379/0'
redis链接的格式为
redis://:password@hostname:port/db_number
协议后面所有的参数都是可选参数,默认连接到localhost的6379端口, 默认数据库为 db0 .
3. 结果
如果想把结果也存储在redis中, 可以这样配置
app.conf.result_backend = 'redis://localhost:6379/0'
二、应用
首先我们要创建一个Celery应用,这个应用是用来作为你所有Celery操作的接入点。这里我们把所有的东西都塞到一个文件里面, 实际项目中需要分开写, 请参考文档。
我们先创建文件 tasks.py
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
第一个参数是当前模块的名字,必选参数, 只有当任务在__main__
中定义时,才会自动生成名称。
第二个参数是broker的参数, 指定你要使用的broker消息队列,这里我们使用redis
三、运行worker服务
这样我们就启动了worker服务。
celery -A tasks worker --loglevel=info
启动后会看到屏幕得到这样的输出提示:
image.png四、调用任务
调用任务我们需要用到delay()
方法
>>> from tasks import add
>>> add.delay(4, 4)
add(4,4)这个任务就会发送到队列,再由worker去处理和计算。我们可以再刚刚开启的worker服务中看到处理的提示:
image.png调用每个任务会返回一个AsyncResult
的实例, 我们可以用它来检查任务状态,获取返回值。
五、保存结果
由于celery是异步处理, 所以如果想要拿到结果,需要有个后端(叫做backend) 暂时存储worker处理完的结果,celery有几种内置的结果后端: SQLAlchemy/Django ORM, Memcached, Redis, RPC (RabbitMQ/AMQP), 当然你可以自己定义。更多关于backend查看Result Backends.
这里我们依然使用redis。
把 tasks.py
中的app改为:
app = Celery('tasks', backend='redis://localhost', broker='redis://localhost')
这样我们就把backend加入进去了。此时我们重新开启服务
celery -A tasks worker --loglevel=info
将刚刚的python交互shell关掉重新开启(不然使用的是之前的缓存)
image.pngready()
方法返回任务是否处理完成, 你可以轮询结果等待任务完成, 不过在异步程序中通常不会这样使用,而是使用 get()
方法加入timeout参数。如果任务报错, get方法会再次抛出异常, 你可以指定 propagate
参数来
注意!!!
Backends 存储结果是消耗资源的,为了确保资源及时释放,每次使用完数据调用forget()
来释放资源。
关于结果存储的更多信息查看 celery.result
六、配置
Celery 使用的时候不需要太多的操作和配置,开箱即用,然鹅, 如果你深入使用的时候,会发现它又可以配置很多参数。 默认的参数有时候可能不够用, 所以就需要指定配置参数了。参数配置可以看这里 Configuration and defaults
配置可以直接在app初始化的时候指定。比如我们可以指定序列器为json(当然默认就是json)
app.conf.task_serializer = 'json'
如果有很多参数需要配置, 可以用update
函数
app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)
对于大一些的项目,我们推荐使用专门的配置。配置集中化也有利于项目管理和维护。你可以通过app.config_from_object()
方法来指定配置模块。配置模块通常命名为celeryconfig
,当然你也可以起别的名字。
比如我们有这样的配置文件celeryconfig.py
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
指定配置的时候:
app.config_from_object('celeryconfig')
为了验证我们的配置是否有语法错误,是否正常,我们可以import一下试试:
$ python -m celeryconfig
更多配置详情请看 Configuration and defaults
进一步学习
http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#next-steps
网友评论