简述
1 triggers(触发器):触发器包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行,除了他们自己初始化配置外,触发器完全是无状态的
2 job stores(作业存储):用来存储被调度的作业,默认的作业存储器是简单地把作业任务保存在内存中,其它作业存储器可以将任务作业保存到各种数据库中,支持MongoDB、Redis、SQLAlchemy存储方式。当对作业任务进行持久化存储的时候,作业的数据将被序列化,重新读取作业时在反序列化。
3 executors(执行器):执行器用来执行定时任务,只是将需要执行的任务放在新的线程或者线程池中运行。当作业任务完成时,执行器将会通知调度器。对于执行器,默认情况下选择ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任务如比较消耗CPU的任务则可以选择ProcessPoolExecutor,当然根据根据实际需求可以同时使用两种执行器。
4 schedulers(调度器):调度器是将其它部分联系在一起,一般在应用程序中只有一个调度器,应用开发者不会直接操作触发器、任务存储以及执行器,相反调度器提供了处理的接口。通过调度器完成任务的存储以及执行器的配置操作,如可以添加。修改、移除任务作业。
APScheduler提供了多种调度器
BlockingScheduler:适合于只在进程中运行单个任务的情况,通常在调度器是你唯一要运行的东西时使用。
BackgroundScheduler: 适合于要求任何在程序后台运行的情况,当希望调度器在应用后台执行时使用。
AsyncIOScheduler:适合于使用asyncio框架的情况
GeventScheduler: 适合于使用gevent框架的情况
TornadoScheduler: 适合于使用Tornado框架的应用
TwistedScheduler: 适合使用Twisted框架的应用
QtScheduler: 适合使用QT的情况
内置三种调度调度系统:
Cron风格
间隔性执行
仅在某个时间执行一次
作业存储的backends支持:
Memory
SQLAlchemy (any RDBMS supported by SQLAlchemy works)
MongoDB
Redis
RethinkDB
ZooKeeper
executor
aps把任务最终的执行机制也抽象了出来,可以根据IO模型选配,不需要讲太多,最常用的是threadpool和processpoll两种(来自concurrent.futures的线程/进程池)。
不同类型的executor实现自己的_do_submit_job,完成一次实际的任务实例执行。以线程/进程池实现为例
内置executors供选:
ProcessPoolExecutor: 多进程,可指定进程数,当工作负载为CPU密集型操作时可以考虑使用它来利用多核CPU
ThreadPoolExecutor: 多线程,可指定线程数,默认,可以满足大多数用途
trigger
trigger是抽象出了“一个job是何时被触发”这个策略,每种trigger实现自己的get_next_fire_time函数
aps提供的trigger包括:
date:一次性指定日期
interval:在某个时间范围内间隔多长时间执行一次
cron:和unix crontab格式兼容,最为强大
添加任务
调用调度器的add_job() //可remvoe任务
使用调度器的scheduled_job()装饰器: 很简洁,推荐这种//只能去删代码
示例
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
默认同一时刻只能有一个实例运行,通过max_instances=3修改为3个。
Scheduler 事件
监听Scheduler发出的事件并作出处理,如任务执行完、任务出错等
def my_listener(event):
if event.exception:
print('The job crashed :(') # or logger.fatal('The job crashed :(')
else:
print('The job worked :)')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
与celery对比
网友评论