美文网首页Django
Celery-分布式任务队列学习笔记

Celery-分布式任务队列学习笔记

作者: EarthChen | 来源:发表于2017-08-20 23:57 被阅读167次

    Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。
    它是一个专注于实时处理的任务队列,同时也支持任务调度。
    以上是celery自己官网的介绍

    celery的应用场景很广泛

    • 处理异步任务
    • 任务调度
    • 处理定时任务
    • 分布式调度

    好处也很多,尤其在使用python构建的应用系统中,无缝衔接,使用相当方便。

    Celery

    安装

    安装Celery

    推荐使用pip安装,如果你使用的是虚拟环境,请在虚拟环境里安装

    $ pip install celery
    

    安装消息中间件

    Celery 支持 RabbitMQ、Redis 甚至其他数据库系统作为其消息代理中间件

    你希望用什么中间件和后端就请自行安装,一般都使用redis或者RabbitMQ

    安装Redis

    在Ubuntu系统下使用apt-get命令就可以

    $ sudo apt-get install redis-server
    

    如果你使用redis作为中间件,还需要安装redis支持包,同样使用pip安装即可

    $ pip install redis
    

    能出现以下结果即为成功

    redis 127.0.0.1:6379>
    

    其他的redis知识这里不左介绍,如果有兴趣,可以自行了解

    如果你使用RabbitMQ,也请安装RabbitMQ

    安装RabbitMQ

    $ sudo apt-get install rabbitmq-server
    

    使用Celery

    简单直接使用

    可以在需要的地方直接引入Celery,直接使用即可。最简单的方式只需要配置一个任务和中间人即可

    from celery import Celery
    
    app = Celery('tasks', broker='redis://localhost:6379/3')
    
    @app.task
    def add(x, y):
        return x + y
    

    我这里使用了redis作为中间件,这是可以按自己的习惯替换的

    由于默认的配置不是最切合我们的项目实际需要,一般来说我们都需要按我们自己的要求配置一些,
    但是由于需要将项目解耦,也好维护,我们最好使用单独的一个文件编写配置。

    单独配置配置文件

    比上面的稍微复杂一点,我们需要创建两个文件,一个为config.py的celery配置文件,在其中填写适合我们项目的配置,在创建一个tasks.py文件来编写我们的任务。文件的名字可以按你的喜好自己命名。

    config.py内容为:

    # coding=utf-8
    # 配置文件同一配置celery
    BROKER_URL = 'redis://localhost:6379/3'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/4'
    
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERY_ENABLE_UTC = True
    
    # 把“脏活”路由到专用的队列:
    CELERY_ROUTES = {
        'tasks.add': 'low-priority',
    }
    
    # 限制任务的速率,这样每分钟只允许处理 10 个该类型的任务:
    CELERY_ANNOTATIONS = {
        'tasks.add': {'rate_limit': '10/m'}
    }
    

    配置好以后可以用以下命令检查配置文件是否正确(config为配置文件名)

    $ python -m config
    

    tasks.py内容为:

    # coding=utf-8
    from celery import Celery
    
    app = Celery()
    # 参数为配置文件的文件名
    app.config_from_object('config')
    
    @app.task
    def add(x, y):
        return x + y
    

    还有一种同一设置配置的方式,不是很推荐

    app.conf.update(
        task_serializer='json',
        accept_content=['json'],  # Ignore other content
        result_serializer='json',
        timezone='Europe/Oslo',
        enable_utc=True,
    )
    

    在app使用前先需要用以上方法批量更新配置文件。

    在应用上使用

    工程目录结构为

    proj/
        __init__.py
        # 存放配置和启动celery代码
        celery.py
        # 存放任务
        tasks.py
    

    celery.py为:

    from __future__ import absolute_import, unicode_literals
    from celery import Celery
    
    app = Celery('proj',
                 broker='redis://localhost:6379/3',
                 backend='redis://localhost:6379/4',
                 include=['proj.tasks'])
    
    # Optional configuration, see the application user guide.
    app.conf.update(
        result_expires=3600,
    )
    
    if __name__ == '__main__':
        app.start()
    

    tasks.py为:

    from __future__ import absolute_import, unicode_literals
    from .celery import app
    
    
    @app.task
    def add(x, y):
        return x + y
    
    
    @app.task
    def mul(x, y):
        return x * y
    
    
    @app.task
    def xsum(numbers):
        return sum(numbers)
    

    启动celery只需要在proj同级目录下:

    $ celery -A proj worker -l info
    

    在django中使用celery

    我们的django的项目的目录结构一般如下

    proj/
        manage.py
        myapp/
        proj/
            __init__py
            settings.py
            urls.py
            wsgi.py
    

    想要在django项目中使用celery,我们首先需要在django中配置celery

    我们需要在与工程名同名的子文件夹中添加celery.py文件
    在本例中也就是proj/proj/celery.py

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    # 第二个参数为工程名.settings
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
    
    # 括号里的参数为工程名
    app = Celery('proj')
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    # 配置文件需要写在setting.py中,并且配置项需要使用`CELERY_`作为前缀
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    # 能够自动加载所有在django中注册的app,也就是setting.py中的INSTALLED_APPS
    app.autodiscover_tasks()
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    

    然后我们需要在同级目录下的init.py文件中配置如下内容
    proj/proj/init.py

    from __future__ import absolute_import, unicode_literals
    
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
    
    __all__ = ['celery_app']
    

    然后我们就可以把需要的任务放到需要的app下的tasks.py中,现在项目目录结构如下

    proj/
        manage.py
        myapp1/
            __init__.py
            tasks.py
            views.py
            model.py
            tests.py
        myapp2/
            __init__.py
            tasks.py
            views.py
            model.py
            tests.py
        proj/
            __init__py
            settings.py
            urls.py
            wsgi.py
    

    可能的一个tasks.py文件内容如下:
    myapp1/tasks.py为:

    # Create your tasks here
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    import time
    
    
    @shared_task
    def add(x, y):
        # 为了测试是否是异步,特意休眠5s,观察是否会卡主主进程
        time.sleep(5)
        print(x+y)
        return x + y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    
    
    @shared_task
    def xsum(numbers):
        return sum(numbers)
    

    @shared_task修饰器可以让你创建task不需要app实体

    在需要的地方调用相关任务即可,例如在myapp1/views.py中调用

    from django.shortcuts import render
    from .tasks import add
    
    
    def index(request):
        # 测试celery任务
        add.delay(4,5)
        return render(request,'index.html')
    

    然后就可以启动项目,celery需要单独启动,所以需要开两个终端,分别

    启动web应用服务器

    $ python manage.py runserver
    

    启动celery

    $ celery -A proj worker -l info
    

    然后访问浏览器就可以在启动celery的终端中看到输出


    测试结果

    扩展

    • 如果你的项目需要在admin中管理调度,请使用django-celery-beat
    1. 使用pip安装django-celery-beat
    $ pip install django-celery-beat
    

    不要在使用django-celery,这个项目已经停止更新好好多年。。。。

    1. 在settings.py中添加这个app
    INSTALLED_APPS = (
        ...,
        'django_celery_beat',
    )
    
    1. 同步一下数据库
    $ python manage.py migrate
    
    1. 设置celery beat服务使用django_celery_beat.schedulers:DatabaseScheduler scheduler
    $ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    

    然后在就可以admin界面看到了。

    • 如果你想使用Django-ORM或者Django Cache作为后端,需要安装django-celery-results扩展(笔者不建议)
    1. 使用pip安装django-celery-results
    $ pip install django-celery-results
    

    不要在使用django-celery,这个项目已经停止更新好好多年。。。。

    1. 在settings.py中添加这个app
    INSTALLED_APPS = (
        ...,
        'django_celery_results',
    )
    
    1. 同步一下数据库
    $ python manage.py migrate django_celery_results
    
    1. 配置后端,在settings.py中配置
    # 使用数据库作为结果后端
    CELERY_RESULT_BACKEND = 'django-db'
    
    # 使用缓存作为结果后端
    CELERY_RESULT_BACKEND = 'django-cache'
    

    基本使用大概就是上述这些,其他具体配置和使用还需自己研读官方文档

    注:

    • 上述环境在ubuntu16.04 lts django1.9中搭建测试成功
    • 上述文字皆为个人看法,如有错误或建议请及时联系我

    相关文章

      网友评论

        本文标题:Celery-分布式任务队列学习笔记

        本文链接:https://www.haomeiwen.com/subject/lkbqdxtx.html