Python由于GIL的存在,多线程(Thread)、协程(Asyncio)可以实现并发,并行则依赖多进程(Multiprocessing)实现。
多进程的学习可以参考廖雪峰Python教程和Python标准库。
-
https://www.liaoxuefeng.com/wiki/1016959663602400/1017628290184064
-
https://docs.python.org/zh-cn/3.7/library/multiprocessing.html
本文就Multiprocessing的日常使用做个demo。
一、通过Process创建多进程
实现多进程,可以创建多个Process对象,并调用start()去生成进程,通过join()等待完成。
from multiprocessing import Process
import os
import time
import random
def run(name: str):
print(f"Child process {os.getpid()} do {name} task, parent process {os.getppid()}")
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print(f'Task {name} runs {end - start} seconds.')
def main():
# test process
start = time.time()
print(f'Parent process {os.getpid()}')
p1 = Process(target=run, args=('test1',))
p2 = Process(target=run, args=('test2',))
p1.start()
p2.start()
p1.join()
p2.join()
print('Child process end')
end = time.time()
print(f'All Tasks runs {end - start} seconds.')
if __name__ == '__main__':
main()
效果如图,parent进程62488,创建的子进程分别为62489和62450,共用时0.38s,小于分别执行的0.3s和0.36s之和,说明并行。
1619576989417.jpg
二、通过Pool进程池创建
一般情况下,不会通过Process这种低级API去创建多进程,可以通过Pool进程池去创建。Pool(num),num指定工作进程数目,默认通过os.cpu_count()获取CPU核数,通过apply_async注册函数。
from multiprocessing import Pool
import subprocess
import os
import time
import random
def pool_run(name: str):
print(f'Child process {os.getpid()} Run task {name}')
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print(f'Task {name} runs {end - start} seconds.')
return name
def main():
# test pool
print(f'Parent process {os.getpid()}')
p = Pool(4)
result = []
for i in range(5):
result.append(p.apply_async(pool_run, args=(i,)))
p.close()
p.join()
for res in result:
print(res.get())
print('All subprocesses done.')
if __name__ == '__main__':
main()
效果如图,总共5个任务,4个进程。parent进程62577,同时创建4个子进程去处理任务,在完成其中1个任务后,该空余的进程继续执行第5个任务。
1619577359400.jpg三、进程间通信
Multiprocessing提供两种标准的进程通信方式Quere、Pipe。在比如生产者-消费者这种模型可能会用到。
from multiprocessing import Process, Queue
import os
import time
import random
def write(q):
print(f'Process to write: {os.getpid()}')
for value in ['A', 'B', 'C']:
print(f'Put {value} to queue...')
q.put(value)
time.sleep(random.random())
def read(q):
print(f'Process to read: {os.getpid()}')
while True:
value = q.get(True)
print(f'Get {value} from queue.')
def main():
# test queue
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
if __name__ == '__main__':
main()
效果如图,进程62773生产,进程62774消费,两者并行,通过队列queue做通信。
1619578230343.jpg四、总结
可能一般情况下,用Pool即可。
网友评论