Understanding Python's PoolExecutor


Author: 蒋狗 | Published 2017-02-28 17:12

    The demo code and the points it cites are drawn from 《理解Python并发编程一篇就够了|PoolExecutor篇》 by 董伟明 (http://mp.weixin.qq.com/s/Xdggv8YkhTuuQieLTapqHA), also published on his WeChat public account Python之美, as well as from 《Python Cookbook》 and the article Python并发编程之线程池/进程池.

    ThreadPoolExecutor and ProcessPoolExecutor provide high-level abstractions over threading and multiprocessing respectively, exposing a simple, unified interface.
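
    Because the interface is unified, switching executor classes is a one-line change, as this minimal sketch shows (the square() helper is hypothetical, not from the referenced article):

    # -*- coding: utf-8 -*-

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

    def square(n):
        return n * n

    if __name__ == '__main__':
        # The same calling code works with either executor class.
        for executor_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
            with executor_cls(max_workers=2) as executor:
                print(list(executor.map(square, range(4))))  # [0, 1, 4, 9]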

    The examples below use ProcessPoolExecutor.

    The executors mainly provide two methods: map() and submit().

    The map() method is mainly used to simplify running the same function over a batch of inputs, as in the following example:

    # -*- coding: utf-8 -*-
    
    from concurrent.futures import ProcessPoolExecutor
    import random
    
    def fib(n, test_arg):
        if n > 30:
            raise Exception('can not > 30, now %s' % n)
        if n <= 2:
            return 1
        return fib(n-1, test_arg) + fib(n-2, test_arg)
    
    def use_map():
        nums = [random.randint(0, 33) for _ in range(0, 10)]
        with ProcessPoolExecutor() as executor:
            try:
                results = executor.map(fib, nums, nums)
                for num, result in zip(nums, results):
                    print('fib(%s) result is %s.' % (num, result))
            except Exception as e:
                print(e)
    

    Running the example produces output like the following. When a num greater than 30 raises the exception, it is caught and the program stops producing further results.

    ...
    fib(19) result is 4181.
    fib(11) result is 89.
    fib(2) result is 1.
    fib(5) result is 5.
    fib(24) result is 46368.
    fib(2) result is 1.
    can not > 30, now 33
    
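    Note that executor.map() returns a lazy iterator: the call itself does not raise, and the exception only surfaces when iteration reaches the failing task's result. A minimal sketch (the may_fail() helper is hypothetical, mirroring fib()'s guard clause):

    # -*- coding: utf-8 -*-

    from concurrent.futures import ProcessPoolExecutor

    def may_fail(n):
        if n > 30:
            raise ValueError('can not > 30, now %s' % n)
        return n * 2

    if __name__ == '__main__':
        with ProcessPoolExecutor() as executor:
            results = executor.map(may_fail, [1, 33, 2])  # no exception raised here
            it = iter(results)
            print(next(it))        # 2 -- the first result is fine
            try:
                next(it)           # ValueError is re-raised here, at iteration time
            except ValueError as e:
                print(e)           # can not > 30, now 33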

    Now the same task using the submit() method:

    # -*- coding: utf-8 -*-
    
    from concurrent.futures import ProcessPoolExecutor, as_completed
    import random
    
    def fib(n, test_arg):
        if n > 30:
            raise Exception('can not > 30, now %s' % n)
        if n <= 2:
            return 1
        return fib(n-1, test_arg) + fib(n-2, test_arg)
    
    def use_submit():
        nums = [random.randint(0, 33) for _ in range(0, 10)]
        with ProcessPoolExecutor() as executor:
            futures = {executor.submit(fib, n, n): n for n in nums}
            for f in as_completed(futures):
                try:
                    print('fib(%s) result is %s.' % (futures[f], f.result()))
                except Exception as e:
                    print(e)
    

    Running this example produces output like the following. When an exception is raised and caught, the loop keeps printing subsequent results; it does not stop the way map() does. Besides as_completed(), the module also provides wait() and other helpers (a minimal wait() sketch follows the output below).

    fib(3) result is 2.
    fib(15) result is 610.
    can not > 30, now 31
    fib(23) result is 28657.
    fib(1) result is 1.
    can not > 30, now 32
    fib(14) result is 377.
    fib(12) result is 144.
    fib(26) result is 121393.
    fib(29) result is 514229.
    
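    For reference, wait() blocks until the given futures satisfy a condition and returns the sets of done and not-yet-done futures. A minimal sketch (the sleep_for() task is hypothetical, not from the original article):

    # -*- coding: utf-8 -*-

    import time
    from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED

    def sleep_for(n):
        time.sleep(n)
        return n

    if __name__ == '__main__':
        with ProcessPoolExecutor(max_workers=3) as executor:
            futures = [executor.submit(sleep_for, n) for n in (1, 2, 3)]
            # Block until the first future finishes, then inspect both sets.
            done, not_done = wait(futures, return_when=FIRST_COMPLETED)
            print('%d done, %d still pending' % (len(done), len(not_done)))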

    If the try/except block wraps the as_completed() loop itself, as below, output stops at the first exception instead of continuing: the exception re-raised by f.result() propagates out of the for loop into the except handler, so iteration over the remaining futures never resumes.

    def use_submit():
        nums = [random.randint(0, 33) for _ in range(0, 10)]
        with ProcessPoolExecutor() as executor:
            futures = {executor.submit(fib, n, n): n for n in nums}
            try:
                for f in as_completed(futures):
                    print('fib(%s) result is %s.' % (futures[f], f.result()))
            except Exception as e:
                print(e)
    

    Other notes:

    1. map() yields results in the order the arguments were passed in, while as_completed() yields them in completion order. The example above does not show this clearly (see Python并发编程之线程池/进程池 for a fuller treatment); both behaviors depend on the max_workers parameter and each task's running time:
    import time
    from concurrent.futures import ProcessPoolExecutor, as_completed
    def test_sleep(n):
        time.sleep(n)
        return True
    def use_submit():
        nums = [3, 2, 1, 3]
        with ProcessPoolExecutor(max_workers=3) as executor:
            futures = {executor.submit(test_sleep, n): n for n in nums}
            for f in as_completed(futures):
                try:
                    print('%s result is %s.' % (futures[f], f.result()))
                except Exception as e:
                    print(e)
    def use_map():
        nums = [3, 2, 1, 3]
        with ProcessPoolExecutor(max_workers=3) as executor:
            try:
                results = executor.map(test_sleep, nums)
                for num, result in zip(nums, results):
                    print('%s result is %s.' % (num, result))
            except Exception as e:
                print(e)
    

    use_submit() prints the following, taking 4s in total (the freed 1s slot is reused, so 1+3=4s), with each result printed as soon as its task completes. Because max_workers=3, the fourth 3s task starts as soon as the first 1s task finishes.

    1 result is True.
    2 result is True.
    3 result is True.
    3 result is True.
    

    use_map() prints the following, also taking 4s in total, but the results come back in the order the arguments were passed in. Because max_workers=3, the first three results are printed together after 3s, and the fourth after 4s.

    3 result is True.
    2 result is True.
    1 result is True.
    3 result is True.
    
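    To see the timing directly, the submit example can be instrumented to print elapsed seconds with each result (a hypothetical variation, not from the original article):

    # -*- coding: utf-8 -*-

    import time
    from concurrent.futures import ProcessPoolExecutor, as_completed

    def test_sleep(n):
        time.sleep(n)
        return True

    if __name__ == '__main__':
        start = time.time()
        with ProcessPoolExecutor(max_workers=3) as executor:
            futures = {executor.submit(test_sleep, n): n for n in [3, 2, 1, 3]}
            for f in as_completed(futures):
                # Expected roughly: 1s task at t=1, 2s at t=2, 3s at t=3,
                # and the second 3s task (started at t=1) at t=4.
                print('%ss task done at t=%.1fs' % (futures[f], time.time() - start))
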
    2. Reading part of the map() source code:
    def map(self, fn, *iterables, timeout=None, chunksize=1):
            """Returns an iterator equivalent to map(fn, iter).
    
            Args:
                fn: A callable that will take as many arguments as there are
                    passed iterables.
                timeout: The maximum number of seconds to wait. If None, then there
                    is no limit on the wait time.
                chunksize: The size of the chunks the iterable will be broken into
                    before being passed to a child process. This argument is only
                    used by ProcessPoolExecutor; it is ignored by
                    ThreadPoolExecutor.
    
            Returns:
                An iterator equivalent to: map(func, *iterables) but the calls may
                be evaluated out-of-order.
    
            Raises:
                TimeoutError: If the entire result iterator could not be generated
                    before the given timeout.
                Exception: If fn(*args) raises for any values.
            """
            if timeout is not None:
                end_time = timeout + time.time()
    
            fs = [self.submit(fn, *args) for args in zip(*iterables)]
    
            # Yield must be hidden in closure so that the futures are submitted
            # before the first iterator value is required.
            def result_iterator():
                try:
                    for future in fs:
                        if timeout is None:
                            yield future.result()
                        else:
                            yield future.result(end_time - time.time())
                finally:
                    for future in fs:
                        future.cancel()
            return result_iterator()
    

    fs holds the Future instances returned by submit(), in the order the arguments were passed in, and map() returns result_iterator(). The apparent "groups of max_workers" in the output follows from this: result_iterator() yields in submission order, so while the first 3s future blocks, the shorter tasks finish in the background and their already-available results print immediately afterwards.

    3. The as_completed() source code, which is a bit harder to follow. Even without reading the internals, its observable contract is easy to check: already-finished futures are yielded immediately, and an optional timeout raises TimeoutError if the remaining futures take too long, as the sketch below shows.
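
    A minimal sketch of the timeout behavior (the sleep_for() task is hypothetical, not from the original article):

    # -*- coding: utf-8 -*-

    import time
    from concurrent.futures import (ProcessPoolExecutor, as_completed,
                                    TimeoutError)

    def sleep_for(n):
        time.sleep(n)
        return n

    if __name__ == '__main__':
        with ProcessPoolExecutor(max_workers=2) as executor:
            futures = [executor.submit(sleep_for, n) for n in (1, 5)]
            try:
                for f in as_completed(futures, timeout=2):
                    print('%s finished' % f.result())  # the 1s task is yielded first
            except TimeoutError as e:
                print(e)  # the 5s task is still unfinished at the 2s deadline
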
    4. The implementation of ProcessPoolExecutor():

    [Figure: process.png -- data-flow diagram of the ProcessPoolExecutor internals]

    Let's analyze this by combining the source code with the data flow above:
    • executor.map creates multiple _WorkItem objects (in fact it just calls submit() repeatedly), each holding a newly created Future object.
    • Each _WorkItem is put into a dict called 「Work Items」, keyed by a distinct 「Work Id」.
    • A 「Local worker thread」 is created to manage the 「Work Ids」 queue; it does two things:
      • It takes a Work Id off the 「Work Ids」 queue and looks up the corresponding _WorkItem in 「Work Items」. If the item has been cancelled, it is deleted from 「Work Items」; otherwise it is repackaged as a _CallItem and put on the 「Call Q」 queue. The executor's worker processes take _CallItems off that queue, execute them, and wrap the results as _ResultItems on the 「Result Q」 queue.
      • It takes _ResultItems off the 「Result Q」 queue, sets the result on the corresponding Future, and deletes the entry from 「Work Items」.
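
    To make this bookkeeping concrete, here is a toy model of the 「Work Items」 dict and the 「Work Ids」 queue (simplified, hypothetical names; not the real concurrent.futures implementation):

    # -*- coding: utf-8 -*-

    import queue

    class ToyWorkItem:
        def __init__(self, future, fn, args):
            self.future, self.fn, self.args = future, fn, args

    work_items = {}           # 「Work Items」: work_id -> ToyWorkItem
    work_ids = queue.Queue()  # 「Work Ids」: pending ids for the worker thread
    queue_count = 0           # the next work_id to hand out

    def toy_submit(fn, *args):
        """Register a task the way submit() does: create a Future-like object,
        store it under a fresh work_id, and enqueue that id."""
        global queue_count
        future = object()  # stand-in for a real Future
        work_items[queue_count] = ToyWorkItem(future, fn, args)
        work_ids.put(queue_count)
        queue_count += 1
        return future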

    5. A quick look at submit():
        def submit(self, fn, *args, **kwargs):
            with self._shutdown_lock:
                if self._broken:
                    raise BrokenProcessPool('A child process terminated '
                        'abruptly, the process pool is not usable anymore')
                if self._shutdown_thread:
                    raise RuntimeError('cannot schedule new futures after shutdown')
    
                f = _base.Future()
                w = _WorkItem(f, fn, args, kwargs)
    
                self._pending_work_items[self._queue_count] = w
                self._work_ids.put(self._queue_count)
                self._queue_count += 1
                # Wake up queue management thread
                self._result_queue.put(None)
    
                self._start_queue_management_thread()
                return f
    
    • A Future() instance f and a _WorkItem() instance w are created.
    • _pending_work_items is the 「Work Items」 dict described above: the key is _queue_count (starting at 0) and the value is w; _queue_count is also put on the _work_ids queue.
    • "Wake up queue management thread" wakes the Local Worker Thread shown in the diagram above.
    def _start_queue_management_thread(self):
           # When the executor gets lost, the weakref callback will wake up
           # the queue management thread.
           def weakref_cb(_, q=self._result_queue):
               q.put(None)
           if self._queue_management_thread is None:
               # Start the processes so that their sentinels are known.
               self._adjust_process_count()
               self._queue_management_thread = threading.Thread(
                       target=_queue_management_worker,
                       args=(weakref.ref(self, weakref_cb),
                             self._processes,
                             self._pending_work_items,
                             self._work_ids,
                             self._call_queue,
                             self._result_queue))
               self._queue_management_thread.daemon = True
               self._queue_management_thread.start()
               _threads_queues[self._queue_management_thread] = self._result_queue
    
       def _adjust_process_count(self):
           for _ in range(len(self._processes), self._max_workers):
               p = multiprocessing.Process(
                       target=_process_worker,
                       args=(self._call_queue,
                             self._result_queue))
               p.start()
               self._processes[p.pid] = p
    
    • _adjust_process_count() starts max_workers processes, each running _process_worker().
    • The _queue_management_thread is started, i.e. the Local Worker Thread from the diagram.
    • In that thread, _queue_management_worker() calls _add_call_item_to_queue() to move _CallItems onto call_queue, skip cancelled items, clean up references, and so on; this function is harder to follow.
    def _process_worker(call_queue, result_queue):
       """Evaluates calls from call_queue and places the results in result_queue.
    
       This worker is run in a separate process.
    
       Args:
           call_queue: A multiprocessing.Queue of _CallItems that will be read and
               evaluated by the worker.
           result_queue: A multiprocessing.Queue of _ResultItems that will written
               to by the worker.
           shutdown: A multiprocessing.Event that will be set as a signal to the
               worker that it should exit when call_queue is empty.
       """
       while True:
           call_item = call_queue.get(block=True)
           if call_item is None:
               # Wake up queue management thread
               result_queue.put(os.getpid())
               return
           try:
               r = call_item.fn(*call_item.args, **call_item.kwargs)
           except BaseException as e:
               exc = _ExceptionWithTraceback(e, e.__traceback__)
               result_queue.put(_ResultItem(call_item.work_id, exception=exc))
           else:
               result_queue.put(_ResultItem(call_item.work_id,
                                            result=r))
    
    • This is the task-executing worker process: it takes a _CallItem from call_queue, calls its fn, and puts the result on result_queue (or, on failure, the exception wrapped with its traceback).
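
    Putting it together, the user-facing behavior of submit() is simply: get a Future back immediately, then query it. A minimal usage sketch (the add() helper is hypothetical, not from the original article):

    # -*- coding: utf-8 -*-

    from concurrent.futures import ProcessPoolExecutor

    def add(a, b):
        return a + b

    if __name__ == '__main__':
        with ProcessPoolExecutor() as executor:
            f = executor.submit(add, 1, 2)  # a Future is returned right away
            print(f.done())     # often False: the task may still be pending
            print(f.result())   # blocks until the worker finishes -> 3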
