美文网首页
15分钟入门 celery

15分钟入门 celery

作者: 李琼羽 | 来源:发表于2017-12-04 00:15 被阅读0次

    Reference

    Celery 简介

    引用官方的一段介绍

    Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. It’s a task queue with focus on real-time processing, while also supporting task scheduling. Celery has a large and diverse community of users and contributors, you should come join us on IRC or our mailing-list .Celery is Open Source and licensed under the BSD License.

    大概意思就是Celery 是一个专注于实时处理的任务队列和任务调度的系统。而且 celery 使用起来十分简单、灵活,并且可应用于分布式系统当中。
    我们在现实工作中可能会碰到这么两种情况:在一次 web 请求中我们需要处理一个十分耗时的任务;在我们系统中需要定时处理一些任务。这两种都是 celery 能解决的主要问题。

    Celery 应用

    Application 应用

    在使用 Celery 之前,我们需要先实例化 Celery. 我们可以称之为Application。
    首先安装 Celery:

    sudo pip install celery
    

    Celery 默认使用 RabbitMQ 作为 broker, 所以我们可以安装一个RabbitMQ。
    而后我们可以实例化 Celery

    from celery import Celery
    app = Celery()
    

    而后我们便可以通过 app 来调用 Celery 的库。我们可以在实例化的时候加入更多的参数,用来定制这个 app, celery 是线程安全的,所以你可以在一个进程中使用多个 Celery 实例。

    Task 任务

    Task 是 Celery 应用的基石,我们所有要执行的任务都需要被修饰为一个 task,一个 task 会同时扮演 worker 和 beat 两个角色,worker 用来执行任务,beat 用来触发任务。

    处理实时任务队列

    现在设定我们有一个非常耗时的任务:

    # tasks.py
    def send_email(name):
        print("准备发送邮件.")
        time.sleep(5)
        print("发送邮件给了{name}".format(name=name))
    

    一般我们同步的系统中这么调用:

    send_email("李琼羽")
    

    那么我们的程序会在这里堵塞5s。
    我们可以使用 Celery:

    # tasks.py
    import time
    from celery import Celery
    
    app = Celery()
    @app.task
    def send_email(name):
        print("准备发送邮件.")
        time.sleep(5)
        print("发送邮件给了{name}".format(name=name))
    

    然后启动 Celery 的任务处理进程:

    celery -A tasks worker -l info
    

    tasks 为模块, worker 指示为启动消费者 -l info 表示 log, 实际使用中可以用 supervisor 来管理。
    然后我们来用 Celery 的方式调用:

    from tasks import send_email
    send_email.delay("李琼羽")
    

    处理定时任务

    在我们自己的业务系统中有一个每月15号给所以可以发送一条短信的任务,又或者我们需要每5分钟跑一次报表 sql,对于这种需求,Celery 也能很好的处理。
    使用过 crontab 对此应该会很熟悉

    # tasks.py
    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery()
    @app.task
    def hello(word):
        print("发送时间是: {word}".format(word=word))
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, cc.s('每10秒一次'))
    
        # Executes every Monday morning at 7:30 a.m.
        sender.add_periodic_task(
            crontab(minute='*/1', ),
            cc.s('每分钟一次'),
        )
        sender.add_periodic_task(
            crontab(minute=30, hour=12, day_of_month=15),
            cc.s('每月15日12点30分发送')
        )
    

    首先我们定义了需要执行的任务,然后定义了setup_periodic_tasks ,其中* add_periodic_task *为添加任务计划,其中还支持 crontab 语法。
    crontab 语法可以参考:

    # 每分钟执行一次
    c1 = crontab()
    
    # 每天凌晨十二点执行
    c2 = crontab(minute=0, hour=0)
    
    # 每十五分钟执行一次
    crontab(minute='*/15')
    
    # 每周日的每一分钟执行一次
    crontab(minute='*',hour='*', day_of_week='sun')
    
    # 每周三,五的三点,七点和二十二点没十分钟执行一次
    crontab(minute='*/10',hour='3,17,22', day_of_week='thu,fri')
    

    然后我们启动 Celery 的 beat,这相当于一个生产者.

    celery -A tasks beat -l info
    

    再启动一个消费者:

    celery -A tasks worker -l info
    

    而后就能在 worker 客户端看到记录了。

    在 Django 中使用 Celery

    在 Django 中使用 celery 也是很方便的。为了在 Django 中使用 Celery 我们需要在项目中先实例化 Celery Application。
    我们的 Django 项目:

    - example/
      - manage.py
      - example/
        - __init__.py
        - settings.py
        - urls.py
    

    我们在项目下创建 celery.py :

    # example/example/celery.py
    import os
    from celery import Celery
    import logging
    
    # 设置 django
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'example.settings')
    
    # 用项目名称实例化 Celery
    app = Celery('example')
    
    # 从 Django 配置中读取 Celery 配置
    app.config_from_object('django.conf:settings', namespace='CELERY')
    # 设置自动发现任务,这样可以从 Django 的各个 app 中发现 task
    app.autodiscover_tasks()
    

    然后我们在 Django 中导入:

    # example/example/__init__.py
    from __future__ import absolute_import, unicode_literals
    from .celery import app as celery_app
    
    __all__ = ['celery_app']
    

    然后我们的 Django app 会是这样:

    - app1/
        - tasks.py
        - models.py
    - app2/
        - tasks.py
        - models.py
    

    注意, tasks.py 的文件名是固定的,如果不是这个 Celery 将无法找到注册的 task。
    然后我们可以注册任务了:

    # demoapp/tasks.py
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    
    @shared_task
    def add(x, y):
        return x + y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    
    
    @shared_task
    def xsum(numbers):
        return sum(numbers)
    

    最后, 启动消费者

    celery -A example worker -l info
    

    就可以在 Django 中使用啦

    相关文章

      网友评论

          本文标题:15分钟入门 celery

          本文链接:https://www.haomeiwen.com/subject/sipzbxtx.html