美文网首页学习提升Python
flask_Apscheduler使用详情

flask_Apscheduler使用详情

作者: 周周周__ | 来源:发表于2019-12-11 11:40 被阅读0次
在做WEb开发的过程中,定时任务是一个比较重要的功能之一。

常见定时方式有:Python3.x:定时任务实现方式
1、线程等待time.sleep()
2、crontab
3、Apschedulder
...
在py中,Apschedulder是一个比较常见的定时任务管理的第三方库。本文主要讲在flask中得具体使用方法。Apschedulder具体使用(非测试用例,项目开发中遇到的)

首先是flask得基本架构

--app
    --static      #静态文件
    --templates   #界面
    --task
        --scheduler.py #任务路由
    --__inin__.py  #初始化
    --config.py        # 配置
    --extension.py  # 插件导包
    --model.py  # 数据表
    --util.py  #工具文件
--wsig.py    #启动文件

--congif.py 主要配置文件,配置得flask_apscheduler

class Config(object):
    # 关系映射得基本配置
    MONGODB_SETTINGS = {
        'db': 'test',
        'host': '127.0.0.1',
        'port': 27017
    }
    # schulder 基本配置
    # 配置任务的存储地址为mongo
    client = MongoClient(MONGODB_SETTINGS['host'], MONGODB_SETTINGS['port'])
    SCHEDULER_JOBSTORES = {
        'default': MongoDBJobStore(collection='job', database=MONGODB_SETTINGS['db'], client=client)
        # 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

    }
    # 配置进程池并发
    SCHEDULER_EXECUTORS = {'default': {'type': 'threadpool', 'max_workers': 20}}
    # coalesce积攒得任务跑几次,在时间允许得范围内 True:默认最后一次,False:在时间允许范围内全部提交
    # max_instances 同时允许并发的最大并发量
    # misfire_grace_time 如果重启任务在这个时间范围内,就能继续重启
    SCHEDULER_JOB_DEFAULTS = {'coalesce': True, 'max_instances': 3, 'misfire_grace_time': 6000}
    SCHEDULER_API_ENABLED = True
    # 配置redis数据库
    REDIS_URL = "redis://:@127.0.0.1:6379/0"
    # session基本配置
    SECRET_KEY = os.getenv('SECRET_KEY', 'dev key')
config = Config

_init_.py 初始化文件

from app.extensions import scheduler  #从导包文件导入
def create_app(config_name=None):
    app = Flask(__name__)  # 注册生成app
    app.config.from_object(config)  # 导入默认配置
    scheduler.start()
    return app
# 初始化配置文件
def register_extensions(app):
    scheduler.init_app(app)

extension.py

from flask_apscheduler import APScheduler
scheduler = APScheduler()

util.py 主要是scheduler功能,动态增删改查

def task2(a, b):
    print('mession2')
    print(datetime.datetime.now())
def pausetask():  # 暂停
    # data = request.form['id']
    # scheduler.pause_job(str(data))
    return "Success!"
def resumetask():  # 恢复
    # data = request.form['id']
    # scheduler.resume_job(str(data))
    return "Success!"
def get_task():  # 获取
    jobs = scheduler.get_jobs()
    return jobs
def remove_task(data):  # 移除
    scheduler.remove_job(str(data))
    return
def addtask(items):
    #执行任务的函数
    scheduler.add_job(func=task2,  # 执行任务的函数
                      id=rule_no,  # 任务的唯一id
                      args=(items, 2),  # 传递得参数
                      trigger='interval',  # 循环执行
                      seconds=int(interval),  # 间隔时间
                      start_date=time1,  # 开始时间 这个时间作为可控参数主要是任务首次执行时间
                      end_date=end_date,  # 结束时间
                      replace_existing=True,
                      # timezone=utc,  # 这里的时区如果用得话会和上边有冲突
                      )
    return 'sucess'

scheduler.py

图片.png

相关文章

网友评论

    本文标题:flask_Apscheduler使用详情

    本文链接:https://www.haomeiwen.com/subject/iicogctx.html