from concurrent.futures import ThreadPoolExecutor
from threading import Lock
class MRTaskManager(object):
    """Submit jobs to a thread pool while capping how many are in flight.

    A counter tracks every job that has been accepted but has not yet
    finished (it is incremented on submission and decremented from the
    future's done-callback).  Once the counter reaches ``max_waiting`` new
    jobs are dropped instead of being queued.

    NOTE(review): this class logs through a module-level ``logger`` that is
    expected to be defined elsewhere in the module.
    """

    def __init__(self, max_running, max_waiting):
        """Create the manager and its worker pool.

        Args:
            max_running: Number of worker threads in the pool.
            max_waiting: Maximum number of accepted-but-unfinished jobs;
                further jobs passed to :meth:`add` are discarded.
        """
        self._lock = Lock()
        self._num = 0
        self._max_waiting = max_waiting
        self._max_running = max_running
        self._pool = ThreadPoolExecutor(max_workers=max_running)

    def add(self, job):
        """Submit ``job.run`` to the pool unless the manager is full.

        The job is silently dropped when the capacity limit is already
        reached.  Submission errors are logged, never raised.

        Args:
            job: Object exposing a zero-argument ``run`` callable.

        Returns:
            None, whether the job was accepted, dropped, or failed to submit.
        """
        # Cheap unlocked pre-check so callers do not pile up on the lock
        # when the manager is obviously full.
        if self._num >= self._max_waiting:
            return

        # Reserve a slot.  Only the counter is touched under the lock.
        with self._lock:
            if self._num >= self._max_waiting:
                logger.info('job pool already reach max job count, abort job {}'.format(job))
                return
            self._num += 1

        try:
            future = self._pool.submit(job.run)
        except Exception as e:
            # Give the reserved slot back, otherwise capacity leaks away
            # every time a submission fails.
            with self._lock:
                self._num -= 1
            logger.error("task register failed, err msg = {}".format(e))
            return

        # Registered outside the lock on purpose: if the future is already
        # done the callback runs right here, in this thread, and it needs
        # to take the (non-reentrant) lock itself.
        future.add_done_callback(self.callback)
        logger.info('submit async mapreduce task, {}'.format(job))

    def callback(self, _):
        """Release one slot; invoked when a submitted job's future completes.

        Args:
            _: The completed future (unused).
        """
        with self._lock:
            self._num -= 1
当线程池中的任务数超过 max_waiting 时,可以直接丢弃新任务,以防止在锁上排队的任务过多而挤爆内存吗?
网友评论