Django 2.1.7 Celery 4.3.0 Routing: task queue routing

Author: Devops海洋的渔夫 | Published 2019-08-04 01:55

The earlier articles on signatures covered task signatures, task invocation, the task execution flow, and so on. Now let's look at routing.

Recap: the celery_tasks module layout

As shown in the earlier articles, the celery_tasks package contains celery.py (the Celery app instance), celeryconfig.py (the settings module), and tasks.py (the task definitions).

Scenario

Suppose we have two workers: one dedicated to email sending and image processing tasks, and another dedicated to file upload tasks.

We create two queues accordingly: one holding the email and image processing tasks, and one holding the file upload tasks.

Celery supports the full routing capabilities of AMQP (Advanced Message Queuing Protocol), but a simple routing configuration is enough to send specific tasks to specific queues.
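For the full AMQP style, queues and exchanges can also be declared explicitly with kombu. Here is a sketch of what that would look like for the two queues used below; this article itself relies on Celery auto-creating missing queues, each bound to a direct exchange of the same name:

    from kombu import Exchange, Queue
    
    # Optional explicit declarations; by default Celery creates missing
    # queues automatically, with exactly these direct-exchange bindings.
    CELERY_QUEUES = (
        Queue('queue1', Exchange('queue1', type='direct'), routing_key='queue1'),
        Queue('queue2', Exchange('queue2', type='direct'), routing_key='queue2'),
    )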

Routing configuration example

We control how tasks are divided among queues by configuring task routes in celeryconfig.py.

We need to set the CELERY_ROUTES option in the celeryconfig.py module. First, the tasks.py module is modified as follows:

    from celery_tasks.celery import app as celery_app
    
    @celery_app.task
    def my_task1(a, b):
        print("my_task1任务正在执行....")
        return a + b
    
    
    @celery_app.task
    def my_task2(a, b):
        print("my_task2任务正在执行....")
        return a + b
    
    
    @celery_app.task
    def my_task3(a, b):
        print("my_task3任务正在执行....")
        return a + b
    
    
    @celery_app.task
    def my_task4(a, b):
        print("my_task3任务正在执行....")
        return a + b
    
    
    @celery_app.task
    def my_task5():
        print("my_task5任务正在执行....")
    
    
    @celery_app.task
    def my_task6():
        print("my_task6任务正在执行....")
    
    
    @celery_app.task
    def my_task7():
        print("my_task7任务正在执行....")
    
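For reference, the celery_tasks/celery.py module these tasks import from would look roughly like this. This is a sketch based on the imports above and the "app: celery_tasks.tasks" line in the worker logs below; it assumes celeryconfig.py lives inside the celery_tasks package:

    from celery import Celery
    
    # App name matches the "app: celery_tasks.tasks" shown in the worker banner
    app = Celery('celery_tasks.tasks')
    
    # Pull broker, result backend, and routing settings from celeryconfig.py
    app.config_from_object('celery_tasks.celeryconfig')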

Following the scenario above, the configuration sends my_task5 and my_task6 (playing the role of the email and image processing tasks) to queue1, and my_task7 (the file upload task) to queue2.

Modify celeryconfig.py:

    # Result backend
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/9'
    
    # Broker
    BROKER_URL = 'redis://127.0.0.1:6379/8'
    
    # Task routing: map each task to the queue it should be delivered to
    CELERY_ROUTES = {
        'celery_tasks.tasks.my_task5': {'queue': 'queue1'},
        'celery_tasks.tasks.my_task6': {'queue': 'queue1'},
        'celery_tasks.tasks.my_task7': {'queue': 'queue2'},
    }
    
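A side note: these uppercase names are the old-style settings, which Celery 4.3 still accepts. If you prefer the lowercase setting names introduced in Celery 4, the equivalent would be:

    result_backend = 'redis://127.0.0.1:6379/9'
    broker_url = 'redis://127.0.0.1:6379/8'
    task_routes = {
        'celery_tasks.tasks.my_task5': {'queue': 'queue1'},
        'celery_tasks.tasks.my_task6': {'queue': 'queue1'},
        'celery_tasks.tasks.my_task7': {'queue': 'queue2'},
    }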

Now send a few tasks and see where they land:

    In [1]: from celery_tasks.tasks import *
    
    In [2]: my_task5.delay()
    Out[2]: <AsyncResult: 16d2e1f7-745f-4f38-ab57-f18859a8ff8a>
    
    In [3]: my_task6.delay()
    Out[3]: <AsyncResult: 99b9f858-6bb8-47dc-b47f-d3b8b6b3f6bf>
    
    In [4]: my_task7.delay()
    Out[4]: <AsyncResult: f50c876b-f4c4-4a82-84cc-ad83c9314fa8>
    
    
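No worker is consuming either queue yet, so the three messages wait in the broker. With the Redis broker, each queue is by default a Redis list keyed by the queue name, so you can peek at the backlog with redis-cli (an optional sanity check, assuming default kombu transport settings):

    redis-cli -n 8 llen queue1    # expect 2: my_task5 and my_task6
    redis-cli -n 8 llen queue2    # expect 1: my_task7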

Start two worker servers, one consuming each queue:

    celery -A celery_tasks worker -l info -P eventlet -Q queue1
    celery -A celery_tasks worker -l info -P eventlet -Q queue2
    

As soon as the workers start, the queued tasks are processed, each by the worker consuming its queue:

• The queue1 worker's log:
    (venv) F:\pythonProject\django-pratice>celery -A celery_tasks worker -l info -P eventlet -Q queue1
    
     -------------- celery@USC2VG2F9NPB650 v4.3.0 (rhubarb)
    ---- **** -----
    --- * ***  * -- Windows-10-10.0.17763-SP0 2019-08-04 01:42:00
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app:         celery_tasks.tasks:0x155ee3036d8
    - ** ---------- .> transport:   redis://127.0.0.1:6379/8
    - ** ---------- .> results:     redis://127.0.0.1:6379/9
    - *** --- * --- .> concurrency: 12 (eventlet)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
     -------------- [queues]
                    .> queue1           exchange=queue1(direct) key=queue1
    
    
    [tasks]
      . celery_tasks.tasks.my_task1
      . celery_tasks.tasks.my_task2
      . celery_tasks.tasks.my_task3
      . celery_tasks.tasks.my_task4
      . celery_tasks.tasks.my_task5
      . celery_tasks.tasks.my_task6
      . celery_tasks.tasks.my_task7
    
    [2019-08-04 01:42:00,230: INFO/MainProcess] Connected to redis://127.0.0.1:6379/8
    [2019-08-04 01:42:00,271: INFO/MainProcess] mingle: searching for neighbors
    [2019-08-04 01:42:01,422: INFO/MainProcess] mingle: all alone
    [2019-08-04 01:42:01,547: INFO/MainProcess] celery@USC2VG2F9NPB650 ready.
    [2019-08-04 01:42:01,551: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/8.
    [2019-08-04 01:42:01,567: INFO/MainProcess] Received task: celery_tasks.tasks.my_task5[16d2e1f7-745f-4f38-ab57-f18859a8ff8a]
    [2019-08-04 01:42:01,568: WARNING/MainProcess] my_task5 is running....
    [2019-08-04 01:42:01,589: INFO/MainProcess] Received task: celery_tasks.tasks.my_task6[99b9f858-6bb8-47dc-b47f-d3b8b6b3f6bf]
    [2019-08-04 01:42:01,590: WARNING/MainProcess] my_task6 is running....
    [2019-08-04 01:42:01,596: INFO/MainProcess] Task celery_tasks.tasks.my_task5[16d2e1f7-745f-4f38-ab57-f18859a8ff8a] succeeded in 0.030999999959021807s: None
    [2019-08-04 01:42:01,619: INFO/MainProcess] Task celery_tasks.tasks.my_task6[99b9f858-6bb8-47dc-b47f-d3b8b6b3f6bf] succeeded in 0.03100000007543713s: None
    
• The queue2 worker's log:
    (venv) F:\pythonProject\django-pratice>celery -A celery_tasks worker -l info -P eventlet -Q queue2
    
     -------------- celery@USC2VG2F9NPB650 v4.3.0 (rhubarb)
    ---- **** -----
    --- * ***  * -- Windows-10-10.0.17763-SP0 2019-08-04 01:42:54
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app:         celery_tasks.tasks:0x17e72b83668
    - ** ---------- .> transport:   redis://127.0.0.1:6379/8
    - ** ---------- .> results:     redis://127.0.0.1:6379/9
    - *** --- * --- .> concurrency: 12 (eventlet)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
     -------------- [queues]
                    .> queue2           exchange=queue2(direct) key=queue2
    
    
    [tasks]
      . celery_tasks.tasks.my_task1
      . celery_tasks.tasks.my_task2
      . celery_tasks.tasks.my_task3
      . celery_tasks.tasks.my_task4
      . celery_tasks.tasks.my_task5
      . celery_tasks.tasks.my_task6
      . celery_tasks.tasks.my_task7
    
    [2019-08-04 01:42:54,990: INFO/MainProcess] Connected to redis://127.0.0.1:6379/8
    [2019-08-04 01:42:55,025: INFO/MainProcess] mingle: searching for neighbors
    [2019-08-04 01:42:56,197: INFO/MainProcess] mingle: all alone
    [2019-08-04 01:42:56,260: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/8.
    [2019-08-04 01:42:56,323: INFO/MainProcess] celery@USC2VG2F9NPB650 ready.
    [2019-08-04 01:42:56,484: INFO/MainProcess] Received task: celery_tasks.tasks.my_task7[f50c876b-f4c4-4a82-84cc-ad83c9314fa8]
    [2019-08-04 01:42:56,487: WARNING/MainProcess] my_task7 is running....
    [2019-08-04 01:42:56,520: INFO/MainProcess] Task celery_tasks.tasks.my_task7[f50c876b-f4c4-4a82-84cc-ad83c9314fa8] succeeded in 0.030999999959021807s: None
    

The two logs confirm the routing: the queue1 worker executed my_task5 and my_task6, while the queue2 worker executed my_task7.

Those routes were configured up front. What about a task with no configured route: can we direct it to a queue of our choice at call time?

Routing tasks to a queue dynamically with apply_async()

The apply_async() method lets us specify, per call, which queue a task is sent to:

    In [6]: my_task1.apply_async(args=(10,20),queue='queue1')
    Out[6]: <AsyncResult: b3eb2fe8-791a-40fd-bcd5-d538afedbf81>
    
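Besides queue=, apply_async() also accepts exchange= and routing_key= for finer-grained AMQP-style routing. Given the default bindings shown in the worker banner (exchange=queue1(direct) key=queue1), the following would be an equivalent way to target queue1 (a sketch, not from the original session):

    # Equivalent to queue='queue1' under the default direct-exchange layout
    my_task1.apply_async(args=(10, 20), exchange='queue1', routing_key='queue1')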

Checking the queue1 worker's log again confirms that it received and executed my_task1.

One worker serving multiple queues

We can also have a single worker server consume tasks from both queues:

    celery -A celery_tasks worker -l info -P eventlet -Q queue1,queue2
    

The startup log:

    (venv) F:\pythonProject\django-pratice>celery -A celery_tasks worker -l info -P eventlet -Q queue1,queue2
    
     -------------- celery@USC2VG2F9NPB650 v4.3.0 (rhubarb)
    ---- **** -----
    --- * ***  * -- Windows-10-10.0.17763-SP0 2019-08-04 01:49:45
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app:         celery_tasks.tasks:0x1d0bb4227b8
    - ** ---------- .> transport:   redis://127.0.0.1:6379/8
    - ** ---------- .> results:     redis://127.0.0.1:6379/9
    - *** --- * --- .> concurrency: 12 (eventlet)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
     -------------- [queues]
                    .> queue1           exchange=queue1(direct) key=queue1
                    .> queue2           exchange=queue2(direct) key=queue2
    
    [tasks]
      . celery_tasks.tasks.my_task1
      . celery_tasks.tasks.my_task2
      . celery_tasks.tasks.my_task3
      . celery_tasks.tasks.my_task4
      . celery_tasks.tasks.my_task5
      . celery_tasks.tasks.my_task6
      . celery_tasks.tasks.my_task7
    
    [2019-08-04 01:49:45,805: INFO/MainProcess] Connected to redis://127.0.0.1:6379/8
    [2019-08-04 01:49:45,838: INFO/MainProcess] mingle: searching for neighbors
    [2019-08-04 01:49:47,054: WARNING/MainProcess] g:\python3\python371\lib\site-packages\celery\app\control.py:54: DuplicateNodenameWarning: Received multiple replies from node name:
    celery@USC2VG2F9NPB650.
    Please make sure you give each node a unique nodename using
    the celery worker `-n` option.
      pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
    [2019-08-04 01:49:47,055: INFO/MainProcess] mingle: all alone
    [2019-08-04 01:49:47,196: INFO/MainProcess] celery@USC2VG2F9NPB650 ready.
    [2019-08-04 01:49:47,229: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/8.
    
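Note the DuplicateNodenameWarning in the log above: all three workers were started with the default node name, celery@<hostname>. As the warning itself suggests, give each worker a unique name with the -n option, for example (worker1/worker2/worker3 are arbitrary labels; %h expands to the hostname):

    celery -A celery_tasks worker -l info -P eventlet -Q queue1 -n worker1@%h
    celery -A celery_tasks worker -l info -P eventlet -Q queue2 -n worker2@%h
    celery -A celery_tasks worker -l info -P eventlet -Q queue1,queue2 -n worker3@%h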

Now call my_task1 on both queue1 and queue2:

    In [7]: my_task1.apply_async(args=(10,20),queue='queue1')
    Out[7]: <AsyncResult: 3e07792e-6d82-44e2-8825-6a9cdb0034f5>
    
    In [8]: my_task1.apply_async(args=(10,20),queue='queue2')
    Out[8]: <AsyncResult: 757f62b6-01c8-4f2d-a3f2-54956b962a0c>
    

The dual-queue worker's log shows that it executes the tasks from both queues.
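Finally, since a result backend is configured, the return values of these routed calls can be fetched as usual (a quick sketch, not from the original session):

    result = my_task1.apply_async(args=(10, 20), queue='queue1')
    print(result.get(timeout=10))    # blocks until the worker finishes, prints 30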
