美文网首页
分布式、多节点系统下定时任务重复执行问题解决方案

分布式、多节点系统下定时任务重复执行问题解决方案

作者: 越大大雨天 | 来源:发表于2020-06-11 19:07 被阅读0次

    在分布式多节点系统下,或者是使用gunicorn等工具启用多个worker的情况下,如何保证后端的定时任务、初始化任务只执行一次呢?比如使用apscheduler或者flask-apscheduler实现的定时任务。

    在这种情况下必须借助外部数据库才能实现,当然,不仅仅只能是Redis,你也可以利用当前系统下有的MySQL、或者MongoDB数据库,只需要自定义一张表,创建一个unique字段作为锁即可。

    我这里将使用python语言,以MySQL为例,使用sqlalchemy+pymysql作为数据库操作方式,使用装饰器的方式对原有任务函数进行改造,以达到对分布式的支持。你可以将该方法扩散到其他语言、其他后端框架或者仅仅是定时任务后台的情况。

    一、创建带唯一值字段的数据表

    无论是redis、mysql、mongodb,要实现一个锁的功能,作为锁的字段必须为唯一值字段

    我这里使用了sqlalchemy创建表、指定lock_key字段unique=True,并定义了add_lockdelete_lock方法,当然你也可以手动在数据库创建表,使用pymysql原生SQL语句实现锁方法。
    ![示例结构]

    image.png
    • 若需要使用原生SQL语句定义表,可参照如下方式:
    CREATE TABLE `task_lock` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `lock_key` varchar(20) COLLATE utf8mb4_bin NOT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `lock_key` (`lock_key`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
    
    • SQLAlchemy代码示例
      sql_model.py
    from sqlalchemy import MetaData, Table, Column, BigInteger, String, create_engine
    from sqlalchemy import insert, delete
    from sqlalchemy.exc import IntegrityError
    
    
    metadata = MetaData()
    
    engine = create_engine("mysql+pymysql://root:mypassword@127.0.0.1:3306/test?charset=utf8mb4")
    
    # 定义唯一值字段的任务锁表
    locks = Table("taskLock", metadata,
                  Column("id", BigInteger(), primary_key=True, autoincrement=True),
                  Column("lock_key", String(50), unique=True, nullable=False, comment="任务锁")
            )
    
    # 创建表
    metadata.create_all(engine)
    
    # 加锁方法,bool值表示是否成功
    def add_lock(lock_value):
        """添加唯一锁"""
        ins = insert(locks).values(lock_key=lock_value)
        # 若当前锁值不存在,则可以插入成功,返回True
        try:
            engine.connect().execute(ins)
            return True
        # 若当前插入锁值已存在,则会触发并捕获该异常,返回False
        except IntegrityError:
            return False
    
    # 删除锁方法,只要成功添加了锁,任务执行后,无论成功还是失败都必须调用删除方法
    def delete_lock(lock_value):
        """删除锁"""
        d = delete(locks).where(locks.c.lock_key == lock_value)
        engine.connect().execute(d)
    
    

    二、定义单节点任务装饰器

    为什么使用装饰器?使用装饰器的方式对原有函数进行改造,可保留原始函数代码不变,且复用性、可读性更高。建议大家多多使用装饰器哦,这里的装饰器需要传参,所以需要额外增加一层用来接收参数,关于装饰器的学习,可以参考其他文档。

    decorators.py

    
    from sql_models import add_lock, delete_lock
    
    
    # 单节点任务装饰器,被装饰的任务在分布式多节点下同一时间只能运行一次
    def single_task(task):
        def wrap(func):
            def inner(*args, **kwargs):
                add_result = add_lock(task)
                if add_result:
                    print("当前节点获取任务:{}!".format(task))
                    try:
                        result = func(*args, **kwargs)
                        return result
                    except Exception as e:
                        raise e
                    finally:
                        delete_lock(task)
    
                else:
                    print("当前节点未获取任务:{}".format(task))
                    return
            return inner
        return wrap
    
    

    该装饰器的功能很简单,可接收一个任务名,该任务名将用作数据库中lock_key的唯一值写入。在执行原函数前,会先尝试加锁,即写入lock_key值,若写入成功,则获得锁,可以继续执行该任务函数;若加锁失败,即写入lock_key时数据库已存在当前值,说明其他节点正在执行该任务,则无法获得锁,不能执行该任务函数,只会打印提示信息。

    三、装饰需单节点运行的任务

    需要被装饰的任务一般是定时任务,或者是初始化时可能重复运行任务。

    tasks.py
    在这里定义了3个模拟任务task1、task2、task3,并只对task1与task3使用single_task装饰器进行单节点运行装饰:

    import time
    import random
    
    from decorators import single_task
    from apscheduler.schedulers.background import BlockingScheduler
    
    
    @single_task("task1")
    def task1(arg1, arg2):
        print("----------------------------------------------")
        print("开始执行task1")
        time.sleep(random.randint(1, 5))
        print("task1执行完成")
        print("----------------------------------------------" + "\n")
        return arg1 + arg2
    
    
    def task2(arg1, arg2):
        print("----------------------------------------------")
        print("开始执行task2")
        time.sleep(random.randint(1, 5))
        print("task2执行完成")
        print("----------------------------------------------" + "\n")
        return arg1 * arg2
    
    
    @single_task("task3")
    def task3(arg1, arg2):
        print("----------------------------------------------")
        print("开始执行task2")
        time.sleep(random.randint(1, 5))
        print("task2执行完成")
        print("----------------------------------------------" + "\n")
        return arg1 / arg2
    

    四、使用apscheduler开启定时任务

    tasks.py

    配置定时任务:

    import time
    import random
    
    from decorators import single_task
    from apscheduler.schedulers.background import BlockingScheduler
    
    
    @single_task("task1")
    def task1(arg1, arg2):
        print("----------------------------------------------")
        print("开始执行task1")
        time.sleep(random.randint(1, 5))
        print("task1执行完成")
        print("----------------------------------------------" + "\n")
        return arg1 + arg2
    
    
    def task2(arg1, arg2):
        print("----------------------------------------------")
        print("开始执行task2")
        time.sleep(random.randint(1, 5))
        print("task2执行完成")
        print("----------------------------------------------" + "\n")
        return arg1 * arg2
    
    
    @single_task("task3")
    def task3(arg1, arg2):
        print("----------------------------------------------")
        print("开始执行task2")
        time.sleep(random.randint(1, 5))
        print("task2执行完成")
        print("----------------------------------------------" + "\n")
        return arg1 / arg2
    
    
    if __name__ == '__main__':
        print("开始执行定时任务")
        scheduler = BlockingScheduler()
        scheduler.add_job(task1, args=(5, 5), trigger="interval", seconds=20)
        scheduler.add_job(task2, args=(5, 5), trigger="interval", seconds=20)
        scheduler.add_job(task3, args=(5, 5), trigger="interval", seconds=20)
        scheduler.start()
    
    
    

    五、任务演示

    此处简单模拟多个命令行窗口,几乎同时运行,模拟多节点的情况:


    image.png

    可以发现实现了我们的设计,在两个节点中,task1和task3只会运行一次,而未加装饰器的task2不受限制,每个节点中都会重复运行。

    多节点,比如存在写入数据库或者文件操作的定时任务,则设置单节点运行是非常有必要的。

    总结

    同理,你可以使用相同的方式,利用MongDB或其他数据库实现单节点锁的装饰器,若有Redis则实现起来就更加容易和普遍了。

    以上,希望文章能对你有所帮助。

    相关文章

      网友评论

          本文标题:分布式、多节点系统下定时任务重复执行问题解决方案

          本文链接:https://www.haomeiwen.com/subject/jbvbtktx.html