美文网首页Django
在 Django 中使用 Celery 来进行耗时操作

在 Django 中使用 Celery 来进行耗时操作

作者: 柴柴土 | 来源:发表于2017-03-25 17:52 被阅读4920次

    最近在实验室中做一个项目(心酸啊,本想安安静静的清清闲闲的毕业的,没想到。。。),要把实验室发过的论文中的一些算法集成到一个 Web 服务器上,用户可以上传数据,还可以在 Web 上看到计算结果的可视化图表。整个服务器的后台是使用 Django 框架来搭建的,这些算法需要处理一定量的数据,使用了 numpy,pandas,scipy 等数值计算库,每一组数据的处理有时候需要跑好几个小时。为了合理的调度这些算法,我们这里使用了 Celery

    1. Django 处理 Request 的基本流程

    Django 流程示意图

    上面的这一张是网络上的 Django 处理 request 的流程示意图。大致意思就是:

    浏览器发起 http 请求 ----> http handling(request 解析) ----> url 匹配(正则匹配找到对应的 View) ----> 在View中进行逻辑的处理与数据计算(包括调用 Model 类进行数据库的增删改查)----> 将数据推送到 template,返回对应的 template/response。

    对于一些简单的操作,可以放在 View 中处理。在View处理任务时用户处于等待状态,直到页面返回结果。但是对于一些复杂的操作,则在 View 中应该先返回 response,再在后台处理任务。用户无需等待。当任务处理完成时,我们可以再通过 Ajax 之类的方式告知用户。

    Celery 就是基于 Python 开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。

    2. Celery

    Celery 的基本架构

    上图是 Celery 的基本架构,它采用典型的生产生--消费者模式,主要由三部分组成:broker(消息队列)、workers(消费者:处理任务)、backend(存储结果)。实际应用中,用户从 Web 前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列 broker 中,由空闲的 worker 去处理任务即可,处理的结果会暂存在后台数据库 backend 中。我们可以在一台机器或多台机器上同时起多个 worker 进程来实现分布式地并行处理任务。

    3. 安装 Celery

    安装过程就是直接按照官网上的文档安装即可。我这里用的均是目前的最新稳定版。

    • macOS Sierra 10.12.3
    • Django 1.10
    • Celery 4.0.2

    在早前版本的 Celery 中,有一个专门供 Django 使用的 Celery 版本:django-celery。但是在现在 Celery 已经统一为一个版本,所以直接安装原生的 Celery 即可:

    pip install celery
    

    Celery 推荐使用 RabbitMQRedis,Amazon SQS,Zookeeper,这几个作为 broker,但是只有前两个支持在生产环境使用。下面的表格对比了几种 broker。

    Name Status Monitoring Remote Control
    RabbitMQ Stable Yes Yes
    Redis Stable Yes Yes
    Amazon SQS Stable No No
    Zookeeper Experimental No No

    我是使用 Redis 作为 broker 的。除了安装 redis 之外,还应该安装 redis 的 python 支持库。

    安装 Redis:

    brew install redis
    

    安装 redis 的 python 支持库:

    pip install redis
    

    输入 redis-server 来开启 redis。当你看见下面的图案时,就说明成功开启了 redis。redis 默认监听 6379 端口。开启之后可以用 ctrl+c 来退出。

    开启 redis

    4. 把 Celery 配置到 Django 上

    假设你有一个项目 proj:

    - proj/
      - proj/__init__.py
      - proj/settings.py
      - proj/urls.py
    - manage.py
    

    Celery 建议在 proj/proj/celery.py 上定义一个 Celery 的实例。

    文件 proj/proj/celery.py:

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
    
    app = Celery('proj')
    
    # Using a string here means the worker don't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    
    

    然后再在proj/proj/__init__.py做一些配置。

    文件 proj/proj/__init__.py:

    from __future__ import absolute_import, unicode_literals
    
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
    
    __all__ = ['celery_app']
    

    完成上面的步骤之后,可以在命令行输入:

    celery worker -A proj -l info
    

    正常情况下,应该会出现类似于下图的输出。

    开启 celery 并与 redis 连接

    ok,接下来,为了让 celery 中执行的任务的结果返回我们的 Django,我们还应该安装 django-celery-results

    pip install django-celery-results
    

    再在 proj/proj/settings.py: 中做如下的设置:

    文件proj/proj/settings.py:

    # Celery 设置
    CELERY_BROKER_URL = 'redis://localhost:6379/0'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_BACKEND = 'django-db'
    CELERY_TIMEZONE = 'Asia/Shanghai' 
    
    INSTALLED_APPS = [
        ...
        ...
        'django_celery_results'
    ]
    
    

    再 migrate 一下:

    migrate django_celery_results
    

    5. 加入一个耗时任务

    在你的 app 的目录下,新建一个 tasks.py 文件。在里面加入一个耗时的任务:

    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    # 模拟一个耗时操作
    @shared_task
    def longtime_test():
       ...
      # 在这里进行一些耗时操作
       ...
    

    views.py 中,写成这样:

    def test_view(request):
        # do something
        longtime_test.delay()
        return render(request, 'template.html', {'data': data})
    

    这样之后,就会先返回 html 模版,再在后台计算数据了。

    相关文章

      网友评论

        本文标题:在 Django 中使用 Celery 来进行耗时操作

        本文链接:https://www.haomeiwen.com/subject/eujcottx.html