celery with django

作者: 霡霂976447044 | 来源:发表于2019-05-05 17:21 被阅读0次

本文需要先完成上一篇文章《celery with rabbitmq》中的配置。

  • Django==2.2.1
  • celery==4.3.0

官网:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

start

mkdir django_celery_new && cd django_celery_new
virtualenv venv
source venv/bin/activate
pip install django celery django_celery_beat
django-admin startproject celerypro && cd celerypro
django-admin startapp foo

edit

pwd

>>

/home/alonebo/PycharmProjects/django_celery_new/celerypro/celerypro
touch celery.py

<<

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Set the default Django settings module for the 'celery' program so that
# the worker can locate the project's settings without extra environment
# configuration.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celerypro.settings')

# Name the Celery app after the Django project package (``celerypro``)
# rather than the placeholder ``proj`` used in the upstream example.
app = Celery('celerypro')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load ``tasks.py`` modules from all registered Django apps. Called without
# arguments, Celery resolves the app list lazily through its Django
# integration, which also copes with ``AppConfig`` dotted paths in
# INSTALLED_APPS (passing the raw INSTALLED_APPS entries does not).
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    """Print the repr of this bound task's request (debugging aid)."""
    request_line = 'Request: {0!r}'.format(self.request)
    print(request_line)
vim __init__.py

<<

from __future__ import absolute_import, unicode_literals

# Import the Celery app whenever Django imports this package, so that
# functions decorated with ``shared_task`` bind to this app.
from .celery import app as celery_app

# Public names exported by this package.
__all__ = ('celery_app',)
vim settings.py

<<

...
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',

    'django_celery_beat',  # periodic (scheduled) tasks, Django edition

    'foo',
]
...

django_celery_beat 适用于 celery 4 及之后的版本,旧版本使用 djcelery(django-celery)。

cd ../foo
vim tasks.py

<<

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

from celerypro.celery import app


@app.task
def foo():
    """Demo task: print a fixed greeting."""
    greeting = 'hello'
    print(greeting)

@app.task
def per_foo(username=None):
    """Demo task intended to be run on a schedule; prints a fixed greeting.

    Args:
        username: Optional keyword argument. A periodic-task entry for
            ``foo.tasks.per_foo`` is stored with JSON kwargs containing a
            ``username`` key, and those kwargs are passed to the task when
            it runs. Accepting the parameter (with a default, so entries
            without kwargs still work) keeps the scheduled call from
            failing with ``TypeError``. The value is currently unused.
    """
    print('per hello')

动态地让已有的 task 变为定时/循环任务(编辑 foo/views.py):

from django.shortcuts import render
from django.http import HttpResponse
from .tasks import foo

from django_celery_beat.models import PeriodicTask, CrontabSchedule, IntervalSchedule
import json
from django.utils import timezone
import datetime

# Create your views here.
def v_foo(req):
    """Dispatch the ``foo`` task via ``delay()`` and return a fixed reply."""
    foo.delay()
    response = HttpResponse('Hello!')
    return response



def v_new_per_foo_CrontabSchedule(req):
    """Register (or refresh) a crontab-driven periodic run of ``per_foo``.

    Looks up or creates the crontab schedule, then creates or updates the
    ``PeriodicTask`` named ``task_v_new_per_foo`` so that it points at
    ``foo.tasks.per_foo``, is enabled, carries the JSON kwargs below and
    expires one hour from now.

    Args:
        req: The incoming HTTP request (unused).

    Returns:
        An ``HttpResponse`` with the body ``Hello!``.
    """
    crontab_time = {
        'month_of_year': 4,  # month
        'day_of_month': 10,  # day of the month
        'hour': 22,  # hour
        'minute': 5,  # minute
    }
    # Reuse an existing crontab row with the same fields if there is one;
    # only create a new row when none exists.
    crontab = CrontabSchedule.objects.filter(**crontab_time).first()
    if crontab is None:
        crontab = CrontabSchedule.objects.create(**crontab_time)

    # The task expires one hour from now.
    expiration = timezone.now() + datetime.timedelta(hours=1)

    # Build the schedule first and write the task in a single step, so the
    # PeriodicTask is never saved without a schedule (newer
    # django-celery-beat releases validate this on save). Keying the lookup
    # on the unique ``name`` only avoids an IntegrityError when a row with
    # this name already exists but points at a different task.
    PeriodicTask.objects.update_or_create(
        name='task_v_new_per_foo',
        defaults={
            'task': 'foo.tasks.per_foo',
            'crontab': crontab,
            'enabled': True,
            # Keyword arguments handed to the task when it runs; the task
            # must accept a ``username`` keyword argument.
            'kwargs': json.dumps({'username': 'zhanhsan'}),
            'expires': expiration,
        },
    )
    return HttpResponse('Hello!')


# Alias: ``v_new_per_foo`` refers to the crontab-based view.
v_new_per_foo = v_new_per_foo_CrontabSchedule

def v_new_per_foo_IntervalSchedule(req):
    """Register (or refresh) an interval-driven periodic run of ``per_foo``.

    Looks up or creates a 5-second interval schedule, then creates or
    updates the ``PeriodicTask`` named
    ``task_v_new_per_foo_IntervalSchedule`` so that it points at
    ``foo.tasks.per_foo``, is enabled and expires one hour from now.

    Args:
        req: The incoming HTTP request (unused).

    Returns:
        An ``HttpResponse`` with the body ``Hello!``.
    """
    interval_spec = {'every': 5, 'period': 'seconds'}
    # Reuse an existing interval row instead of inserting a duplicate
    # "every 5 seconds" row on each request.
    interval = IntervalSchedule.objects.filter(**interval_spec).first()
    if interval is None:
        interval = IntervalSchedule.objects.create(**interval_spec)

    # The task expires one hour from now.
    expiration = timezone.now() + datetime.timedelta(hours=1)

    # Build the schedule first and write the task in a single step, so the
    # PeriodicTask is never saved without a schedule (newer
    # django-celery-beat releases validate this on save). Keying the lookup
    # on the unique ``name`` only avoids an IntegrityError when a row with
    # this name already exists but points at a different task.
    PeriodicTask.objects.update_or_create(
        name='task_v_new_per_foo_IntervalSchedule',
        defaults={
            'task': 'foo.tasks.per_foo',
            'interval': interval,
            'enabled': True,
            'expires': expiration,
        },
    )
    return HttpResponse('Hello!')

其实不需要在 settings 中配置其它参数,默认就能正确运行。高版本使用 django_celery_beat,低版本使用 djcelery。
运行前先执行 python manage.py migrate 创建 django_celery_beat 所需的数据表,然后使用以下命令运行:

celery -A celerypro worker -B -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

github: https://github.com/Baloneo/django-celery-example

相关文章

网友评论

    本文标题:celery with django

    本文链接:https://www.haomeiwen.com/subject/oijhoqtx.html