美文网首页
Python标准库-concurrent.futures

Python标准库-concurrent.futures

作者: _kkk | 来源:发表于2018-04-07 14:41 被阅读0次

concurrent.futures模块提供了一个高层面的接口来实现异步调用。

concurrent,futures提供了两种方式来执行异步调用,一种是借助线程,使用ThreadPoolExecutor对象;另一种是借助进程,使用ProcessPoolExecutor对象,这两种对象实现了相同的接口,这些接口定义在他们的父类Executor中。

Executor对象

这是一个抽象类,提供了异步执行的方法。这个类不应该被直接使用,应该使用上面提到的两个具体子类。

方法介绍

  • submit(fn, *args, **kwargs)

    执行fn(*args, **kwargs)并返回一个Future对象,Future对象在下面会介绍到。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
  • map(func, *iterables, timeout=None, chunksize=1)

    与内置的map函数用法相似,除了下面两个区别:

    1. iterables不是懒加载的
    2. func是异步执行的,多个func可以并发执行

    timeout指定一个时间,如果next()被调用了且在timeout时间内没有得到结果则引发concurrent.futures.TimeoutError。如果为None,则不限制时间。

    当使用ProcessPoolExecutor时,设置chunksize的值可以将iterables分块,并一次性发给进程池中的对象,对于很长的迭代对象,使用一个大的chunksize可以提高效率。但是对于ThreadPoolExecutor对象,chunksize没有任何作用。
    我的理解是因为进程之间占用了不同的内存空间,所以不同的进程在执行时需要先从调用Executor的进程复制所需的参数,当chunksize为1时,每执行一次就需要通信一次,显然非常浪费时间,所以设置一个大点的chunksize可以一次性获取多次执行所需的参数,减少通信的次数。而线程因为本来就在同一个内存空间中,不存在这个问题,因此没有影响。

  • shutdown(wait=True)

    释放资源的,通过给每个thread或process执行join()方法实现。

    通过使用with语句可以避免使用这个方法。

ThreadPoolExecutor(max_workers=None, thread_name_prefix='')

ThreadPoolExecutor是Executor的子类,通过使用线程池来实现异步调用。

当一个Future关联的可调用对象等待另一个Future的结果时,会发生死锁,官方例子如下:

def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5
def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

还有一个:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)

ProcessPoolExecutor(max_workers=None, thread_name_prefix='')

ProcessPoolExecutor同样也是Executor的子类,通过进程池来实现异步调用,且不受全局解释器锁的限制。

__main__模块必须被工作子进程导入,这意味这ProcessPoolExecutor不会在交互式解释器中起作用。

同样,与线程中提到的一样,不当的使用会引起死锁,具体例子同上。

ProcessPoolExecutor例子

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

可以看出map函数返回的直接就是结果,与submit有些区别。

Future 对象

Future类封装了可调用对象的异步执行,Future实例由Executor.submit()创建,不应该被直接创建。

方法介绍

  • cancel()

    尝试取消调用,如果调用正在执行且无法被取消则返回False,否则调用会被取消并返回True。

  • cancelled()

    调用成功被取消时返回True。

  • running()

    如果调用正在被执行且不能被取消则返回True。

  • done()

    如果调用成功被取消或者执行完成则返回True。

  • result(timeout=None)

    返回调用的返回值。

  • add_done_callback(fn)

    这个方法用来添加回调函数,调用这个方法的future对象会把自己作为唯一参数传给函数fn,无论future是执行完成还是被取消,都会调用fn。

模块方法

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

返回两个二元组集合,第一个集合名为done,包含已经完成的futures;第二个集合名为not_done,包含没有完成的futures。return_when参数指明了这个方法什么时候返回,一共有三种参数,默认为ALL_COMPLETED.

1. FIRST_COMPLETED:当任何future完成或被取消时返回
2. FIRST_EXCEPTION:当任何future引起一个异常时返回,如果没有异常,等效于ALL_COMPLETED
3. ALL_COMPLETED:当所有futures完成或被取消时返回

concurrent.futures.as_completed(fs, timeout=None)

返回由fs给出的Future实例迭代器,只有当future完成时才会返回,返回的顺序与完成的顺序相同,最先完成的最先返回,如果fs中包含两个同样的对象,只会返回一次。

官网例子如下:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

future_to_url是一个字典,一个可迭代对象。以submit()返回的future作为键,url作为值。执行以后可以发现输出的结果的顺序与传入的url顺序不一定是相等的。

参考连接

相关文章

网友评论

      本文标题:Python标准库-concurrent.futures

      本文链接:https://www.haomeiwen.com/subject/xoryhftx.html