美文网首页
django 任务管理-apscheduler

django 任务管理-apscheduler

作者: 菩提老鹰 | 来源:发表于2022-08-03 17:28 被阅读0次
    python-apscheduler.png

    主要介绍通过 apscheduler的三种

    Cron

    在特定时间定期运行,相比较Linux crontab 多了 second/year/week(第多少周)/start_date/end_date

    主要的参数:

    • year (int|str) 4位数
    • month (int|str 1-12)
    • day (int|str 1-31)
    • week (int|str 1-53)
    • day_of_week (int|str 0-6或者mon/tue/wed/thu/fri/sat/sun)
    • hour (int|str 0-23)
    • minute (int|str 0-59)
    • second (int|str 0-59)
    • start_date (date|datetime|str)
    • end_date (date|datetime|str)

    特殊说明
    1、linux crontab 中的 week 对应到 apscheduler中是 day_of_week (取值0到6后者mon,tue,wed,thu,fri,sat,sun)
    2、配置job的时候,并不是所有时间字段都是必须
    3、但是需要知道的是,隐含 大于最小有效值的字段默认为*,而较小的字段默认为其最小值,除了weekday_of_week 默认为*
    举例说明:

    # month=6, hour=1 最小有效值字段为 hour; 其等价于 
    year='*', month=6, day=*, week='*', day_of_week='*', hour=1, minute=0, second=0
    # 意思是在每年 6 月每天 0 点 0 分 0 秒运行;
    

    4、字段支持表达式,其中特殊的是 day, 支持 lastlast xxth y

    • last : 配置当月的最后一天
    • last x :匹配 x 在当月的最后一次,比如 last mon 指 当月的最后一个周一
    • xth y : 匹配y在当月的第 x次,比如 4th mon指当月的第四个周一

    Interval

    以固定的时间间隔运行Job

    主要的参数:

    • weeks (int)
    • days (int)
    • hours (int)
    • minutes (int)
    • seconds (int)
    • start_date (date|datetime|str)
    • end_date (date|datetime|str)

    Date

    某个特定时间仅运行一次的任务,类似于Linux的 at

    主要的参数:

    • run_date (date|datetime|str)

    Demo演示

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    """
    @Author:    Colin
    @Github:    https://github.com/opscolin
    @DateTime:  2022/8/3 11:14 AM
    @File:      demo_aps.py
    @Software:  PyCharm
    """
    
    import datetime
    import time
    import logging
    
    from pytz import utc
    from apscheduler.schedulers.background import BackgroundScheduler
    # from apscheduler.jobstores.mongodb import MongoDBJobStore
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
    
    # 日志 logger 配置
    logging.basicConfig(filemode='a', filename='python-apscheduler.log')
    
    # apscheduler store/executor 配置
    job_stores = {
        # 'mongo': MongoDBJobStore(),
        # 'mongo': {'type': 'mongodb'},
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
        'db-ops': SQLAlchemyJobStore(
            url='mysql+pymysql://xxxx:yyyy@192.168.a.b:3306/dbname?charset=utf8mb4')
    }
    
    executors = {
        'default': ThreadPoolExecutor(20),
        # 'default': {'type': 'threadpool', 'max_workers': 20},
        'process_pool': ProcessPoolExecutor(5)
    }
    
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    }
    
    # 方式一
    scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults)
    
    
    # 方式二
    # scheduler = BackgroundScheduler()
    # scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults, timezone=utc)
    
    
    # 定义任务Job
    # 无参数Job
    def job1():
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(f"No params of job1  at {now}")
    
    
    # 带参数 Job
    def job2(name):
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(f"Job with param {name} at {now}")
    
    
    # Demo - Cron
    scheduler.add_job(job2, trigger='cron', hour=11, minute=45, second=10, args=['Betty'])
    # 每 30 秒执行一次
    scheduler.add_job(job1, trigger='cron', second='*/20', jobstore='db-kfzops')
    # 2022-12-31 之前, 第一季度和第三季度的每个月第三个周一的凌晨1点半执行
    scheduler.add_job(job2, trigger='cron', args=['Betty'],
                      end_date='2022-12-31',
                      month='1-3,7-9',
                      day='3rd mon',
                      hour=1,
                      minute=30)
    
    # Demo - Interval -> 每 30 秒执行一次
    scheduler.add_job(job1, trigger='interval', seconds=30)
    
    # Demo - Date
    scheduler.add_job(job2, trigger='date', run_date='2022-08-03 14:20:00', args=['Betty'])
    
    
    def main():
        scheduler._logger = logging
        scheduler.start()
        while True:
            time.sleep(1)
    
    
    if __name__ == '__main__':
        main()
    

    脚本地址:https://gitee.com/colin5063/pylearn2022/blob/master/examples/scripts/demo_apscheduler.py

    总结:

    • 1、apscheduler 可以独立运行,也可以结合程序运行,后续介绍和Django结合的案例
    • 2、任务配置灵活,支持按照时间间隔、指定日期、或者更高级的crontab
    • 3、crontab 类型支持秒级别,同时多了 start_date/end_date/week 类别 以及 对 day 的特殊用法
    • 4、创建任务的时候支持 装饰器 @register_job()scheduler.add_job()

    遇到的问题

    目前把任务的结果通过 jobstore 写入 sqlite 或者 MySQL中,job_state字段都是乱码,即使配置了字符集,也是乱码,目录该问题暂未找到原因。

    如果大家有答案,欢迎留言分享


    如果觉得文章对你有所帮忙,欢迎点赞收藏,或者可以关注个人公众号 全栈运维 哦,一起学习,一起健身~

    相关文章

      网友评论

          本文标题:django 任务管理-apscheduler

          本文链接:https://www.haomeiwen.com/subject/mucewrtx.html