美文网首页
APScheduler动态添加任务

APScheduler动态添加任务

作者: frankie_cheung | 来源:发表于2019-02-26 19:44 被阅读0次

遇到一个需求,每个小时在数据库读取一下信息,这个信息是以字符串形式储存的定时任务,然后在这个时间点取触发这个任务。
写出来就是:

设备号 定时任务描述
sensor1 0 8 * * *
sensor2 35 9 * * *

以上为:sensor1设备在每天的8点执行,sensor2设备在每天的9点35分执行,我的需求就是每隔一个小时去读取数据库的这张表,解析这个crontab的下一次执行时间,具体解析方法见我另一篇文章:
https://www.jianshu.com/p/8fa82101605f
解析出来执行时间后,进行判定,假如这个执行时间和当前系统时间
datetime.datetime.now()的小时数据一致,则表明,下一次执行时间在本小时呢,那么我们进行循环,添加job,需要注意的是我们要使用非阻塞的BackgroundScheduler调度器,因为阻塞的那个不知道为什么job执行完了,还不退出。我们用这个调度器就方便了,把job添加进去后,我们加一个while 循环,判断任务的列表长度,使用方法为:get_jobs(),这个方法可以获取到所有job的任务,返回格式是list,我们判断len(list)的长度,假如长度为0,则表明任务全部被执行完了,直接break,跳出循环。
通俗表述为:

  • 读取数据库crontab信息,例如0 8 * * *,这个表示在每天的8点执行。
  • 判断执行时间是否在本小时内,假如在,就添加。
  • 动态添加job,在时间点触发执行。
    直接上代码:
from apscheduler.schedulers.background import BackgroundScheduler
import datetime
from croniter import croniter
import time
from camera_task.One_Hours_Task import *
from camera_task.camera_roteto_args import *
# LOG_FORMAT = "%(asctime)s: %(message)s ----   %(lineno)d"
# DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
# log_path="/home/hduser/log.log"
# logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT, filename=log_path)

Now_Time=datetime.datetime.now()

sched = BackgroundScheduler()

Rotate_List=[]
No_Rotate_List=[]
for l in Compare_Time_Values():
    if l[3]==1:
        Rotate_List.append(l)
    else:
        No_Rotate_List.append(l)

for v in Rotate_List:


    iter = croniter("{}".format(v[2]), Now_Time)
    Execution_Time=iter.get_next(datetime.datetime)
    sched.add_job(Get_Image_Path, 'date', run_date=Str_exc_time,args=[l[1],l[-1],l[5],l[4] )

for k in No_Rotate_List:
    iter = croniter("{}".format(v[2]), Now_Time)
    Execution_Time=iter.get_next(datetime.datetime)
    sched.add_job(Get_Image_Path, 'date', run_date=Str_exc_time,args=[l[1],l[-1] )
    
sched.start()

while(True):
    print('main 1s')
    Job_list=sched.get_jobs()
    print(len(Job_list))
    time.sleep(1)
    if len(Job_list)==0:
        break
        

动态添加的关键因素为:
1.使用BackgroundScheduler调度器,这个在执行完可以自己关闭,具体代码为:判断任务列表长度,假如任务长度为0,则跳出while循环,直接就结束程序了。
2.使用for循环一直进行任务添加,添加完再进行sched.start()。
3.over

相关文章

  • APScheduler动态添加任务

    遇到一个需求,每个小时在数据库读取一下信息,这个信息是以字符串形式储存的定时任务,然后在这个时间点取触发这个任务。...

  • python定时任务最强框架APScheduler详细教程

    APScheduler定时任务 上次测试女神听了我的建议,已经做好了要给项目添加定时任务的决定了。但是之前提供的...

  • flask-apscheduler的app_context问题

    在flask应用中处理定时任务,有不少好用的模块,其中一个是apscheduler。apscheduler虽好用,...

  • Flask-APScheduler

    APScheduler是一款功能非常强大的定时任务框架。利用APScheduler框架我们可以很方便实现一个基于P...

  • 初识apscheduler任务调度

    前段时间写了一个程序,用于实时存储某文化众筹网站的订单数据。当时采用了while True死循环的写法,这么做虽然...

  • apscheduler定时任务

    组件介绍 APScheduler是一个python的第三方库,用来提供python的后台程序。包含四个组件,分别是...

  • 定时任务apscheduler

    项目的public文件夹下创建job.py文件 具体完成定时任务的创建、运行、暂停、删除操作 这里在public/...

  • django 任务管理-apscheduler

    主要介绍通过 apscheduler的三种 Cron 在特定时间定期运行,相比较Linux crontab 多了 ...

  • APScheduler - Advanced Python Sc

    简介 APScheduler:强大的任务调度工具,可以完成定时任务,周期任务等,它是跨平台的,用于取代Linux下...

  • Elastic-Job动态添加任务

    背景 在使用Elastic-Job的过程中,有很多人遇到了这么一个问题,就是如何动态的去添加任务? 在官方的文档中...

网友评论

      本文标题:APScheduler动态添加任务

      本文链接:https://www.haomeiwen.com/subject/hkuzyqtx.html