0x01 基本概念
架构图如上图所示,由三部分组成:消息中间件(message broker)、任务执行单元(worker)、任务执行结果存储(task result store)
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache
0x02 实例
创建实例app:
# celery.py
from celery import Celery
app = Celery('task_name', backend='amqp://guest@localhost//', broker='amqp://guest@localhost//')
# 加载celery配置
app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_TIMEZONE='Europe/Oslo',
CELERY_ENABLE_UTC=True,
## 可以把配置写到py文件内加载
app.config_from_object('django.conf:settings')
## 自动发现任务(需要在app下创建tasks.py模块)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
创建tasks.py:
# tasks.py
from celery import app
@app.task
def add(x, y):
return x+y
启动worker(启动完,当然此时broker中还没有任务,worker此时相当于处于待命的状态)
celery -A tasks task_name --loglevel-info
触发任务:
#trigger.py
from tasks import add
result = add.delay(4, 4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用
while not result.ready():
time.sleep(1)
print 'task done: {0}'.format(result.get())
delay返回的是一个AsyncResult对象,里面存的就是一个异步的结果,当任务完成时result.ready()为True,然后用result.get()取结果即可。
至此,一个简单的任务队列就完成了。
欢迎关注微信公众号(coder0x00)或扫描下方二维码关注,我们将持续搜寻程序员必备基础技能包提供给大家。
网友评论