美文网首页Flask
通过celery_one避免Celery定时任务重复执行

通过celery_one避免Celery定时任务重复执行

作者: 吾星喵 | 来源:发表于2019-02-21 14:25 被阅读4次

    通过celery_one避免Celery定时任务重复执行

    在使用Celery统计每日访问数量的时候,发现一个任务会同时执行两次,发现同一时间内(1s内)竟然同时发送了两次任务,也就是同时产生了两个worker,造成统计两次,一直找不到原因。

    参考:https://blog.csdn.net/qq_41333582/article/details/83899884

    有人使用 Redis 实现了分布式锁,然后也有人使用了 Celery Once。

    Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base

    @task(base=QueueOnce, once={'graceful': True})
    

    后面的 once 参数表示,在遇到重复方法时的处理方式,默认 gracefulFalse,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。

    另外如果要手动设置任务的 key,可以指定 keys 参数

    @celery.task(base=QueueOnce, once={'keys': ['a']})
    def slow_add(a, b):
        sleep(30)
        return a + b
    

    解决步骤

    Celery One允许你将Celery任务排队,防止多次执行

    安装

    pip install -U celery_once
    

    要求,需要Celery4.0,老版本可能运行,但不是官方支持的。

    使用celery_oncetasks需要继承一个名为QueueOnce的抽象base tasks

    Once安装完成后,需要配置一些关于ONCE的选项在Celery配置中

    from celery import Celery
    from celery_once import QueueOnce
    from time import sleep
    
    celery = Celery('tasks', broker='amqp://guest@localhost//')
    
    # 一般之前的配置没有这个,需要添加上
    celery.conf.ONCE = {
      'backend': 'celery_once.backends.Redis',
      'settings': {
        'url': 'redis://localhost:6379/0',
        'default_timeout': 60 * 60
      }
    }
    
    # 在原本没有参数的里面加上base
    @celery.task(base=QueueOnce)
    def slow_task():
        sleep(30)
        return "Done!"
    

    要确定配置,需要取决于使用哪个backend进行锁定,查看Backends

    在后端,这将覆盖apply_asyncdelay。它不影响直接调用任务。

    在运行任务时,celery_once检查是否没有锁定(针对Redis键)。否则,任务将正常运行。一旦任务完成(或由于异常而结束),锁将被清除。如果在任务完成之前尝试再次运行该任务,将会引发AlreadyQueued异常。

    example.delay(10)
    example.delay(10)
    Traceback (most recent call last):
        ..
    AlreadyQueued()
    
    result = example.apply_async(args=(10))
    result = example.apply_async(args=(10))
    Traceback (most recent call last):
        ..
    AlreadyQueued()
    

    graceful:如果在任务的选项中设置了once={'graceful': True},或者在运行时设置了apply_async,则任务可以返回None,而不是引发AlreadyQueued异常。

    from celery_once import AlreadyQueued
    # Either catch the exception,
    try:
        example.delay(10)
    except AlreadyQueued:
        pass
    # Or, handle it gracefully at run time.
    result = example.apply(args=(10), once={'graceful': True})
    # or by default.
    @celery.task(base=QueueOnce, once={'graceful': True})
    def slow_task():
        sleep(30)
        return "Done!"
    

    其他功能请访问:https://pypi.org/project/celery_once/

    相关文章

      网友评论

        本文标题:通过celery_one避免Celery定时任务重复执行

        本文链接:https://www.haomeiwen.com/subject/nmvryqtx.html