本文主要将定时任务,django-celery的配置有很多教程
周期任务比较简单的,在自带的后台admin页面中中period添加注册的函数就好了
在app下面的命名为tasks.py的文件
@task()
def do_expire_task(physical_order_id):
from apps.physical.models import PhysicalOrder
PhysicalOrder.objects.filter(pk=physical_order_id, status=0).update(status=306)
def create_expired_physical_order(physical_order):
crontab_time = {
"month_of_year": physical_order.order_at.month,
"day_of_month": physical_order.order_at.day,
"hour": physical_order.order_at.hour+1,
"minute": physical_order.order_at.minute
}
## 执行的时间调度
crontab, _ = celery_models.CrontabSchedule.objects.get_or_create(**crontab_time)
name = "delay_one_hour_expired{}".format(physical_order.pk)
## 注意参数必须是双引号的["11"],
celery_models.PeriodicTask.objects.create(name=name, task="apps.physical.tasks.do_expire_task", crontab=crontab,
args='["{}"]'.format(physical_order.pk)
)
logger.info('--------------添加{}订单过期--------------'.format(name))
网友评论