使用的包:multiprocessing
使用其中的模块:Process、Pipe、Queue、Value、Array
说明:虽然multiprocessing理论上是跨平台均可使用,但是win和linux在语法上还是有些差别!本人原始在ubuntu下运行进程间通信都很顺畅,但同样的文件在win下就通不过!原因其实很简单:win下写法要更加严格。
Process模块使用区别
在linux下的进程创建、激活以及父进程处理僵尸子进程的方式:
from multiprocessing import Process
import time
def fun(num):
print('ceshi:', num)
pp = []
# 子进程的创建与激活
for x in range(10):
p = Process(target = fun, args = (x,))
pp.append(p)
p.start()
# 以下全是父进程语句:
# 父进程处理子进程
for x in pp:
x.join()
print('父进程处理完毕')
这是最简单的写法,但是在win下运行会报这样的错误:
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
按照错误提示,我们只需要把主函数的语句,放到主函数里即可;因此可得win下的程序:
from multiprocessing import Process
import time
def fun(num):
print('ceshi:', num)
if __name__ == '__main__':
pp = []
# 子进程的创建与激活
for x in range(10):
p = Process(target = fun, args = (x,))
pp.append(p)
p.start()
# 以下全是父进程语句:
# 父进程处理子进程
for x in pp:
x.join()
print('父进程处理完毕')
Pipe模块使用区别
Pipe也是multiprocessing包下的一个模块,
- 作用:在内存中开辟一块管道空间来帮助多进程间进行信息传递。
- 使用:和Process模块不太一样,因为"管道"空间需要你在程序中人为声明一下,即多一条声明语句。
linux下程序:
from multiprocessing import Process,Pipe
import os,time
# 管道声明语句:必须放在最前面!!!
double1_conn, double2_conn = Pipe()
def fun1(name):
time.sleep(1)
double1_conn.send("ceshi" + str(name)) # 子进程往管道空间发消息
print('父进程PID号:', os.getppid(), ";", '子进程PID号:', os.getpid())
# 子进程创建激活语句:
jobs = []
for i in range(5):
p = Process(target = fun1, args = (i,))
jobs.append(p)
p.start()
# 下面全是父进程语句:
# 父进程对子进程的处理
for i in jobs:
i.join()
# 父进程从管道空间取数据:
print('父进程从管道空间中取消息:')
for i in range(5):
data = double2_conn.recv()
print(data)
同样,这样的语句在win下运行还是会报和上面同样的错误!
还是提示我们要把主函数的语句下近一个主函数里:
from multiprocessing import Process,Pipe
import os,time
# 注意:传入的参数中要有一个"管道变量"!
# 因为:原本最开始的管道声明语句,现在是在fun1函数的下面!
def fun1(P, name):
time.sleep(1)
P.send("ceshi" + str(name))
print('父进程PID号:', os.getppid(), ";", '子进程PID号:', os.getpid())
if __name__ == '__main__':
double1_conn, double2_conn = Pipe() # 管道声明语句必须放进来!
jobs = []
for i in range(5):
p = Process(target = fun1, args = (double1_conn,i))
jobs.append(p)
p.start()
for i in jobs:
i.join()
for i in range(5):
data = double2_conn.recv()
print(data)
注意两点:
- 管道声明必须也放进下面的主函数语句里!不能再放到文件的最前面!
- 函数传参时,必须传进去一个"管道变量(如fun1中第一个形参P)"才能在函数里发送或接受管道的信息!因为原始的管道声明语句已经放到了函数声明语句的下面。
Queue模块使用区别
另外一种进程间通信方法,使用和Pipe很像,也需要在程序中受到声明一下,在内存中开辟队列空间。
linux下程序:
from multiprocessing import Queue, Process
import time
que = Queue(10) # 队列的声明要放最前面
# 子进程的功能函数
def fun(name):
time.sleep(1)
que.put('ceshi' + str(name))
print('存入一条消息', '队列中现有 %s 条消息' % que.qsize())
# 进程的创建:
jobs = []
for x in range(10):
p = Process(target = fun, args=(x,))
jobs.append(p)
p.start()
# 处理子进程
for x in jobs:
x.join()
# 下面是父进程:
print('当前队列中有:', que.qsize(), '条消息')
while not que.empty():
print('拿到的消息:', que.get(), ';', '还有 %d ' % que.qsize())
不改写的话,非常有可能会报和上面一样的错误;此外,就算有些win下可以通过,它会一直显示你没有成功得把消息传进队列当中!队列中一直只有1条消息。
因此,win下程序改写:
from multiprocessing import Queue, Process
import time
# 函数传参:必须要把绑定队列的变量传进来!
# 因为:同样声明语句已经放到了函数的下面,必须要传进来
def fun(que, name):
time.sleep(1)
que.put('ceshi' + str(name))
print('存入一条消息', '队列中现有 %s 条消息' % que.qsize())
if __name__ == '__main__':
que = Queue(10) # 队列声明必须要放进来!
# 进程的创建:
jobs = []
for x in range(10):
p = Process(target = fun, args=(que, x))
jobs.append(p)
p.start()
# 处理子进程
for x in jobs:
x.join()
# 下面是父进程:
print('当前队列中有:', que.qsize(), '条消息')
while not que.empty():
print('拿到的消息:', que.get(), ';', '还有 %d ' % que.qsize())
同样注意两点:
- 队列的声明也必须放进主函数语句里,且放在其中的最前面;
- 函数传参时,必须把"绑定队列的那个变量"传进去,才能在函数里发送或提取队列的信息!因为原始的队列声明语句已经放到了函数声明语句的下面。
Value模块使用区别
- 作用:创建贡献内存,大家共同修改一个参数
- 使用:和管道差不多
linux下程序:
from multiprocessing import Value, Process
import time
import random
money = Value('i', 5000) # 开辟共享内存,初始值给2000
# 子进程1:存钱
def deposite(money):
for i in range(100):
time.sleep(0.03)
money.value += random.randint(1,150)
# 子进程2:取钱
def withdraw(money):
for i in range(100):
money.value -= random.randint(1,150)
# 创建子进程:
d = Process(target = deposite, args = (money,))
w = Process(target = withdraw, args = (money,))
# 激活子进程:
d.start()
w.start()
# 下面都是父进程的语句:
# 父进程处理子进程:
d.join()
w.join()
# 父进程查看最后共享内存里的那个数值:
print(money.value)
win下程序:
from multiprocessing import Value, Process
import time
import random
# 注意:传入的参数是绑定那个共享内存空间的变量!!!
# 子进程1:存钱
def deposite(money):
for i in range(100):
time.sleep(0.03)
money.value += random.randint(1,150)
# 子进程2:取钱
def withdraw(money):
for i in range(100):
money.value -= random.randint(1,150)
if __name__ == '__main__':
money = Value('i', 5000) # 共享内存声明语句,必须写到里面
# 创建子进程:
d = Process(target = deposite, args = (money,))
w = Process(target = withdraw, args = (money,))
# 激活子进程:
d.start()
w.start()
# 父进程处理子进程:
d.join()
w.join()
# 父进程查看最后共享内存里的那个数值:
print(money.value)
注意两点:
- 共享内存声明语句,必须写到主函数里,且在其中的最上面;
- 函数传参:传入的是绑定那个共享内存空间的变量!这个绑定的变量有自己的属性,可以获取或修改对应的共享内存中的数值。
Array模块使用区别
- 作用:共享内存创建的另一种形式,相比与value模块往共享内存中放入的是一个数值,array往共享内存中存放的是一个"元素类型必须相同"的列表。
linux下程序:
from multiprocessing import Process, Array
import time
# 声明创建一个共享内存,其中初始化一个列表
shm = Array('i', [1,2,3])
def fun(shm):
for i in range(len(shm)):
shm[i] = 99
# 创建子进程:
p = Process(target=fun, args=(shm,))
# 激活子进程:
p.start()
# 父进程处理子进程:
p.join()
# 父进程获取新的共享内存中数列表:
for i in shm:
print(i)
win下的函数:
from multiprocessing import Process, Array
import time
# 修改共享内存中数组里的值
def fun(shm):
for i in range(len(shm)):
shm[i] = 99
if __name__ == '__main__':
shm = Array('i', [1, 2, 3]) # 声明创建一个共享内存,要放到里面
# 创建子进程:
p = Process(target = fun, args = (shm,))
# 激活子进程:
p.start()
# 父进程处理子进程:
p.join()
# 父进程获取新的共享内存中数列表:
for i in shm:
print(i)
注意:使用注意事项和value一样。
说明
上面所有的linux程序"不能"在win下正常运行;
但所有的win程序"可以"在linux下正常运行!!!
很明显:win下的程序写法更加正式、标准、通用!所以以后写进程间通信的程序,直接按照win要求的这种写法去写即可,只有好处没有坏处。
网友评论