This article follows on from the configuration in the previous article, "celery with rabbitmq".
- Django==2.2.1
- celery==4.3.0
Official documentation: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers
start
mkdir django_celery_new && cd django_celery_new
virtualenv venv
source venv/bin/activate
pip install django celery django_celery_beat
django-admin startproject celerypro && cd celerypro
django-admin startapp foo
edit
pwd
>>
/home/alonebo/PycharmProjects/django_celery_new/celerypro/celerypro
touch celery.py
<<
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celerypro.settings')
app = Celery('celerypro')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
from django.conf import settings
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
vim __init__.py
<<
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
vim settings.py
<<
...
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django_celery_beat',  # database-backed periodic tasks for Django
    'foo',
]
...
django_celery_beat is for Celery 4 and later; older versions use djcelery (the django-celery package).
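The scheduler class can also be set once in settings.py rather than passed on the command line. A minimal sketch, assuming the `CELERY_` namespace configured in celery.py and a local RabbitMQ broker with the default guest account (the broker URL is an assumption, not something taken from this article):

```python
# settings.py (fragment): the values below are assumptions for illustration.

# Broker URL for a local RabbitMQ instance with default credentials (assumed).
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'

# Make celery beat read its schedule from the django_celery_beat tables.
# With namespace='CELERY', this key maps to Celery's `beat_scheduler` setting.
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
```

With this in place the `--scheduler` flag becomes optional; passing it explicitly still works.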
cd ../foo
vim tasks.py
<<
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celerypro.celery import app

@app.task
def foo():
    print('hello')

@app.task
def per_foo(**kwargs):
    # Accept keyword arguments: a PeriodicTask may pass its stored kwargs here.
    print('per hello')
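Tasks scheduled through django_celery_beat can receive arguments: `PeriodicTask.kwargs` is stored as a JSON string, decoded, and passed to the task as keyword arguments, so the task's signature has to accept whatever keys are stored. The plain-Python sketch below imitates that hand-off without Celery; `greet` and `run_with_stored_kwargs` are hypothetical names used only for illustration.

```python
import json


def greet(username=None):
    """Stand-in for a task body; returns the text instead of printing it."""
    return 'per hello, {}'.format(username or 'anonymous')


def run_with_stored_kwargs(func, kwargs_json='{}'):
    """Decode a JSON object string and call func with it as keyword arguments."""
    kwargs = json.loads(kwargs_json or '{}')
    if not isinstance(kwargs, dict):
        raise ValueError('stored kwargs must be a JSON object')
    return func(**kwargs)


# The same JSON text a view might store on a periodic task.
stored = json.dumps({'username': 'zhanhsan'})
print(run_with_stored_kwargs(greet, stored))
print(run_with_stored_kwargs(greet))
```

A function that takes no parameters would raise `TypeError` when called with stored keys, which is why a scheduled task should declare the expected parameters or accept `**kwargs`.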
Dynamically turn an existing task into a scheduled or recurring one (in the foo app's views.py):
from django.shortcuts import render
from django.http import HttpResponse
from .tasks import foo
from django_celery_beat.models import PeriodicTask, CrontabSchedule, IntervalSchedule
import json
from django.utils import timezone
import datetime
# Create your views here.
def v_foo(req):
    foo.delay()
    return HttpResponse('Hello!')
def v_new_per_foo_CrontabSchedule(req):
    # PeriodicTask.objects.create(name='v_new_per_foo', task='foo.tasks.per_foo')
    # task: the PeriodicTask row; created: True if the row was just created
    task, created = PeriodicTask.objects.get_or_create(name='task_v_new_per_foo', task='foo.tasks.per_foo')
    crontab_time = {
        'month_of_year': 4,  # month
        'day_of_month': 10,  # day of the month
        'hour': 22,  # hour
        'minute': 5,  # minute
    }
    # Look up a matching crontab
    crontab = CrontabSchedule.objects.filter(**crontab_time).first()
    if crontab is None:
        # Create one if none exists; otherwise reuse the existing crontab
        crontab = CrontabSchedule.objects.create(**crontab_time)
    task.crontab = crontab  # attach the crontab
    task.enabled = True  # enable the task
    task.kwargs = json.dumps({'username': 'zhanhsan'})  # keyword arguments passed to the task
    expiration = timezone.now() + datetime.timedelta(hours=1)
    task.expires = expiration  # the task expires one hour from now
    task.save()
    return HttpResponse('Hello!')
v_new_per_foo = v_new_per_foo_CrontabSchedule
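The four crontab fields used in the view pin the schedule to a single minute per year (April 10 at 22:05). The sketch below shows that matching rule in plain Python for integer-valued fields only; real crontab fields also accept `*`, ranges, and lists, which the hypothetical `matches` helper here does not handle.

```python
import datetime

# Same field names and values as the view's crontab_time dictionary.
crontab_time = {
    'month_of_year': 4,
    'day_of_month': 10,
    'hour': 22,
    'minute': 5,
}


def matches(spec, moment):
    """Return True when every fixed field in spec equals the datetime's value."""
    actual = {
        'month_of_year': moment.month,
        'day_of_month': moment.day,
        'hour': moment.hour,
        'minute': moment.minute,
    }
    return all(actual[field] == value for field, value in spec.items())


print(matches(crontab_time, datetime.datetime(2019, 4, 10, 22, 5)))
print(matches(crontab_time, datetime.datetime(2019, 4, 10, 22, 6)))
```

Since the view also sets `expires` to one hour ahead, the task would presumably only fire if that minute falls within the coming hour; for experimenting, fields close to the current time are easier to observe.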
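def v_new_per_foo_IntervalSchedule(req):
    # PeriodicTask.objects.create(name='v_new_per_foo', task='foo.tasks.per_foo')
    # task: the PeriodicTask row; created: True if the row was just created
    task, created = PeriodicTask.objects.get_or_create(name='task_v_new_per_foo_IntervalSchedule', task='foo.tasks.per_foo')
    # Reuse an existing 5-second interval if there is one, so repeated requests do not add duplicate rows
    interval = IntervalSchedule.objects.filter(every=5, period=IntervalSchedule.SECONDS).first()
    if interval is None:
        interval = IntervalSchedule.objects.create(every=5, period=IntervalSchedule.SECONDS)
    task.enabled = True  # enable the task
    task.interval = interval
    expiration = timezone.now() + datetime.timedelta(hours=1)
    task.expires = expiration  # the task expires one hour from now
    task.save()
    return HttpResponse('Hello!')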
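An `IntervalSchedule` row is just a count (`every`) plus a unit (`period`). The following is a rough plain-Python sketch of how such a pair maps to a `datetime.timedelta` and to successive run times; `interval_to_timedelta` and `next_runs` are hypothetical helpers, not part of django_celery_beat, and the start time is made up.

```python
import datetime

# Period names accepted by this sketch; they double as timedelta keyword names.
PERIODS = ('days', 'hours', 'minutes', 'seconds', 'microseconds')


def interval_to_timedelta(every, period):
    """Convert an (every, period) pair into a timedelta."""
    if period not in PERIODS:
        raise ValueError('unknown period: {!r}'.format(period))
    if every < 1:
        raise ValueError('every must be a positive integer')
    return datetime.timedelta(**{period: every})


def next_runs(start, every, period, count):
    """List the next `count` run times after `start` for the given interval."""
    step = interval_to_timedelta(every, period)
    return [start + step * i for i in range(1, count + 1)]


start = datetime.datetime(2019, 5, 1, 12, 0, 0)
for run in next_runs(start, 5, 'seconds', 3):
    print(run.isoformat())
```

With `every=5` and `period='seconds'`, as in the view, the task is due every five seconds until it expires or is disabled.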
In fact, this runs correctly by default, without any additional settings. Newer versions use django_celery_beat; older versions use djcelery.
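Apply the migrations first so that the django_celery_beat tables exist (python manage.py migrate), then run:
celery -A celerypro worker -B -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler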