-
背景
上线的Django网站需要一个后台文件上传后解析的服务,最开始采用的是定时任务来实现的,每5分钟从数据库取一个状态为初始化的文件下载下来解析。这个带来的问题是效率低下(有些文件10秒就解析完了),另外一旦解析逻辑变更导致文件重新解析的话周期太长,无法适应业务的需求 -
新方案
简单实用PYTHON实现一个定长的线程池,把所有的任务以及任务参数放入队列中,重新对任务进行包装(使任务结束后触发开始新任务的逻辑) -
代码实现如下
class ToDoList:
def __init__(self, maxSize):
self.maxSize = maxSize #最大线程数
self.tasks = Queue() #任务队列
self.workList = dict() #工作列表
self.lock = threading.Lock() #线程锁
def start(self, target=None, args=None):
if target is not None:
self.tasks.put((target, args))
self.lock.acquire()
if len(self.workList) < self.maxSize:
if not self.tasks.empty():
task_id = str(uuid.uuid1())
func, paras = self.tasks.get()
task = self._create_task(func, task_id)
task = threading.Thread(target=task, args=paras)
task.start()
self.workList[task_id] = task
self.lock.release()
def _create_task(self, target, taskId):
# 任务装饰器
def wrapper(*args,**kwargs):
try:
result = target(*args,**kwargs)
return result
finally:
del self.workList[taskId]
# 任务结束后重新从队列里取下个任务执行
self.start()
return wrapper
网友评论