美文网首页Python
APScheduler管理及监控平台

APScheduler管理及监控平台

作者: 翘起尾巴的熊 | 来源:发表于2021-03-09 11:36 被阅读0次

    背景

    APScheduler是一个非常好用的调度平台,不过目前所有Scheduler的JOB信息都无法通过可视化的方式展示,只能通过后台日志来查看调度信息,对于管理上非常不便。

    但是APScheduler非常的强大,已经预留的event功能可以帮助来实现此功能,对于APScheduler原理还不太理解的话,可以参考之前的一篇文章Python定时库APScheduler原理及用法

    在使用Flask进行管理后,通过Flask-APScheduler插件来实现对APScheduler的管理以及动态增删JOB的接口实现,以此完成对APScheduler的全方位管理。

    目的

    本文的目的主要有两部分功能块,第一部分是利用APScheduler的event机制来实现以下两个功能并进行可视化查看

    • 将APScheduler中所有添加的JOB进行状态跟踪
    • APScheduler中每个JOB的生命周期进行跟踪

    第二部分是在Flask框架上构建的管理平台上集成Flask-APScheduler插件,完成对APScheduler的管理以及动态增删JOB的接口实现。

    实现

    集成Flask-APScheduler插件完成APScheduler的动态管理

    将APScheduler集成到Flask中

    config_name = os.getenv('FLASK_CONFIG') or 'default'
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    config[config_name].init_app(app)
    # 初始化Sqlarchemy
    db.app = app
    db.init_app(app)
    # 初始化 flask_apscheduler,将scheduler嵌入到flask管理,本地在flask_apscheduler插件中增加add_listener监听所有的job生命周期
    flask_apscheduler = CustomAPScheduler(db.session, app=app)
    # 启动apscheduler
    flask_apscheduler.start()
    

    配置Flask-APScheduler开启对外接口

    class Config:
        # apscheduler默认的jobstore
        SCHEDULER_JOBSTORES = {}
        # flask_apscheduler是否对外提供接口
        SCHEDULER_API_ENABLED = True
    

    Flask-APScheduler提供的api如下

    def _load_api(self):
        """
        Add the routes for the scheduler API.
        """
        self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
        self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
        self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
        self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
        self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
        self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
        self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
        self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
        self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')
    

    启动后,通过提供的接口进行动态管理

    直接动态调用接口添加, 具体的参数需要到apscheduler的源码进行查看

    添加JOB举例说明(add_job)

    请求添加接口:http://127.0.0.1:5000/scheduler/jobs
    请求方法:POST
    请求header:
    {
        "Content-Type": "application/json"
    }
    请求body:
    {
        "id": "test_add_job",
        "name":"管理平台添加job测试",
        "func": "app:jobs.test.test_job", # 这里就是模块:函数,本地定义的方法保证可以import
        "trigger": "date" # 触发器为指定时间,这里时间没有指定,就是立马执行
    }
    返回结果:
    {
        "id": "test_add_job",
        "name": "管理平台添加job测试",
        "func": "app:jobs.test.test_job",
        "args": [],
        "kwargs": {},
        "trigger": "date",
        "run_date": "2021-03-05T15:17:10.107210+08:00",
        "misfire_grace_time": 1,
        "max_instances": 1,
        "next_run_time": "2021-03-05T15:17:10.107210+08:00"
    }
    

    充分利用APScheduler的Event机制

    class CustomAPScheduler(APScheduler):
        # scheduler事件映射本地状态
        STATUS_MAPPING = {
            EVENT_JOB_ADDED: 0,
            EVENT_JOB_MODIFIED: 1,
            EVENT_JOB_SUBMITTED: 2,
            EVENT_JOB_EXECUTED: 3,
            EVENT_JOB_REMOVED: 4,
            EVENT_JOB_ERROR: 5,
            EVENT_JOB_MISSED: 6,
            EVENT_ALL_JOBS_REMOVED: 7,
            EVENT_JOB_MAX_INSTANCES: 8
        }
    
        def __init__(self, session, scheduler=None, app=None):
            super(CustomAPScheduler, self).__init__(scheduler, app)
            self.session = session
    
        def listener_all_job(self, event):
            """
            监控job的生命周期,可视化监控,并且可增加后续的没有触发任务等监控
            添加到线程做处理
            :param event:
            :return:
            """
            job_id = None
            args = []
            if event.code != EVENT_ALL_JOBS_REMOVED:
                job_id = event.job_id
            if job_id:
                jobstore_alias = event.jobstore
                job = self.scheduler.get_job(job_id, jobstore_alias)
                if job:
                    name = job.name
                    func = str(job.func_ref)
                    trigger = job.trigger if isinstance(job.trigger, str) else str(job.trigger).split("[")[0]
                    next_run_time = str(job.next_run_time).split(".")[0]
                else:
                    name = None
                    func = None
                    trigger = None
                    next_run_time = None
                args = [name, func, trigger, next_run_time]
            traceback = event.traceback if hasattr(event, 'traceback') else "",
            args.append(traceback)
            t = threading.Thread(target=self.handle_listener_all_job, args=[event.code, job_id, *args])
            t.start()
            t.join()
    
        def handle_listener_all_job(self, event_type, *args):
            """
            实际处理IO操作
            如何处理一个job_id重复使用的问题,采用本地id自增,如果真有job_id重复的情况,则认为指定的是最后一个job_id对应的任务
            """
            try:
                if event_type == EVENT_JOB_ADDED:
                    # 添加任务定义表
                    job = ApschedulerJobInfo()
                    job.job_id = args[0]
                    job.job_name = args[1]
                    job.job_func = args[2]
                    job.job_trigger = args[3]
                    job.job_next_run_time = args[4]
                    job.job_status = 0
                    self.session.add(job)
                    self.session.flush()
                    # 增加任务事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                elif event_type == EVENT_JOB_MODIFIED:
                    # 修改job[取数据库表中job_id最后一个进行修改]
                    job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                        ApschedulerJobInfo.job_id == args[0]).first()
                    if job:
                        # 更新JOB表
                        job.job_name = args[1]
                        job.job_func = args[2]
                        job.job_trigger = args[3]
                        job.job_next_run_time = args[4]
                        job.job_status = 0
    
                        # 增加任务事件表
                        job_event = ApschedulerJobEventInfo()
                        job_event.job_info_id = job.id
                        job_event.event = self.STATUS_MAPPING[event_type]
                        self.session.add(job_event)
                        self.session.commit()
                    else:
                        LOGGER.warning("指定的job本地不存在{}".format(args))
                elif event_type == EVENT_JOB_SUBMITTED:
                    # 提交job执行
                    job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                        ApschedulerJobInfo.job_id == args[0]).first()
                    if job:
                        # 增加任务事件表
                        job_event = ApschedulerJobEventInfo()
                        job_event.job_info_id = job.id
                        job_event.event = self.STATUS_MAPPING[event_type]
                        self.session.add(job_event)
                        self.session.commit()
                    else:
                        LOGGER.warning("指定的job本地不存在{}".format(args))
                elif event_type == EVENT_JOB_EXECUTED:
                    # 执行job
                    job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                        ApschedulerJobInfo.job_id == args[0]).first()
                    if job:
                        # 更新JOB表
                        job.job_status = 1
    
                        # 增加任务事件表
                        job_event = ApschedulerJobEventInfo()
                        job_event.job_info_id = job.id
                        job_event.event = self.STATUS_MAPPING[event_type]
                        self.session.add(job_event)
                        self.session.commit()
                    else:
                        LOGGER.warning("指定的job本地不存在{}".format(args))
                elif event_type == EVENT_JOB_REMOVED:
                    # 删除job
                    job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                        ApschedulerJobInfo.job_id == args[0]).first()
                    if job:
                        # 更新JOB表
                        job.job_status = 5
    
                        # 增加任务事件表
                        job_event = ApschedulerJobEventInfo()
                        job_event.job_info_id = job.id
                        job_event.event = self.STATUS_MAPPING[event_type]
                        self.session.add(job_event)
                        self.session.commit()
                    else:
                        LOGGER.warning("指定的job本地不存在{}".format(args))
                elif event_type == EVENT_JOB_ERROR:
                    # 执行job出错
                    job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                        ApschedulerJobInfo.job_id == args[0]).first()
                    if job:
                        # 更新JOB表
                        job.job_status = 2
                        job.job_traceback = args[5]
                        # 增加任务事件表
                        job_event = ApschedulerJobEventInfo()
                        job_event.job_info_id = job.id
                        job_event.event = self.STATUS_MAPPING[event_type]
                        self.session.add(job_event)
                        self.session.commit()
                    else:
                        LOGGER.warning("指定的job本地不存在{}".format(args))
                elif event_type == EVENT_JOB_MISSED:
                    # job执行错过
                    job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                        ApschedulerJobInfo.job_id == args[0]).first()
                    if job:
                        # 更新JOB表
                        job.job_status = 3
                        job.job_traceback = args[5]
                        # 增加任务事件表
                        job_event = ApschedulerJobEventInfo()
                        job_event.job_info_id = job.id
                        job_event.event = self.STATUS_MAPPING[event_type]
                        self.session.add(job_event)
                        self.session.commit()
                    else:
                        LOGGER.warning("指定的job本地不存在{}".format(args))
                elif event_type == EVENT_ALL_JOBS_REMOVED:
                    # 删除所有job
                    all_jobs = ApschedulerJobInfo.query.filter(ApschedulerJobInfo.job_status == 0).all()
                    for job in all_jobs:
                        job.job_status = 6
                        # 增加任务事件表
                        job_event = ApschedulerJobEventInfo()
                        job_event.job_info_id = job.id
                        job_event.event = self.STATUS_MAPPING[event_type]
                        self.session.add(job_event)
                        self.session.commit()
                elif event_type == EVENT_JOB_MAX_INSTANCES:
                    # job超过最大实例
                    job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                        ApschedulerJobInfo.job_id == args[0]).first()
                    if job:
                        # 更新JOB表
                        job.job_status = 4
                        job.job_traceback = args[5]
                        # 增加任务事件表
                        job_event = ApschedulerJobEventInfo()
                        job_event.job_info_id = job.id
                        job_event.event = self.STATUS_MAPPING[event_type]
                        self.session.add(job_event)
                        self.session.commit()
                    else:
                        LOGGER.warning("指定的job本地不存在{}".format(args))
            except:
                LOGGER.exception("执行任务异常")
    
        def init_app(self, app):
            super(CustomAPScheduler, self).init_app(app)
    
            # 增加监听函数,监听所有job的生命周期
            self.add_listener(self.listener_all_job,
                              EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_MAX_INSTANCES | EVENT_ALL_JOBS_REMOVED | EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_MODIFIED | EVENT_JOB_EXECUTED | EVENT_JOB_SUBMITTED)
    

    收集完成数据后进行展示及管理

    • JOB管理


      JOB管理
    • JOB事件执行明细


      JOB事件执行明细

    关注公众号“战渣渣”,回复“调度”获得源码

    相关文章

      网友评论

        本文标题:APScheduler管理及监控平台

        本文链接:https://www.haomeiwen.com/subject/uzwvqltx.html