涉及的模块主要包括:
-
multiprocessing模块
- Process类
- Queue类
- Pool类
-
subprocess模块
- call方法
- Popen类
multiprocessing中的Process类
使用Process类,可以启动一个子进程并执行指定的函数代码.
示例代码
import os
from multiprocessing import process
def test_func(func_name): #定义一个函数,作为子进程的代码
print(f"subprocess' name is {func_name} PID is {os.getpid()}")
if __name__ == "__main__":
print(f'Parent process PID is {os.getpid()}')
p = Process(target=test_func, args=('TestFunc',)) #定义一个Process的类对象p, 子进程的函数引用和参数元组作为类的初始参数
print('Child process will start...')
p.start() #启动子进程
p.join() #等待子进程结束
print('Child process end.')
执行结果
[LiangZhang@MacBook test]$
[LiangZhang@MacBook test]$python3 test.py
Parent process PID is 6153
Child process will start...
subprocess' name is TestFunc PID is 6154
Child process end.
multiprocessing中的Pool类
如果需要批量创建子进程,并且在子进程中执行更多的函数,那么我们可以使用Pool类.
import os
from multiprocessing import Pool
import time
import random
def test_func(func_name): #定义一个函数,作为子进程的代码
print(f"subprocess' name is {func_name} PID is {os.getpid()} and ParentPID is {os.getppid()}")
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print(f"Task {func_name} has been done in {end-start} seconds.")
if __name__ == "__main__":
print(f'Parent process PID is {os.getpid()}')
p = Pool(4) #通Pool创建一个类对象p, 该对象包含4个子进程.
for i in range(15):
p.apply_async(func=test_func, args=(i,)) #将多个任务(15个函数)分配进程池对象p中的子进程.
print('Waiting for all subprocess done...')
p.close() #阻止任何其它任务提交到进程池, 一但之前提交的任务完成,所有子进程退出.
p.join() #等待进程池中的子进程全部完成后再执行后续代码. (在调用join之前必须调用close或terminate)
print('All the subprocesses done...')
注: 我们在提交任务给进程池时使用的方法是"p.appy_async()", 该方法和p.appy()的区别是: p.apply_async()为异步执行,所有子进程并行运行. 而p.apply()是同步执行, 后一个子进程需要等待前一个子进程执行完毕都会运行.
程序输出**
[LiangZhang@MacBook test]$python3 test.py
Parent process PID is 6426
Waiting for all subprocess done...
subprocess' name is 0 PID is 6427 and ParentPID is 6426
subprocess' name is 1 PID is 6428 and ParentPID is 6426
subprocess' name is 2 PID is 6429 and ParentPID is 6426
subprocess' name is 3 PID is 6430 and ParentPID is 6426
Task 3 has been done in 0.7806532382965088 seconds.
subprocess' name is 4 PID is 6430 and ParentPID is 6426
Task 2 has been done in 0.9312319755554199 seconds.
subprocess' name is 5 PID is 6429 and ParentPID is 6426
.....
....
Task 10 has been done in 2.6583831310272217 seconds.
Task 14 has been done in 1.1612498760223389 seconds.
Task 12 has been done in 2.321526050567627 seconds.
All the subprocesses done...
通过输出,我们注意到无论有多少任务,都是由进程池中4个进程完成的.
multiprocessing中的Queue类
我们可以通过Queue类在父进程中创建了个队列,用于子进程之间通信.
以下示例将在主进程中创建一个queue, 和两个子进程, 一个子进程向queue中写入数据,另一个子进程从queue中读取数据.
import os
from multiprocessing import Process,Queue
import time
import random
def writter(q,data_list): #定义一个函数用于向Queue写入数据
print(f"Subprocess Writer's PID is {os.getpid()}")
for data in data_list:
print(f"Put <{data}> to Queue...")
q.put(data) #向队列q中写入数据
time.sleep(random.random())
def reader(q):
print(f"Subprocess Reader's PID is {os.getpid()}")
while True:
data = q.get(True) #从队列中获取数据, 参数True为block模式.
print(f'Get <{data}> from Queue!')
if __name__ == "__main__":
test_datalist = ['Alice', 'Bob', 'Cindy', 'David']
print(f'Parent process PID is {os.getpid()}')
q = Queue()
pw = Process(target=writter, args=(q,test_datalist))
pr = Process(target=reader, args=(q,))
pw.start()
pr.start()
pw.join()
pr.terminate()
注: q.get(True) 参数True为block=True, 意思是如果队列不为空,立刻取出数据,但是如果队列为空则阻塞等待. 如果block=False 如果队列为空把raise一个queue.Empty的异常.
输出结果
[LiangZhang@MacBook test]$python3 test.py
Parent process PID is 6661
Subprocess Writer's PID is 6662
Put <Alice> to Queue...
Subprocess Reader's PID is 6663
Get <Alice> from Queue!
Put <Bob> to Queue...
Get <Bob> from Queue!
Put <Cindy> to Queue...
Get <Cindy> from Queue!
Put <David> to Queue...
Get <David> from Queue!
subprocess中的call方法执行外部程序做为子进程
import os
import subprocess
r = subprocess.call(['nslookup', 'www.baidu.com']) #将执行的命令和参数放入一个list传递给call方法, r为执行完程序后的返回代码.
print(f"exitCode:", r)
subprocess中的Popen类实现和子程序输入/输出的交互
使用Popen启用一个外部程序作为子进程, 我们可以通过给stdin/stdout/stderr赋值,以便于后期随时查看子进程的输出,以及输入相应的内容给子程序.
import subprocess
p = subprocess.Popen(['nslookup',],stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE) #创建一个Popen类对象, 执行程序'nslookup', 并且设置输入/输出/错误输出为subprocess.PIPE.
try:
output, err = p.communicate(b'www.baidu.com',5) #使用communication方法为'nslookup'传递后续输入. 参数5为等待时间, 如果超时后子程序还会退出, 那么就会raise一个异常, 执行期间输出数据不会丢失. 注意: communication方法接受的输入必须是字节码,不可以是字符串.
except: #捕获执行超时的异常
p.kill() #如果执行超时,那么终止子程序
output, err = p.communicate(None) #并且把执行期间的输出以元组形式返回.
print(output.decode('utf-8')) #由于返回的输出是字节码形式,所以先decode以后才可以打印
输出
[LiangZhang@MacBook test]$python3 test.py
Server: 192.168.50.1
Address: 192.168.50.1#53
Non-authoritative answer:
www.baidu.com canonical name = www.a.shifen.com.
Name: www.a.shifen.com
Address: 61.135.169.125
Name: www.a.shifen.com
Address: 61.135.169.121
网友评论