美文网首页程序员
分布式队列神器 Celery

分布式队列神器 Celery

作者: cf6d95617c55 | 来源:发表于2018-12-25 16:07 被阅读0次

昨天在一个自动化项目中看到代码引用Celery这个模块,我一脸懵逼,不知道这是个何物。当然,遇到不会的肯定是第一时间问度娘啦。

Celery 是什么?

Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具。
Celery 专注于实时任务处理,支持任务调度。
说白了,它是一个分布式队列的管理工具,我们可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列。

概念的东西都是很悬的东西,也是看完一次都不知道它在说啥。那问题来了,是如何应用Celery的呢?

快速入门

首先,我们要理解 Celery 本身不是任务队列,它是管理分布式任务队列的工具,或者换一种说法,它封装好了操作常见任务队列的各种操作,我们用它可以快速进行任务队列的使用与管理,当然你也可以自己看 rabbitmq 等队列的文档然后自己实现相关操作都是没有问题的

Celery 是语言无关的,虽然它是用 Python 实现的,但他提供了其他常见语言的接口支持。只是如果你恰好使用 Python 进行开发那么使用 Celery 就自然而然了。

想让 Celery 运行起来我们要明白几个概念:

Brokers

brokers 中文意思为中间人,在这里就是指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/拿取产品的地方(队列)

常见的 brokers 有 rabbitmq、redis、Zookeeper 等

Result Stores / backend

顾名思义就是结果储存的地方,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,就是 Result Stores 了

常见的 backend 有 redis、Memcached 甚至常用的数据都可以。

Workers

就是 Celery 中的工作者,类似与生产/消费模型中的消费者,其从队列中取出任务并执行

Tasks

就是我们想在队列中进行的任务咯,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。

理解以上概念后,我们可以通过一些实例来加深理解。这里我们用 redis 当做 celery 的 broker 和 backend。

安装 Celery 和 redis 以及 python 的 redis 支持:

apt-get install redis-server
pip install redis
pip install celery

然后,我们需要写一个task:

#/usr/bin/python
from celery import Celery

app=Celery('tasks',backend='redis://127.0.0.1:6379/0',broker='redis://127.0.0.1:6379/0') #配置好celery的backend和broker

@app.task #普通函数装饰为 celery task
def add(x,y):
    return x + y

OK,到这里,broker 我们有了,backend 我们有了,task 我们也有了,现在就该运行 worker 进行工作了,在 tasks.py 所在目录下运行:

celery -A tasks worker --loglevel=info

意思就是运行 tasks 这个任务集合的 worker 进行工作(当然此时broker中还没有任务,worker此时相当于待命状态)

最后一步,就是触发任务啦,最简单方式就是再写一个脚本然后调用那个被装饰成 task 的函数:

#/usr/bin/python
import time
from tasks import add

result=add.delay(4,4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用
while  not result.ready():
    time.sleep()

print 'task done:{0}'.format(result.get())

运行此脚本

python trigger.py 
#task done:8

delay 返回的是一个 AsyncResult 对象,里面存的就是一个异步的结果,当任务完成时result.ready() 为 true,然后用 result.get() 取结果即可。

进阶用法

经过快速入门的学习后,我们已经能够使用 Celery 管理普通任务,但对于实际使用场景来说这是远远不够的,所以我们需要更深入的去了解 Celery 更多的使用方式。

首先来看之前的task:

@app.task  #普通函数装饰为 celery task
def add(x, y):
    return x + y

这里的装饰器app.task实际上是将一个正常的函数修饰成了一个 celery task 对象,所以这里我们可以给修饰器加上参数来决定修饰后的 task 对象的一些属性。

首先,我们可以让被修饰的函数成为 task 对象的绑定方法,这样就相当于被修饰的函数 add 成了 task 的实例方法,可以调用 self 获取当前 task 实例的很多状态及属性。

其次,我们也可以自己复写 task 类然后让这个自定义 task 修饰函数 add ,来做一些自定义操作。

根据任务状态执行不同操作

任务执行后,根据任务状态执行不同操作需要我们复写 task 的 on_failure、on_success 等方法:

#/usr/bin/python
from celery import Celery

app=Celery('tasks',backend='redis://127.0.0.1:6379/0',broker='redis://127.0.0.1:6379/0')
class MyTask(app.Task):
    def on_success(self,retval,task_id,args,kwargs):
        print 'task done: {0}'.format(retval)
        return super(MyTask,self).on_success(retval,task_id,args,kwargs)
    def on_failure(self,exc,task_id,args,kwargs,einfo):
        print 'task fail, reason:{0}'.format(exc)
        return super(MyTask,self).on_failure(exc,task_id,args,kwargs,einfo)

@app.task(base=MyTask)
def add(x,y):
    return x + y

然后继续运行 worker:

celery -A tasks worker --loglevel=info

运行脚本,得到:

图1

再修改下tasks:

@app.task
def add(x,y):
    raise KeyError
    return x + y

重新运行 worker,再运行 trigger.py:


图2

可以看到,任务执行成功或失败后分别执行了我们自定义的 on_failure、on_success

绑定任务为实例方法

#/usr/bin/python
from celery import Celery
from celery.utils.log import get_task_logger

app=Celery('tasks',backend='redis://127.0.0.1:6379/0',broker='redis://127.0.0.1:6379/0')
logger=get_task_logger(__name__)
@app.task(bind=True)
def add(self,x,y):
    logger.info(self.request.__dict__)
    return x + y

然后,重新如下tasks.py和trigger.py


图3

任务状态回调

实际场景中得知任务状态是很常见的需求,对于 Celery 其内建任务状态有如下几种:

参数 说明
PENDING 任务等待中
STARTED 任务已开始
SUCCESS 任务执行成功
FAILURE 任务执行失败
RETRY 任务将被重试
REVOKED 任务取消

当我们有个耗时时间较长的任务进行时一般我们想得知它的实时进度,这里就需要我们自定义一个任务状态用来说明进度并手动更新状态,从而告诉回调当前任务的进度,具体实现:


#/usr/bin/python
from celery import Celery
import time

app=Celery('tasks',backend='redis://127.0.0.1:6379/0',broker='redis://127.0.0.1:6379/0')

@app.task(bind=True)
def test_mes(self):
    for i in xrange(1,11):
        time.sleep(0.1)
        self.update_state(state="PROGRESS",meta={'p':i*10})

    return 'finish'

然后修改 trigger.py :

# coding:utf-8
#/usr/bin/python
import time
import sys
from tasks import test_mes

def pm(body):
    res=body.get('result')
    if body.get('status') == 'PROGRESS':
        sys.stdout.write(u'\r任务进度:{0}%'.format(res.get('p')))
        sys.stdout.flush()
    else:
        print '\r'
        print res
r=test_mes.delay()
print r.get(on_message=pm,propagate=False)

然后运行任务:


图4

定时/周期任务

Celery 进行周期任务也很简单,只需要在配置中配置好周期任务,然后在运行一个周期任务触发器( beat )即可:
新建 Celery 配置文件 celery_config.py:

#/usr/bin/python
from datetime import timedelta
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'ptask': {
        'task': 'tasks.period_task',
        'schedule': timedelta(seconds=5),
    },
}
 
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'

配置中 schedule 就是间隔执行的时间,这里可以用 datetime.timedelta 或者 crontab 甚至太阳系经纬度坐标进行间隔时间配置

如果定时任务涉及到 datetime 需要在配置中加入时区信息,否则默认是以 utc 为准。例如中国可以加上:

CELERY_TIMEZONE = 'Asia/Shanghai'

然后在 tasks.py 中增加要被周期执行的任务:

#/usr/bin/python
from celery import Celery


app=Celery('tasks',backend='redis://127.0.0.1:6379/0',broker='redis://127.0.0.1:6379/0')
app.config_from_object('celery_config')

@app.task(bind=True)
def period_task(self):
    print "period task done:{0}".format(self.request.id)

然后重新运行 worker,接着再运行 beat:

celery -A tasks beat
图5

相关文章

  • python-分布式任务队列

    celery 分布式任务队列工具 Celery是一个分布式任务队列工具,是一个异步的任务队列基于分布式消息传递 基...

  • Celery初体验

    Celery与任务队列 Celery是Python中流行的分布式任务队列。所谓分布式任务队列,是一种将任务分发到不...

  • Celery初级

    什么是Celery Celery是管理分布式队列工具,可用于进行任务队列的使用和管理 Celery的结构 Brok...

  • celery学习笔记

    Celery 标签(空格分隔): celery Celery是一个分布式任务队列工具,是一个异步的任务队列基于分布...

  • 实现简单的python3+flask+celery+redis框

    详解python3+flask+celery+redis Celery是什么? Celery是个异步分布式任务队列...

  • 分布式队列神器 Celery

    http://python.jobbole.com/87238/ ( 网站 )

  • 分布式队列神器 Celery

    昨天在一个自动化项目中看到代码引用Celery这个模块,我一脸懵逼,不知道这是个何物。当然,遇到不会的肯定是第一时...

  • celery--turorial

    Celery Celery (芹菜)是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/...

  • celery redis rabbitMQ各是什么及之间的区别?

    Celery: Celery是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线...

  • Python Celery学习扎记

    Celery是什么 Celery是一个管理分布式队列的工具,它封装好了常见任务队列的各种操作,能够快速进行任务队列...

网友评论

    本文标题:分布式队列神器 Celery

    本文链接:https://www.haomeiwen.com/subject/mybflqtx.html