主要介绍通过 apscheduler的三种
Cron
在特定时间定期运行,相比较Linux crontab 多了 second/year/week(第多少周)/start_date/end_date
主要的参数:
- year (int|str) 4位数
- month (int|str 1-12)
- day (int|str 1-31)
- week (int|str 1-53)
- day_of_week (int|str 0-6或者mon/tue/wed/thu/fri/sat/sun)
- hour (int|str 0-23)
- minute (int|str 0-59)
- second (int|str 0-59)
- start_date (date|datetime|str)
- end_date (date|datetime|str)
特殊说明
1、linux crontab 中的 week 对应到 apscheduler中是 day_of_week (取值0到6后者mon,tue,wed,thu,fri,sat,sun)
2、配置job的时候,并不是所有时间字段都是必须
3、但是需要知道的是,隐含 大于最小有效值
的字段默认为*
,而较小的字段
默认为其最小值
,除了week
和 day_of_week
默认为*
举例说明:
# month=6, hour=1 最小有效值字段为 hour; 其等价于
year='*', month=6, day=*, week='*', day_of_week='*', hour=1, minute=0, second=0
# 意思是在每年 6 月每天 0 点 0 分 0 秒运行;
4、字段支持表达式,其中特殊的是 day
, 支持 last
、last x
和 xth y
- last : 配置当月的最后一天
- last x :匹配
x
在当月的最后一次,比如last mon
指 当月的最后一个周一 - xth y : 匹配
y
在当月的第x
次,比如4th mon
指当月的第四个周一
Interval
以固定的时间间隔运行Job
主要的参数:
- weeks (int)
- days (int)
- hours (int)
- minutes (int)
- seconds (int)
- start_date (date|datetime|str)
- end_date (date|datetime|str)
Date
某个特定时间仅运行一次的任务,类似于Linux的
at
主要的参数:
- run_date (date|datetime|str)
Demo演示
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Author: Colin
@Github: https://github.com/opscolin
@DateTime: 2022/8/3 11:14 AM
@File: demo_aps.py
@Software: PyCharm
"""
import datetime
import time
import logging
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
# from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# 日志 logger 配置
logging.basicConfig(filemode='a', filename='python-apscheduler.log')
# apscheduler store/executor 配置
job_stores = {
# 'mongo': MongoDBJobStore(),
# 'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
'db-ops': SQLAlchemyJobStore(
url='mysql+pymysql://xxxx:yyyy@192.168.a.b:3306/dbname?charset=utf8mb4')
}
executors = {
'default': ThreadPoolExecutor(20),
# 'default': {'type': 'threadpool', 'max_workers': 20},
'process_pool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
# 方式一
scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults)
# 方式二
# scheduler = BackgroundScheduler()
# scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults, timezone=utc)
# 定义任务Job
# 无参数Job
def job1():
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"No params of job1 at {now}")
# 带参数 Job
def job2(name):
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"Job with param {name} at {now}")
# Demo - Cron
scheduler.add_job(job2, trigger='cron', hour=11, minute=45, second=10, args=['Betty'])
# 每 30 秒执行一次
scheduler.add_job(job1, trigger='cron', second='*/20', jobstore='db-kfzops')
# 2022-12-31 之前, 第一季度和第三季度的每个月第三个周一的凌晨1点半执行
scheduler.add_job(job2, trigger='cron', args=['Betty'],
end_date='2022-12-31',
month='1-3,7-9',
day='3rd mon',
hour=1,
minute=30)
# Demo - Interval -> 每 30 秒执行一次
scheduler.add_job(job1, trigger='interval', seconds=30)
# Demo - Date
scheduler.add_job(job2, trigger='date', run_date='2022-08-03 14:20:00', args=['Betty'])
def main():
scheduler._logger = logging
scheduler.start()
while True:
time.sleep(1)
if __name__ == '__main__':
main()
脚本地址:https://gitee.com/colin5063/pylearn2022/blob/master/examples/scripts/demo_apscheduler.py
总结:
- 1、apscheduler 可以独立运行,也可以结合程序运行,后续介绍和Django结合的案例
- 2、任务配置灵活,支持按照时间间隔、指定日期、或者更高级的crontab
- 3、crontab 类型支持秒级别,同时多了 start_date/end_date/week 类别 以及 对 day 的特殊用法
- 4、创建任务的时候支持 装饰器
@register_job()
和scheduler.add_job()
遇到的问题
目前把任务的结果通过 jobstore 写入 sqlite
或者 MySQL
中,job_state
字段都是乱码,即使配置了字符集,也是乱码,目录该问题暂未找到原因。
如果大家有答案,欢迎留言分享
如果觉得文章对你有所帮忙,欢迎点赞收藏,或者可以关注个人公众号 全栈运维
哦,一起学习,一起健身~
网友评论