Celery

作者: 甚了 | 来源:发表于2016-11-17 14:55 被阅读196次

    Celery

    官方网站

    Celery是一个分布式的任务队列,负责任务的执行与调度。


    Celery的架构由三部分组成:

    1. 消息中间件(message broker);
    2. 任务执行单元(worker);
    3. 任务执行结果存储(task result store)组成。

    安装:

    $ pip install celery


    框架集成:

    框架 集成
    Pyramid pyramid_celery
    Pylons celery-pylons
    Flask not needed
    web2py web2py-celery
    Tornado tornado-celery

    对于Flask项目:

    • 可以用Redis作为Celery的Broker,负责传递通讯消息。
    • pip install flask-celery-helper 安装Celery和它的Flask扩展
    • pip install redis 安装Redis的python扩展。

    Celery First Step

    选择消息中间件(Message Broker)

    Celery是通过消息(Message)来沟通的,通常使用一个消息中间件来连接客户端和工作端。初始化一个任务后,客户端通过添加一个消息到队列(Queue),消息中间件(Broker)会将消息发送到工作端处理

    Name Status Monitoring Remote Control
    RabbitMQ Stable Yes Yes
    Redis Stable Yes Yes
    Amazon SQS Stable No No
    Zookeeper Experimental No No

    应用

    简单应用

    创建一个task.py文件

    from celery import Celery
    
    celery = Celery('tasks', broker='redis://localhost:6379/0')
    
    @celery.task
    def add(x, y):
        return x + y
    
    Flask应用

    将Celery配置加入到配置项中:

        CELERY_IMPORTS = (
            "app.tasks.mail",
        )
        CELERY_BROKER_URL = 'redis://localhost:6379/0'
        CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
        DEBUG = True
    

    创建并初始化Celery:

    from flask import Flask
    from flask_celery import Celery
    
    app = Flask(__name__)
    celry = new Celery()
    
    # 初始化Celery
    celery.init_app(app)
        
    
    运行Worker服务

    $ celery -A task.celery worker --loglevel=info

    启动

    可以通过命令查看更多帮助文档:

    $ celery worker --help
    $ celery help

    调用任务
    >>> from task import add
    >>> add.delay(4, 4)
    
    <AsyncResult: 7031f593-7166-49dd-8c4b-3717e163de16>
    
    调用任务

    同样你会看到log中已经接收到并且处理了任务

    [2016-11-17 12:44:32,642: INFO/MainProcess] Received task: task.add[7031f593-7166-49dd-8c4b-3717e163de16]  
    [2016-11-17 12:44:32,645: INFO/PoolWorker-2] Task task.add[7031f593-7166-49dd-8c4b-3717e163de16] succeeded in 0.000571212000068s: 8
    
    
    Log
    获取结果

    如果想要持久化或者在某些地方使用任务反回的结果,有一些内建的后端服务可以使用。

    使用Redis作为后台的话可以使用如下配置创建celery实例:

    celery = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
    

    当然也可以搭配使用:

    • RabbitMQ 作为消息中间件
    • Redis 作为后端
    • more info
    1. 现在我们重启Celery的Worker服务

      $ celery -A task.celery worker --loglevel=info

    2. 发送任务并接收任务结果

      >>> from task import add
      >>> result = add.delay(3, 4)
      

      这里返回了一个 AsyncResult实例。

    3. 查看任务是否执行:

      >>> result.ready()
      True
      
    4. 等待之行结果:

      >>> result.get(timeout=1)
      7
      

      get()有很多参数:
      如果task在之行过程中抛出了异常,可以通过指定get参数让异常重新抛出,并进行跟踪:

      >>> result.get(propagate=True)
      >>> result.traceback
      …
      
    5. 更多信息参考celery.result

    获取任务结果
    配置

    Celery有输入和输出,可以将输入连接到消息中间件,将输出连接到结果后端。通畅情况下默认配置已经够用,不需要做过多的更动。

    现在可以先参考官方文档

    相关文章

      网友评论

        本文标题:Celery

        本文链接:https://www.haomeiwen.com/subject/btbdpttx.html