美文网首页
22. Celery 4.x 动态添加定时任务

22. Celery 4.x 动态添加定时任务

作者: Devops海洋的渔夫 | 来源:发表于2020-05-16 10:02 被阅读0次

    需求

    为了能够在Web端口动态添加定时任务的需求,本次来调研一下Celery 4.x 在Django框架下该如何动态添加定时任务。

    Celery动态添加定时任务的官方文档

    celery文档:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

    django-celery-beat文档 : https://pypi.org/project/django-celery-beat/

    新建Django项目

    安装最新版本的Django

    pip3 install django
    

    当前我安装的版本是 3.0.6

    创建项目

    django-admin startproject 项目名称

    执行如下:

    django-admin startproject django_con .
    

    安装 celery

    pip3 install django-celery
    pip3 install -U Celery 
    pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
    pip3 install django-celery-beat # 用于动态添加定时任务
    pip3 install django-celery-results
    pip3 install redis
    

    安装完毕后,相关依赖版本

    amqp==2.5.2
    anyjson==0.3.3
    asgiref==3.2.7
    billiard==3.6.3.0
    celery==4.4.2
    cffi==1.14.0
    cryptography==2.9.2
    Django==3.0.6
    django-celery==3.3.1
    django-celery-beat==2.0.0
    django-celery-results==1.2.1
    django-timezone-field==4.0
    dnspython==1.16.0
    eventlet==0.25.2
    greenlet==0.4.15
    importlib-metadata==1.6.0
    kombu==4.6.8
    monotonic==1.5
    pycparser==2.20
    python-crontab==2.4.2
    python-dateutil==2.8.1
    pytz==2020.1
    redis==3.5.1
    six==1.14.0
    sqlparse==0.3.1
    vine==1.3.0
    zipp==3.1.0
    

    测试使用Celery应用

    创建Celery目录结构

    首先在Django项目中创建一个celery_tasks文件夹,再创建tasks.py模块, 如下:

    image-20200514161135291

    编写tasks.py 其内容为:

    from celery import Celery
    
    # 使用redis作为broker
    app = Celery('celery_tasks.tasks', broker='redis://192.168.196.135:6379/8')
    
    # 创建任务函数
    @app.task
    def my_task():
        print("任务函数正在执行....")
    

    Celery第一个参数是给其设定一个名字, 第二参数我们设定一个中间人broker, 在这里我们使用Redis作为中间人。my_task函数是我们编写的一个任务函数, 通过加上装饰器app.task, 将其注册到broker的队列中。

    现在我们在创建一个worker, 等待处理队列中的任务。

    进入项目的根目录,执行命令: celery -A celery_tasks.tasks worker -l info

    image-20200514161616827

    调用任务

    下面来测试一下功能,创建一个任务,加入任务队列中,提供worker执行。

    进入python终端, 执行如下代码:

    [root@python_env django_cron]# python3 manage.py shell
    In [3]: from celery_tasks.tasks import my_task
    
    # 调用一个任务函数,将会返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。
    In [4]: my_task.delay()
    Out[4]: <AsyncResult: 647b2589-95d2-45c9-a9a7-0b5530caf249>
    

    返回worker的终端界面,查看任务执行情况,如下:

    image-20200514163314267

    可以看到已经收到任务,并执行打印了信息。

    存储结果

    如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方。有几种保存的方案可选:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

    例子我们仍然使用Redis作为存储结果的方案,任务结果存储配置我们通过Celery的backend参数来设定。我们将tasks模块修改如下:

    from celery import Celery
    
    # 使用redis作为broker以及backend
    app = Celery('celery_tasks.tasks',
                 broker='redis://192.168.196.135:6379/8',
                 backend='redis://192.168.196.135:6379/9')
    
    # 创建任务函数
    @app.task
    def my_task(a, b):
        print("任务函数正在执行....")
        return a + b
    

    我给Celery增加了backend参数,指定redis作为结果存储,并将任务函数修改为两个参数,并且有返回值。

    下面再来执行调用一下这个任务看看。

    In [1]: from celery_tasks.tasks import my_task
    
    # 传递参数至任务中
    In [5]: ret = my_task.delay(10,20)
    
    # 查询返回值的结果
    In [6]: ret.result
    Out[6]: 30
    
    # 查看是否执行失败
    In [7]: ret.failed()
    Out[7]: False
    

    再来看看worker的执行情况,如下:

    image-20200514164236552

    可以看到celery任务已经执行成功了。

    但是这只是一个开始,下一步要看看如何添加定时的任务。

    优化Celery目录结构

    上面直接将Celery的应用创建、配置、tasks任务全部写在了一个文件,这样在后面项目越来越大,也是不方便的。下面来拆分一下,并且添加一些常用的参数。

    image-20200514165214739

    创建Celery应用的文件 celery.py

    from celery import Celery
    from celery_tasks import celeryconfig
    
    import os
    # 为celery设置环境变量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")
    
    ## 创建celery app
    app = Celery('celery_tasks')
    
    # 从单独的配置模块中加载配置
    app.config_from_object(celeryconfig)
    
    # 设置app自动加载任务
    app.autodiscover_tasks([
        'celery_tasks',
    ])
    

    配置Celery的参数文件 celeryconfig.py

    # 设置结果存储
    CELERY_RESULT_BACKEND = 'redis://192.168.196.135:6379/9'
    # 设置代理人broker
    BROKER_URL = 'redis://192.168.196.135:6379/8'
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
    

    tasks 任务文件 tasks.py

    from celery_tasks.celery import app
    
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print("任务函数正在执行....")
        return a + b + c
    

    下一步来开始安装使用定时任务。

    使用 django-celery-beat 动态添加定时任务

    celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。

    官网的配置说明

    https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

    image-20200514180603493

    安装 django-celery-beat

    pip3 install django-celery-beat
    

    配置 django-celery-beat

    在项目的 settings 文件配置:

    # 安装应用 django_celery_beat
    INSTALLED_APPS = [
        'django_celery_beat', # 安装 django_celery_beat
        ...
    ]
    
    # Django设置时区
    LANGUAGE_CODE = 'zh-hans'  # 使用中国语言
    TIME_ZONE = 'Asia/Shanghai'  # 设置Django使用中国上海时间
    # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
    # 如果USE_TZ 设置为False,TIME_ZONE = 'Asia/Shanghai', 则使用上海的UTC时间。
    USE_TZ = False
    

    在 celerconfig.py 配置 django_celery_beat:

    from django.conf import settings
    
    import os
    # 为celery设置环境变量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")
    
    # celery beat配置
    # CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
    

    创建 django-celery-beat 相关表

    执行Django数据库迁移: python3 manage.py migrate

    image-20200514170553728

    配置Celery使用 django-celery-beat

    配置 celery.py

    from celery import Celery
    from celery_tasks import celeryconfig
    from django.utils import timezone
    
    import os
    # 为celery设置环境变量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")
    
    ## 创建celery app
    app = Celery('celery_tasks')
    
    # 从单独的配置模块中加载配置
    app.config_from_object(celeryconfig)
    
    # 设置app自动加载任务
    app.autodiscover_tasks([
        'celery_tasks',
    ])
    
    # 解决时区问题,定时任务启动就循环输出
    app.now = timezone.now
    

    配置 celeryconfig.py

    from django.conf import settings
    import os
    
    # 为celery设置环境变量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")
    
    # 设置结果存储
    CELERY_RESULT_BACKEND = 'redis://192.168.196.135:6379/9'
    # 设置代理人broker
    BROKER_URL = 'redis://192.168.196.135:6379/8'
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
    
    # celery beat配置
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
    

    编写任务 tasks.py

    from celery_tasks.celery import app
    
    # 创建任务函数
    @app.task
    def my_task1(a, b, c):
        print("任务1函数正在执行....")
        return a + b + c
    
    @app.task
    def my_task2():
        print("任务2函数正在执行....")
    

    启动定时任务work

    启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。

    启动任务 work

    celery -A celery_tasks worker -l info
    

    启动定时器触发 beat

    celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    

    创建定时任务说明

    创建定时任务可以查看 django_celery_beat 的官网说明:https://pypi.org/project/django-celery-beat/

    image-20200514180952462

    下面先来翻译看看官网的示例,然后再实际应用一下。

    官网示例说明

    创建基于间隔时间的周期性任务

    初始化周期间隔对象 interval 对象

    在创建一个基于间隔时间的周期性任务之前,首先需要创建一个 interval 对象,用于提供任务设置周期间隔:

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
    
    # executes every 10 seconds.
    >>> schedule, created = IntervalSchedule.objects.get_or_create(
    ...     every=10,
    ...     period=IntervalSchedule.SECONDS,
    ... )
    

    可以看到上面固定间隔的时间是采用秒 period=IntervalSchedule.SECONDS,如果你还想要固定其他的时间单位,可以设置其他字段参数,如下:

    • IntervalSchedule.DAYS 固定间隔天数
    • IntervalSchedule.HOURS 固定间隔小时数
    • IntervalSchedule.MINUTES 固定间隔分钟数
    • IntervalSchedule.SECONDS 固定间隔秒数
    • IntervalSchedule.MICROSECONDS 固定间隔微秒

    注意:

    如果你有多个周期性任务都是间隔10秒,那么这些任务都应该设置同一个 interval 对象

    另外,可以如果不清楚有哪些固定的时间单位,可以这样查看,如下:

    In [1]: from django_celery_beat.models import PeriodicTask, IntervalSchedule                                                                                                  
    In [2]: IntervalSchedule.PERIOD_CHOICES                                                                                                                                       
    Out[2]: 
    (('days', 'Days'),
     ('hours', 'Hours'),
     ('minutes', 'Minutes'),
     ('seconds', 'Seconds'),
     ('microseconds', 'Microseconds'))
    
    创建周期性间隔任务

    下面这种是无参数的创建方法:

    >>> PeriodicTask.objects.create(
    ...     interval=schedule,                  # we created this above.
    ...     name='Importing contacts',          # simply describes this periodic task.
    ...     task='proj.tasks.import_contacts',  # name of task.
    ... )
    

    带参数的创建方法,如下:

    >>> import json
    >>> from datetime import datetime, timedelta
    
    >>> PeriodicTask.objects.create(
    ...     interval=schedule,                  # we created this above.
    ...     name='Importing contacts',          # simply describes this periodic task.
    ...     task='proj.tasks.import_contacts',  # name of task.
    ...     args=json.dumps(['arg1', 'arg2']),
    ...     kwargs=json.dumps({
    ...        'be_careful': True,
    ...     }),
    ...     expires=datetime.utcnow() + timedelta(seconds=30)
    ... )
    

    创建基于 crontab 的周期性任务

    初始化 crontab 的调度对象

    上面是创建基于固定周期的调度对象,那么 crontab 就是类似 linux 中的 crontab 定时方式。

    crontab 调度对象有如下字段:minute, hour, day_of_week, day_of_monthmonth_of_year

    对应配置 30 * * * * 的 crontab 定时写法。

    >>> from django_celery_beat.models import CrontabSchedule, PeriodicTask
    >>> schedule, _ = CrontabSchedule.objects.get_or_create(
    ...     minute='30',
    ...     hour='*',
    ...     day_of_week='*',
    ...     day_of_month='*',
    ...     month_of_year='*',
    ...     timezone=pytz.timezone('Canada/Pacific')
    ... )
    

    要注意上面的这个时区设置,中国的时区应该设置为 timezone=pytz.timezone('Asia/Shanghai')

    创建基于 crontab 调度的定时任务

    创建任务的方式跟创建固定间隔时间的周期性任务基本一致,只不过将 interval=schedule 改为了 crontab=schedule,有参数的写法也是一致。

    >>> PeriodicTask.objects.create(
    ...     crontab=schedule,
    ...     name='Importing contacts',
    ...     task='proj.tasks.import_contacts',
    ... )
    

    暂时停止周期性任务

    >>> periodic_task.enabled = False
    >>> periodic_task.save()
    

    启动运行周期任务的示例

    执行周期性任务的前提是需要有 workers 去执行,那么首先需要已经安装好了 Celery,上面我们已经安装好了。也就是跟我前面说的,celery的 workers 和 beat 定时服务都需要同时开启。

    1. 开启 celery 的 worker 服务

      $ celery -A [project-name] worker --loglevel=info

    2. 作为一个单独的进程,启动beat服务

       $ celery -A [project-name] beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
      

      或者你也可以使用 -S (scheduler flag) 标识符,更多参数说明查看 celery beat --help

      $ celery -A [project-name] beat -l info -S django
      

      另外,作为替代方案,你也可以只使用一个命令运行上面的两个步骤(worker和beat服务)(建议只用于开发环境

      $ celery -A [project-name] worker --beat --scheduler django --loglevel=info
      
    3. 现在启动完这两个服务,就可以开始添加周期性任务了。

    具体操作演练

    看完了上面官网的说明,下面拿我前面写好的两个task任务来创建一下周期性任务。

    创建基于间隔时间的周期性任务

    初始化周期间隔对象 interval 对象

    In [1]: from django_celery_beat.models import PeriodicTask, IntervalSchedule                        
    
    # 创建一个间隔10秒钟的 interval 对象
    In [2]: schedule, created = IntervalSchedule.objects.get_or_create( 
       ...:     every=10, 
       ...:     period=IntervalSchedule.SECONDS, 
       ...: ) 
    
    # 查询已创建的 interval 对象
    In [7]: IntervalSchedule.objects.all()                                                               Out[7]: <QuerySet [<IntervalSchedule: every 10 seconds>]>
    

    创建周期性间隔任务

    创建一个无参数的周期性间隔任务:
    In [3]: PeriodicTask.objects.create( 
       ...:     interval=schedule,  # 上面创建10秒的间隔 interval 对象
       ...:     name='my_task2',    # 设置任务的name值
       ...:     task='celery_tasks.tasks.my_task2',  # 指定需要周期性执行的任务
       ...: )                                                                                          
    Out[3]: <PeriodicTask: my_task1: every 10 seconds>
    

    创建之后,beat服务日志显示如下:

    [2020-05-15 10:23:00,176: INFO/MainProcess] Scheduler: Sending due task my_task2 (celery_tasks.tasks.my_task2)
    [2020-05-15 10:23:10,180: INFO/MainProcess] Scheduler: Sending due task my_task2 (celery_tasks.tasks.my_task2)
    

    worker服务日志显示如下:

    [2020-05-15 10:22:50,191: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[43365c48-9b52-44ea-ba5f-d75cd7df49dd]  
    [2020-05-15 10:22:50,193: WARNING/ForkPoolWorker-1] 任务2函数正在执行....
    [2020-05-15 10:22:50,196: INFO/ForkPoolWorker-1] Task celery_tasks.tasks.my_task2[43365c48-9b52-44ea-ba5f-d75cd7df49dd] succeeded in 0.003136722996714525s: None
    [2020-05-15 10:23:00,191: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[770beba4-4c3f-4e2f-8d36-7ee0b90dd3b9]  
    [2020-05-15 10:23:00,194: WARNING/ForkPoolWorker-1] 任务2函数正在执行....
    [2020-05-15 10:23:00,197: INFO/ForkPoolWorker-1] Task celery_tasks.tasks.my_task2[770beba4-4c3f-4e2f-8d36-7ee0b90dd3b9] succeeded in 0.0030223939975257963s: None
    [2020-05-15 10:23:10,194: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[4b87f2f2-30d0-4c20-b028-601f28cb8193]  
    [2020-05-15 10:23:10,196: WARNING/ForkPoolWorker-1] 任务2函数正在执行....
    
    创建一个带参数的周期性间隔任务:
    In [27]: import json                                                                               
    
    In [28]: from datetime import datetime, timedelta                                                  
    
    In [29]: PeriodicTask.objects.create(  
        ...:     interval=schedule,  # 设置使用上面创建的 10 秒间隔 interval 对象
        ...:     name='my_task1',    # 设置周期性任务的名称
        ...:     task='celery_tasks.tasks.my_task1',  # 设置指定执行的task
        ...:     args=json.dumps([10, 20, 30]),  # 传递task需要的参数
        ...:     expires=datetime.now() + timedelta(seconds=30) # 任务的执行超时时间,避免任务执行时间过长
        ...: )                                                                                                                                                                    
    Out[29]: <PeriodicTask: my_task1: every 10 seconds>
    

    查看beat服务的日志:

    image-20200515102958056

    查看worker服务的日志:

    image-20200515103033364

    周期性任务在worker是否串行执行还是并行?

    这里有个疑问,如果只有一个worker,其中一个task执行时间比较长,例如:上面的两个任务都设置休眠10秒,确认是否可以同时执行,还是要开启多个worker执行。

    设置 task 任务进行休眠
    from celery_tasks.celery import app
    import time
    
    # 创建任务函数
    @app.task
    def my_task1(a, b, c):
        print("任务1函数正在执行....")
        print("任务1函数休眠10秒...")
        time.sleep(10)
        return a + b + c
    
    @app.task
    def my_task2():
        print("任务2函数正在执行....")
        print("任务2函数休眠10秒....")
        time.sleep(10)
    
    删除这两个周期性任务,然后再创建后查看beat服务以及worker服务日志

    删除之前的两个周期性任务:

    # 暂停执行两个周期性任务
    In [32]: PeriodicTask.objects.get(name="my_task1").enabled = False                                   In [33]: PeriodicTask.objects.get(name="my_task1").save()                                             
    In [34]: PeriodicTask.objects.get(name="my_task2").enabled = False                                   In [35]: PeriodicTask.objects.get(name="my_task2").save()                                                                                                                     
    # 删除任务
    In [36]: PeriodicTask.objects.get(name="my_task1").delete()                                           Out[36]: (1, {'django_celery_beat.PeriodicTask': 1})
    In [37]: PeriodicTask.objects.get(name="my_task2").delete()                                           Out[37]: (1, {'django_celery_beat.PeriodicTask': 1})
    

    重启beat服务、worker服务:

    因为修改了 task,需要重启服务才能重新加载。

    # 启动beat进程
    celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    
    # 启动一个worker进程
    celery -A celery_tasks worker -l info
    

    重新创建两个周期性任务:

    In [1]: from django_celery_beat.models import PeriodicTask, IntervalSchedule                        
    
    # 创建一个间隔10秒钟的 interval 对象
    In [2]: schedule, created = IntervalSchedule.objects.get_or_create( 
       ...:     every=10, 
       ...:     period=IntervalSchedule.SECONDS, 
       ...: )      
    
    # 创建无参数周期性任务
    In [3]: PeriodicTask.objects.create( 
       ...:     interval=schedule,  # 上面创建10秒的间隔 interval 对象
       ...:     name='my_task2',    # 设置任务的name值
       ...:     task='celery_tasks.tasks.my_task2',  # 指定需要周期性执行的任务
       ...: )                                                                                          
    Out[3]: <PeriodicTask: my_task1: every 10 seconds>
    
    # 创建带参数周期性任务
    In [27]: import json                                                                               
    
    In [28]: from datetime import datetime, timedelta                                                  
    
    In [29]: PeriodicTask.objects.create(  
        ...:     interval=schedule,  # 设置使用上面创建的 10 秒间隔 interval 对象
        ...:     name='my_task1',    # 设置周期性任务的名称
        ...:     task='celery_tasks.tasks.my_task1',  # 设置指定执行的task
        ...:     args=json.dumps([10, 20, 30]),  # 传递task需要的参数
        ...:     expires=datetime.now() + timedelta(seconds=30)
        ...: )                                                                                                                                                                    
    Out[29]: <PeriodicTask: my_task1: every 10 seconds>
    

    查看beat推送周期性任务的日志:

    image-20200515105426638

    查看单个worker的执行日志:

    image-20200515112919093

    可以看到,因为worker不能并行执行任务,所以任务从beat发出来之后,在单个worker是串行执行的,所以如果想要并发执行worker,可以开启多线程的方式,或者开启多个进程。

    开启2个worker来查看执行日志:

    image-20200515113332254

    所以需要并行执行任务的时候,就需要设置多个worker来执行任务。

    创建基于 crontab 的周期性任务

    无限一直循环执行的BUG

    crontab周期性任务在使用的时候会出现beat服务一直不停发任务的情况,导致无法使用。目前尝试多种方式,仍未有解决的办法。

    初始化 crontab 的调度对象

    In [29]: import pytz                                                                                 
    
    In [30]: from django_celery_beat.models import CrontabSchedule, PeriodicTask                                                                                                  
    In [31]: schedule, _ = CrontabSchedule.objects.get_or_create( 
        ...:     minute='*', 
        ...:     hour='*', 
        ...:     day_of_week='*', 
        ...:     day_of_month='*', 
        ...:     month_of_year='*', 
        ...:     timezone=pytz.timezone('Asia/Shanghai') 
        ...: )     
    
    In [32]: CrontabSchedule.objects.all()                                                               Out[32]: <QuerySet [<CrontabSchedule: * * * * * (m/h/d/dM/MY) Asia/Shanghai>, <CrontabSchedule: 0 4 * * * (m/h/d/dM/MY) Asia/Shanghai>]>
    

    创建一个无参数的定时任务:

    In [36]: PeriodicTask.objects.create(  
        ...:     crontab=schedule, # 上面创建的 crontab 对象 * * * * *,表示每分钟执行一次
        ...:     name='my_task2_crontab', # 设置任务的name值
        ...:     task='celery_tasks.tasks.my_task2',  # 指定需要周期性执行的任务
        ...: )                                                                                                                                                                    
    Out[36]: <PeriodicTask: my_task2_crontab: * * * * * (m/h/d/dM/MY) Asia/Shanghai>
    

    beat服务不停发任务的日志,如下:

    image-20200515135653034

    周期性任务的查询、删除等操作

    其实周期性任务也是存储在数据库的数据,基本上是基于ORM的操作的。

    周期性任务的查询

    # 导入周期性任务
    In [1]: from django_celery_beat.models import PeriodicTask
    
    # 查询目前所有的周期性任务
    In [3]: PeriodicTask.objects.all()                                                                     Out[3]: <ExtendedQuerySet [<PeriodicTask: Importing contacts: every 10 seconds>, <PeriodicTask: my_task: every 10 seconds>, <PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/d/dM/MY) Asia/Shanghai>]>
    
    # 遍历周期性任务    
    In [4]: for task in PeriodicTask.objects.all(): 
       ...:     print(task.id) 
       ...:                                                                                                                                                                       
    1
    2
    3
    
    # 通过id获取其中一个周期性任务
    In [5]: task1 = PeriodicTask.objects.get(id=1)                                                                                                                                
    In [6]: task1                                                                                         Out[6]: <PeriodicTask: Importing contacts: every 10 seconds>
    
    In [7]: PeriodicTask.objects.get(name="my_task")                                                                                                                              
    Out[7]: <PeriodicTask: my_task: every 10 seconds>
    
    # 通过name获取其中的周期性任务
    In [8]: task2 = PeriodicTask.objects.get(name="my_task")                                                                                                                      
    In [9]: task2                                                                                         Out[9]: <PeriodicTask: my_task: every 10 seconds>
    

    周期性任务的删除

    获取到了周期性任务之后,好奇的我尝试直接删除,发现直接死循环:

    #  删除周期性的任务,千万不要这样做,会死循环
    In [10]: task1.delete()                                                                               Out[10]: (1, {'django_celery_beat.PeriodicTask': 1})
    
    In [11]: task2.delete()                                                                               Out[11]: (1, {'django_celery_beat.PeriodicTask': 1})
    

    如果要删除周期性任务,必须首先暂停任务,然后再删除,如下:

    # 设置name为 my_taks1 的任务暂停执行
    In [6]: PeriodicTask.objects.get(name="my_task1").enabled = False                                  
    In [7]: PeriodicTask.objects.get(name="my_task1").save()                                           
    
    # 删除该任务
    In [8]: PeriodicTask.objects.get(name="my_task1").delete()                                         
    Out[8]: (1, {'django_celery_beat.PeriodicTask': 1})
    

    周期性任务暂停之后,重新启动

    当然会有暂停任务之后,重新开启任务的需求,如下:

    # 设置任务的 enabled 为 True 即可:
    In [21]: PeriodicTask.objects.get(name="my_task1").enabled = True                                     
    In [22]: PeriodicTask.objects.get(name="my_task1").save()
    

    更多精彩原创Devops文章,快来关注我的Devops社群吧:

    image

    相关文章

      网友评论

          本文标题:22. Celery 4.x 动态添加定时任务

          本文链接:https://www.haomeiwen.com/subject/ttnhohtx.html