美文网首页
Python中关于消息队列Celery的任务放重复机制处理

Python中关于消息队列Celery的任务放重复机制处理

作者: 小钟钟同学 | 来源:发表于2019-05-24 01:22 被阅读0次

    为了防止处理多个worker重复的消费的任务的问题,实践了一下关于celery_once的处理。

    环境:


    image.png

    ;
    win7+redis2.1.8+celery3.1.19+celery_once3.00

    项目:(PS如果celery的app实例不是放在子ini下面的哈,会莫名其妙报错!暂时未知)

    image.png

    前提需要期待redis-server

    1:编写celery实例:ini.py

    #!/usr/bin/evn python
    # coding=utf-8
    
    """
    Author = zyx
    @Create_Time: 2018/1/11 14:00
    @version: v1.0.0
    @Contact: 308711822@qq.com
    @File: __init__.py.py
    @文件功能描述:
    """
    
    
    import time
    from celery import Celery
    #
    broker = 'redis://127.0.0.1:6379/2'
    backend = 'redis://127.0.0.1:6379/0'
    #
    # app = Celery('my_task', broker=broker, backend=backend)
    # from celerytakls import task1
    # task1.init()
    #
    # @app.task
    # def add(x, y):
    #     task1.getV(1)
    #     time.sleep(5)     # 模拟耗时操作
    #     return x + y
    #
    # @app.task
    # def jianshao(x, y):
    #     task1.getV(1)
    #     time.sleep(5)     # 模拟耗时操作
    #     return x + y
    #
    # if __name__ == '__main__':
    #     app.start()
    
    # ===============
    from celery import Celery
    from celery_once import QueueOnce
    from time import sleep
    
    celery = Celery('my_task', broker=broker,backend=backend)
    
    # 一般之前的配置没有这个,需要添加上
    celery.conf.ONCE = {
      'backend': 'celery_once.backends.Redis',
      'settings': {
        'url': 'redis://localhost:6379/0',
        'default_timeout': 60 * 60
      }
    }
    
    # 在原本没有参数的里面加上base
    @celery.task(base=QueueOnce)
    def slow_task(x,y):
        sleep(30)
        return "Done!"
    

    2:然后启动worker
    PS:celery的app实例在celerytakls的下面 所有 -A后面的实例所在的位置

    (lin-cms-flask) D:\python_learn\lin-cms-flask>celery worker -A celerytakls --loglevel=info
    
    
    image.png

    3:编写生产者,消息的发生:xiaofeizhe.py

    #!/usr/bin/evn python
    # coding=utf-8
    
    import datetime
    from datetime import timedelta
    
    # from celerytakls import task1
    from celerytakls import slow_task
    
    # task1.add.apply_async(args=[2, 8])  # 也可用 task1.add.delay(2, 8)
    
    slow_task.apply_async(args=[1336, 8])  # 也可用 task1.add.delay(2, 8)
    
    

    4:启动多次的生产者是观察发现拨错!!


    image.png

    5:不同的任务参数,句可以多次提交

    image.png

    补充:

    @task(base=QueueOnce, once={'graceful': True})
    

    后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。

    另外如果要手动设置任务的 key,可以指定 keys 参数
    
    @celery.task(base=QueueOnce, once={'keys': ['a']})
    def slow_add(a, b):
        sleep(30)
        return a + b
    
    修改 task 参数
    
    @celery.task(base=QueueOnce, once={'graceful': True, keys': ['a']})
    def slow_add(a, b):
        sleep(30)
        return a + b
    

    相关文章

      网友评论

          本文标题:Python中关于消息队列Celery的任务放重复机制处理

          本文链接:https://www.haomeiwen.com/subject/geqwzqtx.html