上一篇多线程文章提到了Global Interpreter Lock的限制,在某些情况下多线程并不能提高程序的运行效率。那么为了提高并发量,我们可以利用多进程,多进程可以让程序员充分利用机器的处理器,毕竟现在连个人的笔记本都有8核,16核。
再强调一遍,如果能看懂英文文档,Python标准库是你最好的读物!
multiprocessing
multiprocessing是标准库中提供的多进程包,它提供了大多数跟多线程threading模块相似的api,同时也有多线程不具备的api。
在这个包中,Process
类是用来创建一个进程对象,调用对象的start()方法开始运行该进程。
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
可以看出来跟threading.Thread的用法是一样的,忘记了的可以回去看下多线程的类方法介绍。下面介绍几个不一样的方法。
multiprocessing.active_children(): 返回当前进程的所有还在运行的子进程
multiprocessing.cpu_count(): 返回处理器的个数
进程间通信
上一篇多线程文章中没有提到通信问题,因为多线程虽然是并发,但是还是占用了同一片内存,但是多进程不一样,多进程是占用了不同的内存空间,由于同属一个程序,相互之间难免要有信息交换,multiprocessing提供了两种进程间的通信信道。
Queue
多进程包提供了一个Queue类,相当于queue模块的Queue类的一个克隆。我们把之前多线程版的生产者消费者模式改成多进程,通信使用Queue对象。
def consumer(q):
while 1:
print('waiting for data')
data = q.get()
print('i got data: {}'.format(data))
def producer(q):
while 1:
q.put(['time', time.time()])
time.sleep(3)
if __name__ == '__main__':
queue = Queue()
c = Process(target=consumer, args=(queue,))
c.start()
p = Process(target=producer, args=(queue,))
p.start()
c.join()
p.join()
# output:
# waiting for data
# i got data: ['time', 1521341942.320783]
# waiting for data
# i got data: ['time', 1521341945.3241975]
# waiting for data
# i got data: ['time', 1521341948.3273387]
# waiting for data ctrl+c打断输出。
多进程提供了三种队列类,分别为Queue, SimpleQueue和JoinableQueue.区别是在与queue.Queue类相比,多进程的Queue类缺少了task_done()和join()方法。task_done()方法是用于get()方法之后,表明任务完成。
当然多线程中也是可以使用队列来构造一个多生产者多消费者的模型,使用queue.Queue()就行了。
Pipe
管道,作用就像它的名字一样,提供了一个双向通信的功能,但是也能设置成单向通信(构造Pipe对象时传入False参数),构造对象时返回一对(conn1, conn2),当为单向通信时,conn1只能收消息,conn2只能写消息。举个管道的例子。
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
con_1, con_2 = Pipe()
p = Process(target=f, args=(con_2,))
p.start()
print(con_1.recv())
p.join()
同步机制
和多线程一样,多进程的执行顺序也是无法预测的,同样需要一些同步原语,多进程模块包含所有多线程模块的同步原语,API接口也都是一样的,可以借鉴上一篇文章。因为加锁这些操作很繁琐,容易出现死锁的现象,在编程中能够避免尽量避免。
共享对象
同样上面提到了,多进程之间占用的是不同的内存空间,如果实在想要共享一些数据,多进程模块也提供了一些方法。可以使用Value和Array。直接放上官网的例子。
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
进程池 Process Pools
进程的创建与回收是需要浪费CPU的时间的,频繁的申请与销毁是对宝贵CPU资源的一种浪费,线程的创建与销毁浪费的资源比进程少,但也是一种浪费。那么为了减少这样的浪费,有了池的概念。同时池也是为了控制并发的数量,避免因为短时间内的爆发量的进程耗尽系统资源。
多进程库中实现了一个Pool类,创建时可以传入一个整数指定进程的数量,默认为处理器数量。当有工作需要使用进程池时,进程首先检查池子满了没有,如果满了,则等待正在执行的进程的完成,没有则用一个新的进程执行工作。
方法:
- map(func, iterable[, chunksize]): 与map函数类似,但是这是支持并发的。缺陷就是函数只能接受一个可迭代的参数。
- imap(func, iterable[, chunksize]): 一个懒惰版本的map。类似与迭代器与生成器的关系。
还有一些方法不是很了解,就对上面的两个举个例子吧。
def f(x):
print("my name is {}.".format(multiprocessing.current_process().name))
return x*x
pool = Pool(4)
print(pool.map(f, range(10)))
# output:
# my name is ForkPoolWorker-1.
# my name is ForkPoolWorker-3.
# my name is ForkPoolWorker-2.
# my name is ForkPoolWorker-1.
# my name is ForkPoolWorker-4.
# my name is ForkPoolWorker-3.
# my name is ForkPoolWorker-1.
# my name is ForkPoolWorker-2.
# my name is ForkPoolWorker-1.
# my name is ForkPoolWorker-3.
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
result = pool.imap(f, range(10))
print(next(result))
print(next(result))
# output:
# my name is ForkPoolWorker-1.
# my name is ForkPoolWorker-1.
# my name is ForkPoolWorker-1.
# 0
# 1
# my name is ForkPoolWorker-3.
# my name is ForkPoolWorker-2.
可以看出imap似乎是全都执行过了,因为有5个print语句,但是结果是一个懒惰的序列。
网友评论