最近使用Flask开发时遇到一个需求,需要在后台系统里进行定时任务的管理,这里选择了Flask-APScheduler;
首先展示一下目录结构:
image.png
app/__init__.py
是初始化Flask的位置,具体内容如截图:
在这里借鉴了部分网友的东西,比如这位:https://ask.hellobi.com/blog/seng/7432
但是,针对于我的项目,这部分代码有问题,不知道是老哥的代码没公布完还是什么
1.首先下载 Flask-APScheduler
pip install Flask-APScheduler
版本: 3.2.0
2. 在配置中,添加一个APS的API的开关
image.png配置我就不解释了,可以百度下
3.在初始化Flask的时候,初始化APScheduler,就是上面 __init__.py
中的四句代码
from flask_apscheduler import APScheduler;
scheduler = APScheduler();
scheduler.init_app(app)
scheduler.start()
这里坑大得很,之前没初始化,始终无法运行定时任务
4. 使用Flask-APScheduler的API接口,来动态创建任务
image.png直接调用一下test()方法就搞定了
from app.factory.Factory import Redis;
import time;
from flask_apscheduler import APScheduler;
from flask import current_app;
def add_job():
Redis().lpush('test:rds', str(time.time()));
def test():
job = {
'id':'rds-to-mysql-1', # 任务的唯一ID,不要冲突
'func':'add_job', # 执行任务的function名称
'args': '', # 如果function需要参数,就在这里添加
} ;
# current_app 是获取当前的app主体
#
# 网上没找到这句代码,这是我穷途末路的时候,不小心按到了Ctrl + APScheduler(),
# 看到他的源码里的init_app()方法里面,将sched实例注入到了app里面,
# 才突然发现新大陆,解决了这个问题
# 这些add_job的参数名称,可以借鉴:http://www.dannysite.com/blog/73/
result = current_app.apscheduler.add_job(func=__name__+':'+job['func'], id=job['id'], trigger='interval', seconds=1);
print(result);
return '123';
总结
坚持惯例:辣鸡某度,辣鸡CV战士,同样的文章,占满了某度的第一页搜索结果。
这里肯定有同学发现了,很多文章都是在配置里面加了JOBS的配置参数,包括Flask-APScheduler
的example里也是这么操作的,他们跑出来确实没毛病,因为他们的参数 func
就放在了当前py文件里 ,所以他们能解决,但是换个py文件,始终报错,报module not found
,所以我直接绕过了这个解决方案,换了现在这个。
贴个官方的GitHub地址:https://github.com/viniciuschiele/flask-apscheduler/blob/master/examples/jobs.py
网友评论