最近工作中有这么一种需求,需要定时将三种任务(假设任务为:A、B、C)分配到10台Windows Server中执行,而且这三种任务中还分有优先级的(为了简单就以每种任务分三种优先级为例吧)。很容易想到这不就是做一个异步调度嘛,找一个有优先级的消息队列就应该可以搞定了。可以后来发现目前Python这边的消息队列竟然主流不支持Windows,如:RQ、高版本的Celery,还有优先级支持也不是很好,于是乎打算自己造一个。
看了一些相关的博客,发现可以用Redis的list结构做队列,对于优先级的支持呢目前我是打算采用这种方式:
每一种任务每一种优先级都单独放一个队列存储(那么三种任务并且每种任务三个优先级别的话就需要9个Redis队列)。
上代码前先简单说明一下实现流程,其实主要就两个模块:入队、出队,说清楚这两块就OK了。
- 入队时,定时任务将A、B、C任务以及它们的优先级别传过来,接着我们对其进行判断,看各些任务进那些队列中(也就是各些任务在Redis队列中的键是什么)。我目前采用这么一种键的组合方式:任务类型-优先级(taskType-level),比如:A类型任务中优先级为1的任务最后进入的Redis队列的键为:A-1,那么优先级为100的B类型任务在Redis队列中的键也就为:B-100。简单弄了一张图,凑合着看吧。
- 到出队了,出队这边其实挺简单,第一种是:如果该Redis的DB下只有我们的任务,那么我们把所有的键取出来即可,取出来后可以对键按优先级排列(像SQL:order by level),或按任务类型和优先级排列(像SQL:order by taskType, level),排列后得到一个键的列表,再根据这个键的列表去pop任务即可。第二种是:我们可以配置某台客户端可执行的任务的类型,比如其中一台电脑我只想让它跑A类型任务。那我只给它配置A,这样让它去模式匹配Redis中的键(A-[0-9]*),这样取出来的就是A类型的所有优先级的任务了,如果想让它跑A、B任务就可以循环匹配嘛。
我也不知道有没有讲清楚这个流程,看代码吧(代码写得丑,萌新请各位大大多指教)
https://github.com/wikizero/MyScripts/blob/master/forWork/MyRedisQueue.py
# coding:utf-8
import redis
import re
import json
import time
from itertools import chain
from datetime import datetime, date
class ExpandJsonEncoder(json.JSONEncoder):
'''
采用json方式序列化传入的任务参数,而原生的json.dumps()方法不支持datetime、date,这里做了扩展
'''
def default(self, obj):
if isinstance(obj, datetime):
return obj.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(obj, date):
return obj.strftime('%Y-%m-%d')
else:
return json.JSONEncoder.default(self, obj)
class MyRedisQueue:
def __init__(self):
self.redis_connect = redis.Redis()
def get_len(self, key):
keys = self.get_keys(key)
# 每个键的任务数量
key_len = [(k, self.redis_connect.llen(k)) for k in keys]
# 所有键的任务数量
task_len = sum(dict(key_len).values())
return task_len, key_len
def get_keys(self, key):
# Redis的键支持模式匹配
keys = self.redis_connect.keys(key + '-[0-9]*')
# 按优先级将键降序排序
keys = sorted(keys, key=lambda x: int(x.split('-')[-1]), reverse=True)
return keys
def push_task(self, key, tasks, level=1):
'''
双端队列,左边推进任务
:param level: 优先级(int类型),数值越大优先级越高,默认1
:return: 任务队列任务数量
'''
# 重新定义优先队列的key
new_key = key + '-' + str(level)
# 序列化任务参数
tasks = [json.dumps(t, cls=ExpandJsonEncoder) for t in tasks]
print 'RedisQueue info > the number of push tasks:', len(tasks)
if not tasks:
return self.get_len(key)
self.redis_connect.lpush(new_key, *tasks)
return self.get_len(key)
def pop_task(self, keys=None, priority=False):
'''
双端队列 右边弹出任务
:param keys: 键列表,默认为None(将获取所有任务的keys)
:return:
'''
while True:
# 避免在while循环中修改参数,将keys参数赋值到临时变量
temp_keys = keys
# 不指定keys,将获取所有任务
if not keys:
temp_keys = self.redis_connect.keys()
temp_keys = list(set([re.sub('-\d+$', '', k) for k in temp_keys if re.findall('\w+-\d+$', k)]))
# 根据key作为关键字获取所有的键
all_keys = list(chain(*[self.get_keys(k) for k in temp_keys]))
# 屏蔽任务差异性,只按优先级高到低弹出任务
if priority:
all_keys = sorted(all_keys, key=lambda x: int(x.split('-')[-1]), reverse=True)
if all_keys:
task_key, task = self.redis_connect.brpop(all_keys)
return task_key, json.loads(task)
time.sleep(2)
if __name__ == '__main__':
mrq = MyRedisQueue()
# 把任务推入redis 队列
# lst = [i for i in xrange(0, 40)]
# print mrq.push_task('C', lst, level=4)
# 从redis queue取出任务
# while True:
# task_type, task = mrq.pop_task(keys=['A', 'B', 'C', 'D', 'E'], priority=True)
# print task_type, task
# time.sleep(1)
# 查看任务数量以及优先级情况
# count, key_len = mrq.get_len('task')
# print key_len
网友评论