APScheduler全程为Advanced Python Scheduler,是一款轻量级的Python任务调度框架。它允许你像Cron那样安排定期执行的任务,并且支持Python函数或任意可调用的对象。官方文档:https://apscheduler.readthedocs.io/en/latest/userguide.html#basic-concepts
安装
pip install apscheduler
介绍
触发器tiggers
触发器中包含调度逻辑(定时任务在什么时间运行
),每个作业都由自己的触发器来决定下次运行时间。除了他们自己初始配置意外,触发器完全是无状态的.
作业存储器job stores
存储被调度的作业(定时任务在什么地方保存
),默认的作业存储器只是简单地把作业保存在内存中,其他的作业存储器则是将作业保存在数据库中。当作业被保存到一个持久化的作业存储器中的时候,该作业的数据会被序列化,并在加载时被反序列化。作业存储器不能共享调度器。
如果你的应用在每次启动的时候都会重新创建作业,那么使用默认的作业存储器(MemoryJobStore)即可,但是如果你需要在调度器重启或者应用程序奔溃的情况下任然保留作业,你应该根据你的应用环境来选择具体的作业存储器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多数RDBMS)
执行器executors
处理作业的运行(线程/进程池运行
),他们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
对执行器的选择取决于你使用上面哪些框架,大多数情况下,使用默认的ThreadPoolExecutor已经能够满足需求。如果你的应用涉及到CPU密集型操作,你可以考虑使用ProcessPoolExecutor来使用更多的CPU核心。你也可以同时使用两者,将ProcessPoolExecutor作为第二执行器。
调度器schedulers
配置作业存储器和执行器可以在调度器中完成(定时任务运行的方式
),例如添加、修改和移除作业。根据不同的应用场景可以选用不同的调度器,可选的有BlockingScheduler,BackgroundScheduler,AsyncIOScheduler,GeventScheduler,TornadoScheduler,TwistedScheduler,QtScheduler 7种。
BlockingScheduler : 当调度器是你应用中唯一要运行的东西时。
BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用(充电桩即使用此种方式)。
AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用。
GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用。
TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用。
TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用
QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。
实践
定义定时任务类,用来添加,删除定时任务,示例代码如下
from apscheduler.schedulers.background import BackgroundScheduler
from .views_extension import run_case
import json
import logging
# 定义日志格式
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='scheduler_log.txt',
filemode='a'
)
# 定义调度器(后端运行)
_scheduler = BackgroundScheduler()
def _task_id(task):
"""
生成task的id
"""
return "scheduler_%s" % task.id
def add_task(task, request):
"""
添加定时任务
"""
trigger_kwargs = json.loads(task.expr)
# trigger触发器,
# trigger_kwargs是apsheduler的触发参数
# id:定时任务id,用于关闭
# run_case:具体定时任务
# args:定时任务/函数的传参
_scheduler.add_job(run_case, args=[task.case.id, request], trigger="cron", id=_task_id(task), **trigger_kwargs)
def remove_task(task):
# 根据任务id获取定时任务
job = _scheduler.get_job(_task_id(task))
# 若任务存在,则移除任务
if job:
_scheduler.remove_job(_task_id(task))
# 启动调度器
_scheduler.start()
#views.py
class StartStopTaskView(CustomViews, views.APIView):
def post(self, request, task_id, target_status):
task = models.CrontabTask.objects.get(pk=task_id)
#根据定时任务的状态,启动/停止定时任务
if target_status == 1:
if task.status == 1:
return Response(status=status.HTTP_400_BAD_REQUEST)
#添加定时任务
zlscheduler.add_task(task, request)
task.status = 1
elif target_status == 2:
if task.status == 2:
return Response(status=status.HTTP_400_BAD_REQUEST)
#删除定时任务
zlscheduler.remove_task(task)
task.status = 2
else:
return Response(status=status.HTTP_400_BAD_REQUEST)
task.save()
return Response(serializers.CrontabTaskSerializer(task).data)
网友评论