美文网首页
Python 通过 Celery 框架实现分布式任务队列

Python 通过 Celery 框架实现分布式任务队列

作者: rollingstarky | 来源:发表于2019-11-06 19:25 被阅读0次

    Celery 是一个简单、灵活且可靠的分布式消息处理系统,主要用来作为任务队列对海量消息数据进行实时的处理,在多个程序线程或者主机之间传递和分发工作任务。同时也支持计划任务等需求。

    一、环境配置

    Celery 框架自身并不对传入的消息进行存储,因此在使用前需要先安装第三方的 Message Broker。如 RabbitMQRedis 等。

    安装 RabbitMQ

    对于 Linux 系统,执行以下命令:

    $ sudo apt-get install rabbitmq-server    # 安装 RabbitMQ
    $ sudo rabbitmqctl add_user myuser mypassword    # 添加用户 myuser/mypassword
    $ sudo rabbitmqctl add_vhost myvhost    # 添加 vhost
    $ sudo rabbitmqctl set_user_tags myuser mytag
    $ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"    # 为用户 myuser 设置访问 myvhost 的权限
    

    通过 Docker 安装的步骤如下:

    $ docker pull rabbitmq:3.8-management    # 拉取 docker 镜像(包含 web 管理)
    # 启动 rabbitmq 容器
    $ docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --hostname myRabbit \
    -e RABBITMQ_DEFAULT_VHOST=myvhost \
    -e RABBITMQ_DEFAULT_USER=myuser \
    -e RABBITMQ_DEFAULT_PASS=mypassword rabbitmq:3.8-management
    
    安装 Redis

    $ sudo apt-get install redis-server

    安装 Celery

    $ pip install celery

    二、创建 Celery 应用

    Celery 应用是该框架所能提供的所有功能(如管理 tasks 和 workers 等)的入口,须确保它可以被其他模块导入。
    以下是一段简单的 Celery app 代码 tasks.py

    # tasks.py
    from celery import Celery
    
    app = Celery('tasks',
                 broker='pyamqp://myuser:mypassword@localhost:5672/myvhost',
                 backend='redis://localhost:6379/0')
    
    @app.task
    def add(x, y):
        return x + y
    

    使用 RabbitMQ 作为 broker 接收和发送任务消息,使用 Redis 作为 backend 存储计算结果。

    运行 Celery worker 服务

    $ celery -A tasks worker --loglevel=info

    $ celery -A tasks worker --loglevel=info
    
     -------------- celery@skitarniu-ubuntu18 v4.3.0 (rhubarb)
    ---- **** -----
    --- * ***  * -- Linux-4.15.0-60-generic-x86_64-with-debian-buster-sid 2019-11-01 07:21:34
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app:         tasks:0x7f4f30b84a90
    - ** ---------- .> transport:   amqp://myuser:**@localhost:5672/myvhost
    - ** ---------- .> results:     redis://localhost:6379/0
    - *** --- * --- .> concurrency: 2 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
    
    
    [tasks]
      . tasks.add
    
    [2019-11-01 07:21:35,316: INFO/MainProcess] Connected to amqp://myuser:**@127.0.0.1:5672/myvhost
    [2019-11-01 07:21:35,367: INFO/MainProcess] mingle: searching for neighbors
    [2019-11-01 07:21:36,535: INFO/MainProcess] mingle: all alone
    [2019-11-01 07:21:36,782: INFO/MainProcess] celery@skitarniu-ubuntu18 ready.
    
    任务测试

    进入 Python Shell,执行以下命令发布任务并获取结果:

    >>> from tasks import add
    >>> result = add.delay(4, 4)
    >>> result
    <AsyncResult: 6f435bc7-f194-469c-837f-54d77f880ace>
    >>> result.ready()
    True
    >>> result.get()
    8
    >>> result.traceback
    >>>
    

    delay() 方法用于发布任务消息,它是 apply_async() 方法的简写,即以异步的方式将任务需求提交给前面启动好的 worker 去处理。delay() 方法返回一个 AsyncResult 对象。
    result.ready() 方法可以用来检查提交的任务是否已经完成,返回布尔值。

    result.get() 方法则用于获取执行完成后的结果。如任务未完成,则程序会一直等待直到有结果返回。因此该方法是阻塞的,并不常用。可以传入 timeout 参数指定等待的时间上限。
    result.get(timeout=1),尝试获取任务执行后的结果,等待 1 秒。若 1 秒之后结果仍未返回,抛出 celery.exceptions.TimeoutError: The operation timed out. 异常。

    如果任务执行过程中有抛出异常,则使用 get() 方法获取结果时会重新抛出该异常导致程序中断。可以通过修改 propagate 参数避免此情况:
    result.get(propagate=False)
    result.traceback 则用于获取任务的 traceback 信息。

    三、Calling Tasks

    Celery 定义了一些可供 task 实例调用的通用的 Calling API,包括三个方法和一些标准的执行选项:

    • apply_async(args[, kwargs[, ...]]):发送任务消息给 worker
    • delay(*args, **kwargs):发送任务消息的简写形式,不支持执行选项
    • calling (__call__):即在本地进程中直接执行任务函数,不通过 worker 异步执行

    以下是一些常见的调用示例:

    • T.delay(arg, kwarg=value)
    • T.apply_async((arg,), {'kwarg': value})
    • T.apply_async(countdown=10)
      10 秒之后开始执行某个任务
    • T.apply_async(eta=now + timedelta(seconds=10))
      10 秒之后开始执行某个任务
    • T.apply_async(countdown=60, expires=120)
      预计 1 分钟后开始执行,但 2 分钟后还未执行则失效
    • T.apply_async(expires=now + timedelta(days=2))
      2 天后失效

    通过 countdown 设置任务的延迟执行:

    >>> from tasks import add
    >>> result = add.apply_async((2, 3))
    >>> result.get()
    5
    >>> delay_result = add.apply_async((2, 3), countdown=15)
    >>> delay_result.ready()
    False
    >>> delay_result.ready()
    False
    >>> delay_result.ready()
    False
    >>> delay_result.ready()
    True
    >>> delay_result.get()
    5
    

    还可以通过 eta(estimated time of arrival) 设置延迟执行的时间:

    >>> from datetime import datetime, timedelta
    >>> tomorrow = datetime.utcnow() + timedelta(days=1)
    >>> add.apply_async((2, 3), eta=tomorrow)
    <AsyncResult: c7dc6d7f-8b87-49d1-8077-73d7f046d709>
    

    此时 worker 在命令行的日志输出如下:

    [2019-11-06 05:16:21,362: INFO/MainProcess] Received task: tasks.add[c7dc6d7f-8b87-49d1-8077-73d7f046d709]
    ETA:[2019-11-07 05:16:06.652736+00:00]
    

    四、计划任务

    Celery 允许像使用 crontab 那样按计划地定时执行某个任务。参考代码如下:

    # tasks.py
    from celery import Celery
    
    app = Celery('tasks',
                 broker='pyamqp://myuser:mypassword@localhost:5672/myvhost',
                 backend='redis://localhost:6379/1')
    
    app.conf.beat_schedule = {
        'add-every-60-seconds': {
            'task': 'tasks.add',
            'schedule': 60.0,
            'args': (16, 16)
        },
    }
    app.conf.timezone = 'UTC'
    
    @app.task
    def add(x, y):
        print(x + y)
    

    运行 celery -A tasks worker -B 启动 worker 服务。
    -B 选项表示 beat,即 celery beat 服务,负责执行计划任务。

    输出如下(每隔一分钟执行一次):

    $ celery -A tasks worker -B
    ...
    [2019-11-06 05:41:34,057: WARNING/ForkPoolWorker-3] 32
    [2019-11-06 05:42:33,998: WARNING/ForkPoolWorker-3] 32
    [2019-11-06 05:43:34,056: WARNING/ForkPoolWorker-3] 32
    [2019-11-06 05:44:34,105: WARNING/ForkPoolWorker-3] 32
    [2019-11-06 05:45:34,157: WARNING/ForkPoolWorker-3] 32
    ...
    

    同时 Celery 也支持更复杂的 crontab 类型的时间规划:

    from celery.schedules import crontab
    
    app.conf.beat_schedule = {
        # Executes every Monday morning at 7:30 a.m.
        'add-every-monday-morning': {
            'task': 'tasks.add',
            'schedule': crontab(hour=7, minute=30, day_of_week=1),
            'args': (16, 16),
        },
    }
    

    Crontab 表达式支持的语法如下:

    Example Meaning
    crontab() 每分钟执行一次
    crontab(minute=0, hour=0) 每天半夜 0 点执行
    crontab(minute=0, hour='*/3') 每隔 3 小时执行一次(从 0 时开始)
    crontab(minute=0, hour='0,3,6,9,12,15,18,21') 同上一条
    crontab(day_of_week='sunday') 只在周日执行,每隔一分钟执行一次
    crontab(minute='*', hour='*', day_of_week='sun') 同上一条
    crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') 只在周四、周五的 3、17、22 时执行,每隔 10 分钟执行一次
    crontab(minute=0, hour='*/2,*/3') 只在能被 2 或者 3 整除的整点执行
    crontab(minute=0, hour='*/3,8-17') 在能被 3 整除的整点,和 8-17 点之间的整点执行
    crontab(0, 0, day_of_month='2') 在每个月的第二天的 0 时执行
    crontab(0, 0, day_of_month='11', month_of_year='5') 在每年的 5 月 11 号 0 点执行

    参考资料

    Celery 4.3.0 documentation

    相关文章

      网友评论

          本文标题:Python 通过 Celery 框架实现分布式任务队列

          本文链接:https://www.haomeiwen.com/subject/xpbfbctx.html