美文网首页
Python标准库-concurrent.futures

Python标准库-concurrent.futures

作者: _kkk | 来源:发表于2018-04-07 14:41 被阅读0次

    concurrent.futures模块提供了一个高层面的接口来实现异步调用。

    concurrent,futures提供了两种方式来执行异步调用,一种是借助线程,使用ThreadPoolExecutor对象;另一种是借助进程,使用ProcessPoolExecutor对象,这两种对象实现了相同的接口,这些接口定义在他们的父类Executor中。

    Executor对象

    这是一个抽象类,提供了异步执行的方法。这个类不应该被直接使用,应该使用上面提到的两个具体子类。

    方法介绍

    • submit(fn, *args, **kwargs)

      执行fn(*args, **kwargs)并返回一个Future对象,Future对象在下面会介绍到。

    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(pow, 323, 1235)
        print(future.result())
    
    • map(func, *iterables, timeout=None, chunksize=1)

      与内置的map函数用法相似,除了下面两个区别:

      1. iterables不是懒加载的
      2. func是异步执行的,多个func可以并发执行

      timeout指定一个时间,如果next()被调用了且在timeout时间内没有得到结果则引发concurrent.futures.TimeoutError。如果为None,则不限制时间。

      当使用ProcessPoolExecutor时,设置chunksize的值可以将iterables分块,并一次性发给进程池中的对象,对于很长的迭代对象,使用一个大的chunksize可以提高效率。但是对于ThreadPoolExecutor对象,chunksize没有任何作用。
      我的理解是因为进程之间占用了不同的内存空间,所以不同的进程在执行时需要先从调用Executor的进程复制所需的参数,当chunksize为1时,每执行一次就需要通信一次,显然非常浪费时间,所以设置一个大点的chunksize可以一次性获取多次执行所需的参数,减少通信的次数。而线程因为本来就在同一个内存空间中,不存在这个问题,因此没有影响。

    • shutdown(wait=True)

      释放资源的,通过给每个thread或process执行join()方法实现。

      通过使用with语句可以避免使用这个方法。

    ThreadPoolExecutor(max_workers=None, thread_name_prefix='')

    ThreadPoolExecutor是Executor的子类,通过使用线程池来实现异步调用。

    当一个Future关联的可调用对象等待另一个Future的结果时,会发生死锁,官方例子如下:

    def wait_on_b():
        time.sleep(5)
        print(b.result())  # b will never complete because it is waiting on a.
        return 5
    def wait_on_a():
        time.sleep(5)
        print(a.result())  # a will never complete because it is waiting on b.
        return 6
    executor = ThreadPoolExecutor(max_workers=2)
    a = executor.submit(wait_on_b)
    b = executor.submit(wait_on_a)
    

    还有一个:

    def wait_on_future():
        f = executor.submit(pow, 5, 2)
        # This will never complete because there is only one worker thread and
        # it is executing this function.
        print(f.result())
    executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(wait_on_future)
    

    ProcessPoolExecutor(max_workers=None, thread_name_prefix='')

    ProcessPoolExecutor同样也是Executor的子类,通过进程池来实现异步调用,且不受全局解释器锁的限制。

    __main__模块必须被工作子进程导入,这意味这ProcessPoolExecutor不会在交互式解释器中起作用。

    同样,与线程中提到的一样,不当的使用会引起死锁,具体例子同上。

    ProcessPoolExecutor例子

    import concurrent.futures
    import math
    
    PRIMES = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419]
    
    def is_prime(n):
        if n % 2 == 0:
            return False
    
        sqrt_n = int(math.floor(math.sqrt(n)))
        for i in range(3, sqrt_n + 1, 2):
            if n % i == 0:
                return False
        return True
    
    def main():
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
                print('%d is prime: %s' % (number, prime))
    
    if __name__ == '__main__':
        main()
    

    可以看出map函数返回的直接就是结果,与submit有些区别。

    Future 对象

    Future类封装了可调用对象的异步执行,Future实例由Executor.submit()创建,不应该被直接创建。

    方法介绍

    • cancel()

      尝试取消调用,如果调用正在执行且无法被取消则返回False,否则调用会被取消并返回True。

    • cancelled()

      调用成功被取消时返回True。

    • running()

      如果调用正在被执行且不能被取消则返回True。

    • done()

      如果调用成功被取消或者执行完成则返回True。

    • result(timeout=None)

      返回调用的返回值。

    • add_done_callback(fn)

      这个方法用来添加回调函数,调用这个方法的future对象会把自己作为唯一参数传给函数fn,无论future是执行完成还是被取消,都会调用fn。

    模块方法

    concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

    返回两个二元组集合,第一个集合名为done,包含已经完成的futures;第二个集合名为not_done,包含没有完成的futures。return_when参数指明了这个方法什么时候返回,一共有三种参数,默认为ALL_COMPLETED.

    1. FIRST_COMPLETED:当任何future完成或被取消时返回
    2. FIRST_EXCEPTION:当任何future引起一个异常时返回,如果没有异常,等效于ALL_COMPLETED
    3. ALL_COMPLETED:当所有futures完成或被取消时返回
    

    concurrent.futures.as_completed(fs, timeout=None)

    返回由fs给出的Future实例迭代器,只有当future完成时才会返回,返回的顺序与完成的顺序相同,最先完成的最先返回,如果fs中包含两个同样的对象,只会返回一次。

    官网例子如下:

    import concurrent.futures
    import urllib.request
    
    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'http://some-made-up-domain.com/']
    
    # Retrieve a single page and report the URL and contents
    def load_url(url, timeout):
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            return conn.read()
    
    # We can use a with statement to ensure threads are cleaned up promptly
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                print('%r page is %d bytes' % (url, len(data)))
    

    future_to_url是一个字典,一个可迭代对象。以submit()返回的future作为键,url作为值。执行以后可以发现输出的结果的顺序与传入的url顺序不一定是相等的。

    参考连接

    相关文章

      网友评论

          本文标题:Python标准库-concurrent.futures

          本文链接:https://www.haomeiwen.com/subject/xoryhftx.html