Python Thread Pools: A Source-Code Analysis of ThreadPoolExecutor

Author: 14e61d025165 | Published: 2019-07-16 15:31

    Let's start with an example:

    import time
    from concurrent.futures import ThreadPoolExecutor

    def foo():
        print('enter at {} ...'.format(time.strftime('%X')))
        time.sleep(5)
        print('exit at {} ...'.format(time.strftime('%X')))

    executor = ThreadPoolExecutor()
    executor.submit(foo)
    executor.shutdown()
    Output:

    enter at 16:20:31 ...
    exit at 16:20:36 ...
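    By default, shutdown(wait=True) blocks the calling thread until the worker threads have finished. Even shutdown(wait=False) only closes the pool without blocking: worker threads that are in the middle of a task are not stopped, they keep running until the task completes. Calling t.join(0) on a newly started worker thread t inside the source does not stop it either, because join() with a timeout only limits how long the caller waits and never terminates the target thread. The workers are daemon threads, which are normally abandoned when the interpreter exits, so what guarantees that workers still executing tasks are allowed to finish after the pool is shut down?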
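The behaviour of shutdown(wait=False) can be checked with a small experiment. This is a minimal sketch, not part of the original article: the names task, finished and still_running are made up for illustration, and the 0.5 second sleep stands in for real work.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

finished = threading.Event()

def task():
    time.sleep(0.5)          # stand-in for real work (illustrative value)
    finished.set()

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(task)

executor.shutdown(wait=False)        # returns without waiting for task()
still_running = not future.done()    # the task was neither cancelled nor killed

future.result()                      # block here until task() really finishes
print(still_running, finished.is_set())
```

The future is still pending right after shutdown(wait=False) returns, yet the task runs to completion, which is the behaviour the rest of the article sets out to explain.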

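    Let's look at the ThreadPoolExecutor source (the listing matches the Python 3.6-era implementation; later versions differ in details):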

    # Excerpt from concurrent/futures/thread.py
    class ThreadPoolExecutor(_base.Executor):
        def __init__(self, max_workers=None, thread_name_prefix=''):
            """Initializes a new ThreadPoolExecutor instance.

            Args:
                max_workers: The maximum number of threads that can be used to
                    execute the given calls.
                thread_name_prefix: An optional name prefix to give our threads.
            """
            if max_workers is None:
                # Use this number because ThreadPoolExecutor is often
                # used to overlap I/O instead of CPU work.
                max_workers = (os.cpu_count() or 1) * 5
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers
            self._work_queue = queue.Queue()
            self._threads = set()
            self._shutdown = False
            self._shutdown_lock = threading.Lock()
            self._thread_name_prefix = thread_name_prefix

        def submit(self, fn, *args, **kwargs):
            with self._shutdown_lock:
                if self._shutdown:
                    raise RuntimeError('cannot schedule new futures after shutdown')

                f = _base.Future()
                # Wrap fn in a _WorkItem; calling w.run() invokes
                # fn(*args, **kwargs) and stores the result in the future f
                w = _WorkItem(f, fn, args, kwargs)

                # Put the work item on the queue
                self._work_queue.put(w)
                # Start a new worker thread (if below max_workers) that keeps
                # pulling work items from the queue and calling run() on them
                self._adjust_thread_count()
                return f
        submit.__doc__ = _base.Executor.submit.__doc__

        def _adjust_thread_count(self):
            # Weakref callback: runs when the executor object is garbage
            # collected (for example after `del executor`)
            def weakref_cb(_, q=self._work_queue):
                q.put(None)

            num_threads = len(self._threads)
            if num_threads < self._max_workers:
                thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                         num_threads)
                # The new thread runs the module-level _worker function
                t = threading.Thread(name=thread_name, target=_worker,
                                     args=(weakref.ref(self, weakref_cb),
                                           self._work_queue))
                t.daemon = True
                t.start()
                self._threads.add(t)
                # Important: registering t here lets _python_exit join it at
                # interpreter exit, so the daemon thread is not abandoned
                # mid-task. See the _python_exit function.
                _threads_queues[t] = self._work_queue

        def shutdown(self, wait=True):
            with self._shutdown_lock:
                self._shutdown = True
                self._work_queue.put(None)
            if wait:
                for t in self._threads:
                    t.join()
        shutdown.__doc__ = _base.Executor.shutdown.__doc__
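    submit(func) does two things:

    1. It wraps func in a work item and puts the work item on the queue.
    2. It calls _adjust_thread_count(), which starts a new worker thread if the pool is still below max_workers. The worker keeps taking work items off the queue and calling work_item.run(), which in turn calls func().

    _adjust_thread_count() does two things:

    1. It starts a new thread running the _worker function, whose job is to keep taking work items off the queue and calling work_item.run(), that is, func().
    2. It records the new thread together with its queue in _threads_queues, so that _python_exit can join the thread at interpreter exit instead of letting it be cut off mid-task.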
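The effect of the max_workers check can be observed through the public API alone. The sketch below is illustrative rather than taken from the article: which_thread is a hypothetical helper, and it relies on executor.map() submitting one work item per input.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def which_thread(_):
    time.sleep(0.1)                        # keep the worker busy for a moment
    return threading.current_thread().name

# Six work items are queued, but the pool may start at most two workers.
with ThreadPoolExecutor(max_workers=2) as executor:
    names = list(executor.map(which_thread, range(6)))

distinct_workers = set(names)
print(len(names), len(distinct_workers))
```

All six calls complete, yet they are served by no more than two distinct threads: extra work simply waits in the queue until a worker picks it up.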
    Now the source of the _worker function:

    def _worker(executor_reference, work_queue):
        try:
            while True:
                # Keep taking work items off the queue
                work_item = work_queue.get(block=True)
                if work_item is not None:
                    # Runs func()
                    work_item.run()
                    # Delete references to object. See issue16284
                    del work_item
                    continue
                # Resolve the weak reference (None if the executor was collected)
                executor = executor_reference()
                # Exit if:
                #   - The interpreter is shutting down OR
                #   - The executor that owns the worker has been collected OR
                #   - The executor that owns the worker has been shutdown.
                # shutdown() sets executor._shutdown to True and puts a single
                # None on the queue. Once work_item.run() returns, the worker
                # loops again and eventually gets that None instead of a work
                # item, so it reaches this check. Seeing executor._shutdown is
                # True, it puts another None on the queue before returning.
                # The None from shutdown() ends one worker; the None put here
                # ends the next one. This chain reaction lets every worker
                # finish its current func() and then exit.
                if _shutdown or executor is None or executor._shutdown:
                    # Notice other workers
                    work_queue.put(None)
                    return
                del executor
        except BaseException:
            _base.LOGGER.critical('Exception in worker', exc_info=True)
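    So _worker runs in the new thread, keeps taking work items off the queue and calling their run() method, and, once it receives the None sentinel, puts another None on the queue to tell the next worker to exit as well.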
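The sentinel chain reaction can be reproduced without the executor. The following is a simplified sketch of the same idea using only queue and threading; the worker function and the squaring task are invented for illustration and are not the library's code.

```python
import queue
import threading

def worker(q, results):
    while True:
        item = q.get()
        if item is None:
            q.put(None)               # pass the sentinel on to the next worker
            return
        results.append(item * item)   # list.append is atomic in CPython

q = queue.Queue()
results = []
threads = [threading.Thread(target=worker, args=(q, results)) for _ in range(3)]
for t in threads:
    t.start()

for n in range(5):
    q.put(n)
q.put(None)                           # one sentinel only, as shutdown() does

for t in threads:
    t.join()

print(sorted(results))                        # [0, 1, 4, 9, 16]
print(any(t.is_alive() for t in threads))     # False
```

A single None is enough to stop all three workers, because each worker re-queues the sentinel before returning. Since the queue is FIFO, every real item is taken before the sentinel is seen.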

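    Next, let's see how the assignment _threads_queues[t] = self._work_queue in _adjust_thread_count() ensures that a running worker thread is still waited for, no matter how many join(0) calls have already given up on it.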

    import atexit
    import weakref

    _threads_queues = weakref.WeakKeyDictionary()
    _shutdown = False

    def _python_exit():
        global _shutdown
        _shutdown = True
        items = list(_threads_queues.items())
        for t, q in items:
            q.put(None)
        # Join every thread registered in _threads_queues,
        # blocking until each worker has finished
        for t, q in items:
            t.join()

    atexit.register(_python_exit)
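    The atexit module registers functions to run at interpreter exit: once the code in MainThread has finished, the registered _python_exit function runs, so t.join() is called and blocks until the worker completes. A natural question is which thread executes this t.join(), given that MainThread has already finished. In the Python version used here, threading.current_thread() reports a _DummyThread at that point. As the output below shows, it carries the same thread ident as MainThread: the handler still runs on the main OS thread, but the threading module has already dropped its MainThread record, so current_thread() returns a placeholder _DummyThread. The script below reproduces this outside the thread pool: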

    import atexit
    import threading
    import weakref
    import time

    threads_queues = weakref.WeakKeyDictionary()

    def foo():
        print('enter at {} ...'.format(time.strftime('%X')))
        time.sleep(5)
        print('exit at {} ...'.format(time.strftime('%X')))

    def _python_exit():
        items = list(threads_queues.items())
        print('current thread in _python_exit -->', threading.current_thread())
        # Wait without a timeout for every registered thread
        for t, _ in items:
            t.join()

    atexit.register(_python_exit)

    if __name__ == '__main__':
        t = threading.Thread(target=foo)
        t.daemon = True
        t.start()
        # Register the thread so that _python_exit can find and join it
        threads_queues[t] = foo
        print(time.strftime('%X'))
        t.join(timeout=2)
        print(time.strftime('%X'))
        t.join(timeout=2)
        print(time.strftime('%X'))
        print('current thread in main -->', threading.current_thread())
        print(threading.current_thread(), 'end')
    Output:

    enter at 17:13:44 ...
    17:13:44
    17:13:46
    17:13:48
    current thread in main --> <_MainThread(MainThread, started 12688)>
    <_MainThread(MainThread, started 12688)> end
    current thread in _python_exit --> <_DummyThread(Dummy-2, started daemon 12688)>
    exit at 17:13:49 ...
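    In this example, foo blocks for 5 seconds after thread t starts. MainThread calls t.join(timeout=2) twice, waiting 2 seconds each time for a total of 4 seconds, yet after the second t.join(timeout=2) returns, thread t has still not been stopped. The main thread's code then finishes and _python_exit is called. There, in what is reported as a _DummyThread, t.join() is called once more and keeps waiting until thread t prints exit at 17:13:49 ... and finishes.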

    Summary:

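    join() can be called on the same thread any number of times; the waits simply add up. A call such as t.join(n) never terminates thread t, it only stops waiting after n seconds. And because _python_exit is registered with atexit, it always runs at the very end and calls t.join() without a timeout, which guarantees that thread t runs to completion instead of being cut off when the interpreter exits.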
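The point about join() can be checked in isolation. This is a minimal sketch with illustrative names and timings (slow, done, 0.6 s and 0.1 s are assumptions chosen to keep the run short), not code from the article.

```python
import threading
import time

done = threading.Event()

def slow():
    time.sleep(0.6)                   # illustrative duration
    done.set()

t = threading.Thread(target=slow, daemon=True)
t.start()

t.join(timeout=0.1)                   # the caller gives up waiting after 0.1 s
alive_after_timeout = t.is_alive()    # the thread itself keeps running

t.join()                              # a second join simply waits again
print(alive_after_timeout, t.is_alive(), done.is_set())
```

After the timed join returns, the thread is still alive; only the final join without a timeout waits long enough to see it finish. This is the same pattern _python_exit applies to every registered worker.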
