Python的多进程和消息队列
对于Python这种动态语言而言,多进程是最快提升效率的办法。对于绝大部分项目,实质上多进程加上协程操作已经可以满足常规的并发需求了。由此构建的web服务器也可以满足大部分的需求。我们首先来看多进程和消息队列的示例。
from multiprocessing import Process, Queue
def test1(que):
print('我是子进程1')
testmsg = '我是在《子进程1》内存空间中的信息'
que.put(testmsg)
def test2(que):
print('我是子进程2')
testmsg = '我是在《子进程2》内存空间中的信息'
que.put(testmsg)
def main(que):
print('主进程开始')
p1 = Process(target = test1 ,args = (que,))
p2 = Process(target= test2, args=(que,))
p1.start()
testmsg = que.get()
print(testmsg)
p2.start()
testmsg= que.get()
print(testmsg)
p1.join()
p2.join()
print('主进程结束')
if __name__ == '__main__':
que = Queue()
main(que)
多进程使用队列通讯在程序\text{\ref{multiprocess_queue}}中,我们示例了一个最简单的多进程(3进程)的通讯过程。注意在test子进程和main主进程中都有队列que作为参量。而消息队列的方法一般只用到两个,就是put和get。put是将内容推入消息队列,get则是取出。实现多进程实例需要使用到multiprocessing 这个模块中的Process类。它会将函数(也可以是对象)和参量作为target 和 args 糅合成一个进程对象。然后用start()方法来启动进程。另一个值得注意的是join()方法,它实际上是将子进程包含在主进程中的方法。如果不使用join()方法,那么各个进程之间就是独立的而且会乱七八糟输出。如果你希望主进程等待子进程结束后才结束,必须使用join()方法。
上面的多进程适合简单无需进程控制的任务,因为每个进程在运行的是一个函数。这样的编程范例往往是把耗时较久的任务单独开一个进程来执行,中间一般只需要将任务执行情况向主进程通过消息队列回报。但绝大多数时候,我们需要对进程进行高级控制。比如读取进程id,挂起进程(使用psutil包的suspend方法),恢复进程(使用psutil包的resume方法)。此时用函数来自己写进程的控制就很费时费力。另一方面,不同进程往往对应着不同的功能,所以用类能够使得编程更清晰。所以更一般的多进程编程范例是将子进程写成进程类。但如果所有的进程都做同一件事,实际上更扁平的设计是使用函数编程。具体还是看需求来设计。
from multiprocessing import Process, Queue
import os, psutil, time
class SubProcess(Process):
def __init__(self, que):
Process.__init__(self)
self.que = que
def run(self):
print('我是子进程1, 进程id为:%s'%str(os.getpid()))
p = psutil.Process()#留空表示当前进程
self.que.put({'process_id': os.getpid(), "process_num":1})
# 推送进程id
for i in range(10):
print(i)
self.que.put(i)
time.sleep(1)
self.que.put({"create_time": p.create_time()})
if __name__=="__main__":
print('主进程开始')
que1 = Queue()
p1 = SubProcess(que1)
p1.start()
sub_pid = que1.get()['process_id']# 子进程id
p_sub = psutil.Process(sub_pid)
for j in range(10):
i_value = que1.get()
print('子程序%s运行到了第%d个步骤'%(sub_pid, i_value+1))
p1.join()
subproc_ctime = que1.get()
print("子进程%s"%(str(sub_pid)) + '运行了%f秒'
%(time.time()-subproc_ctime['create_time']) +'\n 已完成工作~' )
print('主进程结束')
有时候我们需要用进程池来快速建立多个进程。比如如果我们使用的网站服务器是Apache,那么对于多个连接传输请求,往往是建立多个进程来实现并发。注意,这种并发没有上下文关联,更不存在递归调用。所以特别适合处理独立连接的并发。
进程池提供了阻塞和非阻塞的两种方式。
阻塞方式看上去好像没有用,因为阻塞会完全使用不了多进程并发。而且阻塞完了还会需要进程切换,这看似是低效率的。但实际上,并非如此。因为我们往往在主进程进行调度操作,而子进程进行费时的数据处理操作。所以把任务合理分配为调度任务和数据处理任务会大大加速程序的运行效率。即,我们在主进程上分配大量琐碎的调度和轻量级的数据处理。而在子进程上分配重量级的数据处理问题。甚至可以建立一个次主进程专门负责调度,而其他进程则负责数据处理或者通讯。
from multiprocessing import Pool
import os,time
def task(param):
start_time = time.time()
sum = 0
for i in range((param-1)*20,param*20):
sum += i
time.sleep(0.2)
proc_time= time.time()-start_time
print("进程%d执行了%.2f秒,结果是%d"%(os.getpid(),proc_time,sum))
if __name__ == "__main__":
print("主进程开始......")
p = Pool(5)
for i in range(20):
p.apply_async(task,args=(i+1,))
p.close()
p.join()
print('所有子进程结束')
网友评论