并发编程
-
并行与并发的区别
-
并发: 宏观上,多个任务同时运行; 一个CPU核心交替运行多个程序。
-
并行: 同一时刻发生; 多个CPU同时处理多个程序。
-
Python的concurrent库实现并行计算
官方文档:
https://docs.python.org/zh-cn/3/library/concurrent.futures.html#module-concurrent.futures
只有一个包:concurrent.futures
# https://docs.python.org/zh-cn/3/library/concurrency.html
- 模块提供异步执行可调用对象高层接口。
- 支持进程与线程两种模型
线程不能跨CPU,进程可以在CPU间运行
线程:线程池 (TheadPoolExecutor方式),使用线程池实现并发
Executor
# class concurrent.futures.Executor
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())
- submit 调度可调用对象fn
- map: 类似于 [
map(func, *iterables)
] 函数,除了以下两点- iterables 是立即执行而不是延迟执行的;
- func 是异步执行的,对 func 的多个调用可以并发执行。
- shutdown(wait=True, *, cancel_futures=False)
当待执行的 future 对象完成执行后向执行者发送信号,它就会释放正在使用的任何资源。
关闭后调用 Executor.submit() 和 Executor.map() 将会引发 RuntimeError。
ThreadPoolExecutor
ThreadPoolExecutor 是 Executor 的子类,它使用线程池来异步执行调用。
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistant-subdomain.python.org/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
注意:当可调用对象已关联了一个 Future 然后在等待另一个 Future 的结果时就会导致死锁情况。
ProcessPoolExecutor
ProcessPoolExecutor 类是 Executor 的子类,它使用进程池来异步地执行调用。 ProcessPoolExecutor 会使用 multiprocessing 模块,这允许它绕过 全局解释器锁 但也意味着只可以处理和返回可封存的对象。
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
注意:main 模块必须可以被工作者子进程导入。这意味着 ProcessPoolExecutor 不可以工作在交互式解释器中。从可调用对象中调用 Executor 或 Future 的方法提交给 ProcessPoolExecutor 会导致死锁。
网友评论