异步任务队列
_task_queue = Queue.Queue()
异步队列输入
def async_call(function, callback, task_id, task_name, *args, **kwargs):
_task_queue.put({
'function': function,
'callback': callback,
'task_id': task_id,
'task_name': task_name,
'args': args,
'kwargs': kwargs
})
异步队列获取并执行
def _task_queue_consumer(**kwargs):
"""
异步任务队列消费者
"""
db = kwargs.get("db_queue")
while True:
try:
#循环间隔获取,避免no-block或者一直获取无法停止
task = _task_queue.get(timeout=10)
func = task.get('function')
task_id = task.get('task_id')
task_name = task.get('task_name')
task_args = task.get('args')
task_kwargs = task.get('kwargs')
try:
result_code, result_value = func(*task_args, **task_kwargs)
except Exception as ex:
#数据库记录任务
record_task(db, task_name, task_id, result=ex, state=3)
else:
if result_code == 200:
record_task(db, task_name, task_id, result=result_value, state=1)
else:
record_task(db, task_name, task_id, result=result_value, state=2)
finally:
_task_queue.task_done()
except Queue.Empty:
#状态位,通过视图函数实现线程间通信,关闭时置位
if not work_state.running_state():
break
except Exception:
log.error("Queue Error!", exc_info=True)
创建线程开始执行方法并返回进程号
def work_queue(*args, **kwargs):
t = threading.Thread(target=_task_queue_consumer, args=args, kwargs=kwargs)
t.start()
_task_queue.join()
return t
主函数与flask main一起执行
if __name__ == '__main__':
work_queue(db_queue=db_queue)
app.run(host="0.0.0.0", port=8989, debug=True, threaded=True)
优势:
1.单任务后台执行,完全不阻塞前台运行
2.任务获取可以伴随自定义的回调函数操作(上面的是数据库操作)
缺点:
1.单线程局限性,没有多并发功能
网友评论