美文网首页Python
django-celery-beat 定期任务配置(可动态添加任

django-celery-beat 定期任务配置(可动态添加任

作者: 边小仙 | 来源:发表于2019-02-26 10:08 被阅读0次

    1.版本信息

    celery==4.2.1
    Django==1.11.11
    django-celery-beat==1.4.0
    django-celery-results==1.0.1
    

    官方文档:celery定期任务django_celery_beat 1.4.0;

    2.安装与配置

    1.使用pip安装包:
    $ pip install django-celery-beat

    2.将django_celery_beat模块添加到INSTALLED_APPSDjango项目中settings.py:

    #jdango时区配置
    TIME_ZONE = 'Asia/Shanghai'
    # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
    # 如果USE_TZ 设置为False,TIME_ZONE = 'Asia/Shanghai', 则使用上海的UTC时间。
    USE_TZ = False
    
    INSTALLED_APPS = (
        ...,
        'django_celery_beat',
    )
    
    # celery beat配置
    # CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
    

    3.应用Django数据库迁移,以便创建必要的表:
    $ python manage.py migrate
    迁移之后生产四个表:

    django_celery_beat.models.PeriodicTask 此模型定义要运行的单个周期性任务。
    django_celery_beat.models.IntervalSchedule 以特定间隔(例如,每5秒)运行的计划。
    django_celery_beat.models.CrontabSchedule
    与像在cron项领域的时间表 分钟小时日的一周 DAY_OF_MONTH month_of_year
    django_celery_beat.models.PeriodicTasks 此模型仅用作索引以跟踪计划何时更改

    4.proj/proj/celery.py 创建celery.py文件:

    # _*_ coding:utf-8 _*_
    """
    异步
    创建celery实例
    """
    
    from __future__ import absolute_import
    from __future__ import unicode_literals
    import os
    from celery import Celery
    from django.utils import timezone
    from kombu import Exchange
    from kombu import Queue
    import datetime
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
    
    app = Celery("proj")
    # app.now=datetime.datetime.utcnow
    # print app.now(), 'celery---------------->>>>>>'
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    # 在django配置文件中进行配置,以大写CELERY开头
    app.config_from_object('django.conf:settings', namespace='CELERY')
    # 解决时区问题,定时任务启动就循环输出
    app.now = timezone.now
    # app.now = datetime.datetime.now
    
    # Load task modules from all registered Django app configs.
    # celery自动发现所有django-app下面的任务tasks.py
    app.autodiscover_tasks()
    
    # 通过设置x-max-priority参数来配置队列以支持优先级
    # app.conf.task_queues = [
    #     Queue('tasks', Exchange('tasks'), routing_key='tasks',
    #           queue_arguments={'x-max-priority': 10}),
    # ]
    # 设置所有队列的默认值
    # app.conf.task_queue_max_priority = 10
    
    # 设置了三个Queue绑定到一个direct类型的exchange上,然后consumer监听所有的队列,消息来了后就轮询调用consumer进行处理.
    task_exchange = Exchange('tasks', type='direct')
    # 异步任务优先级
    task_queues = [Queue('hipri', task_exchange, routing_key='hipri'),
                   Queue('midpri', task_exchange, routing_key='midpri'),
                   Queue('lopri', task_exchange, routing_key='lopri')]
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    

    备注:
    1.django-celery-beat 依赖celery异步,所以先要配置好异步任务,才能使用定时任务;
    2.先版本一直有一个时区问题bug(只能使用utc时区,自定义时区未生效),运行定期任务可能会出现无限循环任务,添加 app.now = timezone.now 这个暂时解决无限循环问题,但是cron周期任务还是有问题

    3.运行

    首先启动异步任务,然后dug模式运行定期服务:
    $ celery -A proj beat -l info
    下面是配置上日志命令:
    celery -A proj beat -l info -f /home/bl/work/proj/log/celery_beat.log

    4.添加任务

    使用django-admin后台直接在表里添加定时任务或者在代码中在表PeriodicTask 中直接填任务即可,djang-celery-beat 会自动检测执行任务。
    例如:

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
    
    # 添加定时间隔周期
    #IntervalSchedule.DAYS
    #IntervalSchedule.HOURS
    #IntervalSchedule.MINUTES
    #IntervalSchedule.SECONDS
    #IntervalSchedule.MICROSECONDS
    >>> schedule, created = IntervalSchedule.objects.get_or_create(
    ...     every=10,
    ...     period=IntervalSchedule.SECONDS,
    ... )
    #添加定时任务
    >>> PeriodicTask.objects.create(
    ...     interval=schedule,                  # we created this above.
    ...     name='Importing contacts',          # simply describes this periodic task.
    ...     task='proj.tasks.add',  # name of task.
    ... )
    #下面是添加带参数的定时任务
    >>> import json
    >>> from datetime import datetime, timedelta
    
    >>> PeriodicTask.objects.create(
    ...     interval=schedule,                  # we created this above.
    ...     name='Importing contacts',          # simply describes this periodic task.
    ...     task='proj.tasks.add',  # name of task.
    ...     args=json.dumps(['arg1', 'arg2']),
    ...     kwargs=json.dumps({
    ...        'be_careful': True,
    ...     }),
    ...     expires=datetime.utcnow() + timedelta(seconds=30)
    ... )
    

    相关文章

      网友评论

        本文标题:django-celery-beat 定期任务配置(可动态添加任

        本文链接:https://www.haomeiwen.com/subject/kfywyqtx.html