我们都知道python因为其GIL锁导致每一个线程被绑定到一个核上,导致python无法通过线程实现真正的平行计算。从而导致大量的核算力的浪费。但是
concurrent.futures模块,可以利用multiprocessing实现真正的平行计算。
但是在提高python的计算性能前,首先要明白自己的程序目前是什么类型?
对于不同类型的程序,如果安装下述方法进行改造,可能效率并不会提高。
IO密集型:读取文件,读取网络套接字频繁。
计算密集型:大量消耗CPU的数学与逻辑运算,也就是我们这里说的平行计算。
IO密集型
可以使用asyncio 来进行优化,jit的原理是编译为机器码执行,但是io中可能会存在异常字符,所以也不推荐使用,当然在存在多态主机的情况下,可以采用分布式编程来提高效率,或者过concurrent.futures模块来实现。
详情请看下一篇博文
计算密集型
当然我们可以使用jit,分布式编程,python 调用c编程来优化性能,但是要充分利用计算机的核数,可以通过concurrent.futures模块来实现,其在实现提高并行计算能力时时通过多进程实现。
该concurrent.futures模块提供了一个用于异步执行callables的高级接口。
可以使用线程,使用ThreadPoolExecutor或单独的进程 来执行异步执行 ProcessPoolExecutor。两者都实现相同的接口,由抽象Executor类定义。
concurrent.futures会以子进程的形式,平行的运行多个python解释器,从而令python程序可以利用多核CPU来提升执行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU内核。
def gcd(pair):
a, b = pair
low = min(a, b)
for i in range(low, 0, -1):
if a % i == 0 and b % i == 0:
return i
numbers = [
(1963309, 2265973), (1879675, 2493670), (2030677, 3814172),
(1551645, 2229620), (1988912, 4736670), (2198964, 7876293)
]
import time
start = time.time()
results = list(map(gcd, numbers))
end = time.time()
print 'Took %.3f seconds.' % (end - start)
Took 2.507 seconds.
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor
start = time.time()
pool = ProcessPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time.time()
print 'Took %.3f seconds.' % (end - start)
Took 1.861 seconds.
在两个CPU核心的机器上运行多进程程序,比其他两个版本都快。这是因为,ProcessPoolExecutor类会利用multiprocessing模块所提供的底层机制,完成下列操作:
1)把numbers列表中的每一项输入数据都传给map。
2)用pickle模块对数据进行序列化,将其变成二进制形式。
3)通过本地套接字,将序列化之后的数据从煮解释器所在的进程,发送到子解释器所在的进程。
4)在子进程中,用pickle对二进制数据进行反序列化,将其还原成python对象。
5)引入包含gcd函数的python模块。
6)各个子进程并行的对各自的输入数据进行计算。
7)对运行的结果进行序列化操作,将其转变成字节。
8)将这些字节通过socket复制到主进程之中。
9)主进程对这些字节执行反序列化操作,将其还原成python对象。
10)最后,把每个子进程所求出的计算结果合并到一份列表之中,并返回给调用者。
multiprocessing开销比较大,原因就在于:主进程和子进程之间通信,必须进行序列化和反序列化的操作。
submit(fn,* args,** kwargs )
将可调用的fn调度为执行, 并返回表示可调用执行的对象。
fn(*args **kwargs)Future
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())
map(func,* iterables,timeout = None,chunksize = 1 )
与以下类似:map(func, *iterables)
在iterables收集立即而不是懒洋洋地;
func以异步方式执行,并且可以同时对func进行多次调用 。
返回的迭代器引发一个concurrent.futures.TimeoutError if next()被调用,并且在从原始调用到超时秒后结果不可用Executor.map()。 timeout可以是int或float。如果未指定 超时None,则等待时间没有限制。
如果func调用引发异常,则在从迭代器检索其值时将引发该异常。
使用时ProcessPoolExecutor,此方法将iterables切割 为多个块,并将其作为单独的任务提交给池。可以通过将chunksize设置为正整数来指定这些块的(近似)大小。对于很长的iterables,采用大值CHUNKSIZE可以显著改善性能相比的1.默认大小 ThreadPoolExecutor,CHUNKSIZE没有效果。
在3.5版中更改:添加了chunksize参数。
shutdown(wait = True )
向执行者发出信号,表示当目前待处理的期货执行完毕时,它应该释放它正在使用的任何资源。关机后拨打电话Executor.submit()和拨打电话 Executor.map()将会提出RuntimeError。
如果等待是True那么这种方法将不会返回,直到所有悬而未决的期货执行完毕,并与执行相关的资源已被释放。如果等待,False那么此方法将立即返回,并且当执行所有未决期货时,将释放与执行程序关联的资源。无论wait的值如何,整个Python程序都不会退出,直到所有待处理的期货都执行完毕。
如果使用with语句,则可以避免必须显式调用此方法 ,该语句将关闭Executor (等待,就像Executor.shutdown()使用wait set 调用一样True):
import shutil
with ThreadPoolExecutor(max_workers=4) as e:
e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
网友评论