基于Redis实现无限级优先级队列(Python代码)

作者: wikizero | 来源:发表于2018-01-13 18:38 被阅读166次

    最近工作中有这么一种需求,需要定时将三种任务(假设任务为:A、B、C)分配到10台Windows Server中执行,而且这三种任务中还分有优先级的(为了简单就以每种任务分三种优先级为例吧)。很容易想到这不就是做一个异步调度嘛,找一个有优先级的消息队列就应该可以搞定了。可以后来发现目前Python这边的消息队列竟然主流不支持Windows,如:RQ、高版本的Celery,还有优先级支持也不是很好,于是乎打算自己造一个。
    看了一些相关的博客,发现可以用Redis的list结构做队列,对于优先级的支持呢目前我是打算采用这种方式:

    每一种任务每一种优先级都单独放一个队列存储(那么三种任务并且每种任务三个优先级别的话就需要9个Redis队列)。

    上代码前先简单说明一下实现流程,其实主要就两个模块:入队、出队,说清楚这两块就OK了。

    1. 入队时,定时任务将A、B、C任务以及它们的优先级别传过来,接着我们对其进行判断,看各些任务进那些队列中(也就是各些任务在Redis队列中的键是什么)。我目前采用这么一种键的组合方式:任务类型-优先级(taskType-level),比如:A类型任务中优先级为1的任务最后进入的Redis队列的键为:A-1,那么优先级为100的B类型任务在Redis队列中的键也就为:B-100。简单弄了一张图,凑合着看吧。
    image.png
    1. 到出队了,出队这边其实挺简单,第一种是:如果该Redis的DB下只有我们的任务,那么我们把所有的键取出来即可,取出来后可以对键按优先级排列(像SQL:order by level),或按任务类型和优先级排列(像SQL:order by taskType, level),排列后得到一个键的列表,再根据这个键的列表去pop任务即可。第二种是:我们可以配置某台客户端可执行的任务的类型,比如其中一台电脑我只想让它跑A类型任务。那我只给它配置A,这样让它去模式匹配Redis中的键(A-[0-9]*),这样取出来的就是A类型的所有优先级的任务了,如果想让它跑A、B任务就可以循环匹配嘛。
      我也不知道有没有讲清楚这个流程,看代码吧(代码写得丑,萌新请各位大大多指教)
      https://github.com/wikizero/MyScripts/blob/master/forWork/MyRedisQueue.py
    # coding:utf-8
    import redis
    import re
    import json
    import time
    from itertools import chain
    from datetime import datetime, date
    
    
    class ExpandJsonEncoder(json.JSONEncoder):
        '''
            采用json方式序列化传入的任务参数,而原生的json.dumps()方法不支持datetime、date,这里做了扩展
        '''
    
        def default(self, obj):
            if isinstance(obj, datetime):
                return obj.strftime('%Y-%m-%d %H:%M:%S')
            elif isinstance(obj, date):
                return obj.strftime('%Y-%m-%d')
            else:
                return json.JSONEncoder.default(self, obj)
    
    
    class MyRedisQueue:
    
        def __init__(self):
            self.redis_connect = redis.Redis()
    
        def get_len(self, key):
            keys = self.get_keys(key)
            # 每个键的任务数量
            key_len = [(k, self.redis_connect.llen(k)) for k in keys]
            # 所有键的任务数量
            task_len = sum(dict(key_len).values())
            return task_len, key_len
    
        def get_keys(self, key):
            # Redis的键支持模式匹配
            keys = self.redis_connect.keys(key + '-[0-9]*')
            # 按优先级将键降序排序
            keys = sorted(keys, key=lambda x: int(x.split('-')[-1]), reverse=True)
            return keys
    
        def push_task(self, key, tasks, level=1):
            '''
            双端队列,左边推进任务
            :param level: 优先级(int类型),数值越大优先级越高,默认1
            :return: 任务队列任务数量
            '''
            # 重新定义优先队列的key
            new_key = key + '-' + str(level)
            # 序列化任务参数
            tasks = [json.dumps(t, cls=ExpandJsonEncoder) for t in tasks]
    
            print 'RedisQueue info > the number of push tasks:', len(tasks)
    
            if not tasks:
                return self.get_len(key)
    
            self.redis_connect.lpush(new_key, *tasks)
            return self.get_len(key)
    
        def pop_task(self, keys=None, priority=False):
            '''
            双端队列 右边弹出任务
            :param keys: 键列表,默认为None(将获取所有任务的keys)
            :return:
            '''
            while True:
                # 避免在while循环中修改参数,将keys参数赋值到临时变量
                temp_keys = keys
    
                # 不指定keys,将获取所有任务
                if not keys:
                    temp_keys = self.redis_connect.keys()
                    temp_keys = list(set([re.sub('-\d+$', '', k) for k in temp_keys if re.findall('\w+-\d+$', k)]))
    
                # 根据key作为关键字获取所有的键
                all_keys = list(chain(*[self.get_keys(k) for k in temp_keys]))
    
                # 屏蔽任务差异性,只按优先级高到低弹出任务
                if priority:
                    all_keys = sorted(all_keys, key=lambda x: int(x.split('-')[-1]), reverse=True)
    
                if all_keys:
                    task_key, task = self.redis_connect.brpop(all_keys)
                    return task_key, json.loads(task)
                time.sleep(2)
    
    
    if __name__ == '__main__':
        mrq = MyRedisQueue()
    
        # 把任务推入redis 队列
        # lst = [i for i in xrange(0, 40)]
        # print mrq.push_task('C', lst, level=4)
    
        # 从redis queue取出任务
        # while True:
        #     task_type, task = mrq.pop_task(keys=['A', 'B', 'C', 'D', 'E'], priority=True)
        #     print task_type, task
        #     time.sleep(1)
    
        # 查看任务数量以及优先级情况
        # count, key_len = mrq.get_len('task')
        # print key_len
    
    
    

    相关文章

      网友评论

        本文标题:基于Redis实现无限级优先级队列(Python代码)

        本文链接:https://www.haomeiwen.com/subject/zmncoxtx.html