美文网首页
使用Redis实现定时任务多节点部署及自动任务分发

使用Redis实现定时任务多节点部署及自动任务分发

作者: 越大大雨天 | 来源:发表于2020-04-01 20:32 被阅读0次

    最近部署一个定时任务项目遇到一个问题:部署的代码每天会定时启动多个任务在后台串行执行,且每个任务耗时比较长。我们的生产环境会自动开启三个相同的节点把部署的代码同步运行,这就导致三个节点的定时任务会重复执行,我只好强行关闭另外两个节点,以单节点的方式串行执行定时任务。

    这样代码是能跑起来,但是效率低且浪费了另外两个服务器。我需要改造代码以便能支持多节点运行,且能让每个节点分发到不同的任务去执行。

    思考过后,我借助redis实现了一个简单的分布式定时任务,在这里记录一下。

    思路

    要实现分布式,必须要使用到每个节点都能访问到的公共变量作为全局锁,这里我使用了redis来实现锁。
    对每个不同类的任务设置唯一的字符串键,当定时任务开始时,每个节点将会去遍历任务抢占锁,只有获取到锁的节点可以执行锁对应的任务,而未获取到锁的节点将继续遍历其他任务锁,直到所有任务锁被获取并执行。

    示例代码:task_manager.py

    from redis import Redis
    from apscheduler.schedulers.blocking import BlockingScheduler
    import time
    import random
    
    
    r = Redis(host='localhost', port=6379, db=0, decode_responses=True)
    
    TASK_LIST = ["任务参数1", "任务参数2", "任务参数3", "任务参数4"]
    
    # 存放当前已完成的集合键
    FINISHED_TASK_KEY = "task:finished"
    # 给每个任务顺序分配不同的redis键
    TASK_KEYS = ["task: {}".format(task) for task in range(len(TASK_LIST))]
    # redis键对应的任务参数
    TASK_MAP = {key: task for key, task in zip(TASK_KEYS, TASK_LIST)}
    EXPIRE = 60 * 60 * 23
    
    # 模拟定时执行的任务,可接受不同的参数
    def task(param):
        print("----------------------------------")
        print("开始执行任务:{}....".format(param))
        time.sleep(random.randint(15, 30))
        print("完成{}".format(param))
        print("----------------------------------")
    
    
    def start():
        # 每日任务开始前先删除redis中已完成任务集合
        r.delete(FINISHED_TASK_KEY)
        # 添加redis中唯一任务锁,若添加成功且任务完成表中无当前任务则执行
        for key in TASK_KEYS:
            status = r.set(key, 1, ex=EXPIRE, nx=True)
            # 检查当前任务是否已执行
            is_finished = r.sismember(FINISHED_TASK_KEY, key)
            if status is True and is_finished is False:
                print("当前节点获取任务:{} 成功, 准备执行。。".format(key))
                # 获取当前redis键对应的任务参数并执行任务
                task_param = TASK_MAP.get(key)
                task(task_param)
                # 任务执行完成将当前任务添加到已完成集合,不允许其他节点再获取该任务
                r.sadd(FINISHED_TASK_KEY, key)
                r.delete(key)
            elif status is True and is_finished is True:
                print("未执行: {},其他节点已完成该任务".format(key))
            else:
                print("未执行:{},其他节点正在执行当前任务".format(key))
    
    
    if __name__ == '__main__':
        # 这里可使用apscheduler定时任务框架调度start函数,该处模拟未使用定时任务
        start()
    
    
    
    

    模拟多节点运行

    先同时打开三个命令行窗口,并键入执行命名,然后尽量相近时间点击回车运行,模拟多节点下定时任务同时触发的场景:


    模拟多节点

    执行情况如下:


    模拟多节点同步运行结果

    根据打印的执行情况看,整体符合预期,一个任务只能被一个节点执行,且能做到自动任务分发,执行完当前任务的节点会自动去获取还未执行的任务,直到所有任务执行完毕。

    最后

    当然,以上只是一个demo,还需要处理很多异常判断任务,任务完成情况之类的,针对不同的任务情景还可以有其他变通方法。这里只是整理一下思路和分享,希望对你也有所启发。

    相关文章

      网友评论

          本文标题:使用Redis实现定时任务多节点部署及自动任务分发

          本文链接:https://www.haomeiwen.com/subject/smxzuhtx.html