在做WEb开发的过程中,定时任务是一个比较重要的功能之一。
常见定时方式有:Python3.x:定时任务实现方式
1、线程等待time.sleep()
2、crontab
3、Apschedulder
...
在py中,Apschedulder是一个比较常见的定时任务管理的第三方库。本文主要讲在flask中得具体使用方法。Apschedulder具体使用(非测试用例,项目开发中遇到的)
首先是flask得基本架构
--app
--static #静态文件
--templates #界面
--task
--scheduler.py #任务路由
--__inin__.py #初始化
--config.py # 配置
--extension.py # 插件导包
--model.py # 数据表
--util.py #工具文件
--wsig.py #启动文件
--congif.py 主要配置文件,配置得flask_apscheduler
class Config(object):
# 关系映射得基本配置
MONGODB_SETTINGS = {
'db': 'test',
'host': '127.0.0.1',
'port': 27017
}
# schulder 基本配置
# 配置任务的存储地址为mongo
client = MongoClient(MONGODB_SETTINGS['host'], MONGODB_SETTINGS['port'])
SCHEDULER_JOBSTORES = {
'default': MongoDBJobStore(collection='job', database=MONGODB_SETTINGS['db'], client=client)
# 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 配置进程池并发
SCHEDULER_EXECUTORS = {'default': {'type': 'threadpool', 'max_workers': 20}}
# coalesce积攒得任务跑几次,在时间允许得范围内 True:默认最后一次,False:在时间允许范围内全部提交
# max_instances 同时允许并发的最大并发量
# misfire_grace_time 如果重启任务在这个时间范围内,就能继续重启
SCHEDULER_JOB_DEFAULTS = {'coalesce': True, 'max_instances': 3, 'misfire_grace_time': 6000}
SCHEDULER_API_ENABLED = True
# 配置redis数据库
REDIS_URL = "redis://:@127.0.0.1:6379/0"
# session基本配置
SECRET_KEY = os.getenv('SECRET_KEY', 'dev key')
config = Config
_init_.py 初始化文件
from app.extensions import scheduler #从导包文件导入
def create_app(config_name=None):
app = Flask(__name__) # 注册生成app
app.config.from_object(config) # 导入默认配置
scheduler.start()
return app
# 初始化配置文件
def register_extensions(app):
scheduler.init_app(app)
extension.py
from flask_apscheduler import APScheduler
scheduler = APScheduler()
util.py 主要是scheduler功能,动态增删改查
def task2(a, b):
print('mession2')
print(datetime.datetime.now())
def pausetask(): # 暂停
# data = request.form['id']
# scheduler.pause_job(str(data))
return "Success!"
def resumetask(): # 恢复
# data = request.form['id']
# scheduler.resume_job(str(data))
return "Success!"
def get_task(): # 获取
jobs = scheduler.get_jobs()
return jobs
def remove_task(data): # 移除
scheduler.remove_job(str(data))
return
def addtask(items):
#执行任务的函数
scheduler.add_job(func=task2, # 执行任务的函数
id=rule_no, # 任务的唯一id
args=(items, 2), # 传递得参数
trigger='interval', # 循环执行
seconds=int(interval), # 间隔时间
start_date=time1, # 开始时间 这个时间作为可控参数主要是任务首次执行时间
end_date=end_date, # 结束时间
replace_existing=True,
# timezone=utc, # 这里的时区如果用得话会和上边有冲突
)
return 'sucess'
scheduler.py
![](https://img.haomeiwen.com/i13090532/7e5bbf359f5bbe31.png)
网友评论