美文网首页爬虫技术干货Python文集
异步任务神器 Celery 简明笔记

异步任务神器 Celery 简明笔记

作者: 人世间 | 来源:发表于2015-09-20 22:09 被阅读44371次

    异步任务

    异步任务是web开发中一个很常见的方法。对于一些耗时耗资源的操作,往往从主应用中隔离,通过异步的方式执行。简而言之,做一个注册的功能,在用户使用邮箱注册成功之后,需要给该邮箱发送一封激活邮件。如果直接放在应用中,则调用发邮件的过程会遇到网络IO的阻塞,比好优雅的方式则是使用异步任务,应用在业务逻辑中触发一个异步任务。

    实现异步任务的工具有很多,其原理都是使用一个任务队列,比如使用redis生产消费模型或者发布订阅模式实现一个简单的消息队列

    除了redis,还可以使用另外一个神器---Celery。Celery是一个异步任务的调度工具。它是Python写的库,但是它实现的通讯协议也可以使用ruby,php,javascript等调用。异步任务除了消息队列的后台执行的方式,还是一种则是跟进时间的计划任务。下面将会介绍如何使用celery实现这两种需求。

    Celry broker 和 backend

    最早学习celery的时候,冒出了一个rabbitmq,又冒出一个redis。当时一头雾水。实际上这正是celery的设计奥妙。简单来说,rabbitmq是一个采用Erlang写的强大的消息队列工具。在celery中可以扮演broker的角色。那么什么是broker?

    broker是一个消息传输的中间件,可以理解为一个邮箱。每当应用程序调用celery的异步任务的时候,会向broker传递消息,而后celery的worker将会取到消息,进行对于的程序执行。好吧,这个邮箱可以看成是一个消息队列。那么什么又是backend,通常程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。对于 brokers,官方推荐是rabbitmq和redis,至于backend,就是数据库啦。为了简单起见,我们都用redis。

    Getting Starting

    使用celery包含三个方面,其一是定义任务函数,其二是运行celery服务,最后是客户应用程序的调用。

    创建一个文件 tasks.py

    输入下列代码:

    from celery import Celery
    
    brokers = 'redis://127.0.0.1:6379/5'
    backend = 'redis://127.0.0.1:6379/6'
    
    
    app = Celery('tasks', broker=broker, backend=backend)
    
    @app.task
    def add(x, y):
        return x + y
    

    上述代码导入了celery,然后创建了celery实例app,实力话的过程中,指定了任务名tasks(和文件名一致),传入了broker和backend。然后创建了一个任务函数add

    下面就启动celery服务

    在当前命令行终端运行:

    celery -A tasks worker  --loglevel=info
    

    此时会看见一对输出。包括注册的任务啦。

    下面客户端程序如何调用呢?打开一个命令行,进入Python环境

    In [0]:from tasks import add
    In [1]: r = add.delay(2, 2)
    In [2]: add.delay(2, 2)
    Out[2]: <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d>
    
    In [3]: r = add.delay(3, 3)
    
    In [4]: r.re
    r.ready   r.result  r.revoke
    
    In [4]: r.ready()
    Out[4]: True
    
    In [6]: r.result
    Out[6]: 6
    
    In [7]: r.get()
    Out[7]: 6
    

    在celery命令行可以看见celery执行的日志:

    [2015-09-20 21:37:06,086: INFO/MainProcess] Task proj.tasks.add[76beb980-0f55-4629-a4fb-4a1776428ea8] succeeded in 0.00089102005586s: 6
    

    打开 backend的redis,也可以看见celery执行的信息。

    现在时在python环境中调用的add函数,实际上通常在应用程序中调用这个方法。需要注意,如果把返回值赋值给一个变量,那么原来的应用程序也会被阻塞,需要等待异步任务返回的结果。因此,实际使用中,不需要把结果赋值。

    计划任务

    上述的使用是简单的配置,下面介绍一个更健壮的方式来使用celery。首先创建一个python包,celery服务,姑且命名为proj。目录文件如下:

    ☁  proj  tree
    .
    ├── __init__.py
    ├── celery.py           # 创建 celery 实例
    ├── config.py               # 配置文件
    └── tasks.py                # 任务函数
    

    首先是 celery.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from __future__ import absolute_import
    from celery import Celery
    
    app = Celery('proj', include=['proj.tasks'])
    
    app.config_from_object('proj.config')
    
    if __name__ == '__main__':
        app.start()
    

    这一次创建 app,并没有直接指定 broker 和 backend。而是在配置文件中。

    config.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from __future__ import absolute_import
    
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
    BROKER_URL = 'redis://127.0.0.1:6379/6'
    
    

    剩下的就是tasks.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from __future__ import absolute_import
    from proj.celery import app
    
    @app.task
    def add(x, y):
        return x + y
    

    使用方法也很简单,在proj的同一级目录执行celery:

    celery -A proj worker -l info
    

    现在使用任务也很简单,直接在客户端代码调用 proj.tasks 里的函数即可。

    Scheduler

    一种常见的需求是每隔一段时间执行一个任务。配置如下

    config.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from __future__ import absolute_import
    
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
    BROKER_URL = 'redis://127.0.0.1:6379/6'
    
    CELERY_TIMEZONE = 'Asia/Shanghai'
    
    from datetime import timedelta
    
    CELERYBEAT_SCHEDULE = {
        'add-every-30-seconds': {
             'task': 'proj.tasks.add',
             'schedule': timedelta(seconds=30),
             'args': (16, 16)
        },
    }
    

    注意配置文件需要指定时区。这段代码表示每隔30秒执行 add 函数。

    一旦使用了 scheduler, 启动 celery需要加上-B 参数

    celery -A proj worker -B -l info
    

    crontab

    计划任务当然也可以用crontab实现,celery也有crontab模式。修改 config.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from __future__ import absolute_import
    
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
    BROKER_URL = 'redis://127.0.0.1:6379/6'
    
    CELERY_TIMEZONE = 'Asia/Shanghai'
    
    from celery.schedules import crontab
    
    CELERYBEAT_SCHEDULE = {
        # Executes every Monday morning at 7:30 A.M
        'add-every-monday-morning': {
            'task': 'tasks.add',
            'schedule': crontab(hour=7, minute=30, day_of_week=1),
            'args': (16, 16),
        },
    }
    

    总而言之,scheduler的切分度更细,可以精确到秒。crontab模式就不用说了。当然celery还有更高级的用法,比如多个机器使用,启用多个worker并发处理等。

    相关文章

      网友评论

      • 人生ku短:为什么不标注环境和版本。。。
      • fitchen:介绍的不清楚
      • Eureka912:难得思路这么清晰的教程. 循序渐进
      • 卿魂:brokers 还是broker?:grin:
        摆渡_猫:应该是brokers上面声明的这个吧
      • 四号公园_2016:请教一下,celery种正在跑的任务如何停掉或者删除?
        Agony_c8b7:revoke
      • 找自己的idea:还想请教一下 我有一个task 定义了一个函数 在函数中启动了一个线程 线程中有一个无限循环 当执行task后 线程无法结束 (如果不用celery的话 是可以结束这个线程的 我写了一个类继承了threading.Thread)有什么方法可以解决么 最好在函数中结束线程 或者 直接重启task?
      • 找自己的idea:如果想利用异步任务的结果 如何操作
        找自己的idea:@人世间 感谢回复
        人世间:获取结果可以使用 celery的result方法,相关文档 http://docs.celeryproject.org/en/latest/reference/celery.result.html。另外,celery会在backend存储一下任务的结果信息,可以从这个数据库中读取。另外flower http://flower.readthedocs.io/en/latest/ 这个项目对celery 提供了一个webui,也可以看到一下任务的信息。
      • 47a6d15542bb:写得不错,就是错别字太多了,建议校对一遍

      本文标题:异步任务神器 Celery 简明笔记

      本文链接:https://www.haomeiwen.com/subject/nkqicttx.html