一、多进程概述
- 程序:是一个指令的集合
- 进程:正在执行的程序;或者说:当你运行一个程序,你就启动了一个进程
- 编写完的代码,没有运行时,称为程序,正在运行的代码,称为进程;
- 程序是死的(静态的),进程是活的(动态的)
- 操作系统轮流让各个任务交替执行,由于CPU的执行速度实在是太快了,我们感觉就像所有任务都在同时执行一样;
- 多进程中,每个进程中所有数据(包括全局变量)都各自拥有一份,互相不影响。
- 程序启动运行时,首先会创建一个主进程
- 在主进程(父进程)下,我们可以创建新的进程(子进程),子进程依赖于主进程,如果主进程结束,程序会退出
- 创建子进程时会在子进和空间中复制一份主进程的代码,全局变量在多个子进程之间不共享,进程之间的数据是独立的,默认情况下互不影响。
二、通过调用multiprocessing.Process实现多进程
- Python提供了非常好用的多进程包multiprocessing,借助这个包,可以轻松完成从单进程到并发执行的转换。
- multiprocessing模块提供了一个Process类来创建一个进程对象。
- 如果用循环创建多个进程时,可以把多个子进程对象加到list中,在循环外再循环执行list中的每个对象的join()方法以达到主进程等待所有子进程都结束。
import timefrom multiprocessing import Process
def sing(name):
for i in range(10):
print(name+"唱歌")
time.sleep(1)
def dance():
for i in range(10):
print("跳舞")
time.sleep(1)
if __name__ == '__main__':
p1 = Process(target=sing, args=('林志凌',), name="sing")
p2 = Process(target=dance , name="dance")
p1.start()
print(p1.name)
p2.start()
print(p2.name)
p1.join() #join() 主进程等待子进程结束
p2.join()
- Process 类常用方法
- p.start():启动进程,并调用该子进程中的p.run()
- p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
- p.terminate() 强制终止进程p,不会进行任何清理操作
- p.is_alive() 如果p仍然运行,返回True,用于判断进程是否在运行
- p.join([timeout]) 主进程等待子进程结束,timeout是可选的超时时间。
- Process类常用属性
- name:当前进程实例别名,默认为Process-N,N从1开始递增
- pid:当前进程实例的PID值
三、通过继承Process类创建多进程
import multiprocessing
import time
class ClockProcess(multiprocessing.Process):
def __init__(arg):
super().__init__()
self.arg = arg
def run(self):
n = self.arg
while n>0:
print(n)
time.sleep(1)
n -= 1
if __name__ == '__main__':
p = ClockProcess(5)
p.start()
p.join()
四、进程池-Pool
- 进程池:用来创建多个进程
- 当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但如果要创建的子进程数很多时,此时可以用到multiprocessing模块提供的Pool
- 初始化Pool时,可以指定一个最大进程数, 当有新的请求提交到Pool中时,如果进程池还没有满,那么就会创建一个新的进程用来执行该请求;但如果进程池中的进程数已经达到最大值,那么该请求就会等待,直到进程池中有进程结束,才会创建新的进程来执行。
from multiprocessing import Pool
import random, time
def work(num):
print(random.random() * num)
time.sleep(3)
if __name__ == '__main__':
po = Pool(4) #如果不写默认为当前CPU核数
for i in range(10):
po.apply_async(work, (i,))
po.close()
po.join()
- multiprocessing.Pool常用函数解析:
- apply_async(func[, args[, kwgs]]):使用非阻塞方式调用fun(并行执行,阻塞方式必须等上一个进程退出才能执行下一个进程)args为传递给func的参数列表,kwgs为传递给func的关键字参数列表;
- apply(func[, args[, kwgs]]) (了解即可几乎不用)使用阻塞的方式调用func
- close():关闭Pool,使其不再接受新的任务;
- terminate():不管任务是否完成,立即终止;
- join():主进程阻塞,等待子进程退出,必须在close()或terminate之后使用;
五、 进程间通信-Queue
- 可以使用multiprocessing模块的Queue实现多进程间的数据传递
- 初始化Queue()对象时(例如:q = Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限
- Queue.qsize():返回当前队列包含的消息数量
- Queue.empty():如果队列为空,返回True,反之False
- Queue.full():如果队列满了,返回True,反之False
- Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从队列中移除,block默认值为True
- 如果block使用默认值,且没有设置timeout(单位移),消息队列如果为空,此时程序将被阻塞(停在读取状态),直到从消息队列读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出“Queue.Empty”异常
- 如果block值为False,消息队列如果为空,则会立刻抛出“Queue.Empty”异常。
- Queue.get_nowait() :相当于Queue.get(False)
- Queue.put(item, [block, [, timeout]]) :将item消息写入队列,block默认值为True
- 如果block使用默认值,且没有设置timeout,消息队列如果已没有空间写入,此时程序将被阻塞(停在写入状态),直到从消息队列中腾出空间为止,如果设置了True和timeout,则会等待timeout秒,若还没有空间,则抛出”Queue.Full“异常
- 如果block值为False,消息队列如果没有空间可写入,则会立刻抛出”Queue.Full“异常
- Queue.put_nowait(itme) :相当于Queue.put(item, False)
import time
from multiprocessing import Queue, Process
def write(q)
for value in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:
print("开始写入:", value)
q.put(value)
time.sleep(1)
def read(q):
while True:
if not q.empty():
print("读取到的是", q.get())
time.sleep(1)
else:
break
if __name__ == '__main__':
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
time.sleep(1)
pr.start()
pw.join()
pr.join()
print('接收完毕!')
六、进程池之间通信- Manager().Queue()
- 进程池之间通信,就需要使用multiprocessing.Manager()中的Queue()而不是multiprocessing.Queue()
- 否则会得到一条如下错误信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.
import time
from multiprocessing import Manager, Pool
def writer(q):
for i in "welcome":
print("开始写入", i)
q.put(i)
def reader(q):
time.sleep(3)
for i in range(q.qsize()):
print("得到消息", q.get())
if __name__ == "__main__":
print("主进程启动")
q = Manager().Queue()
po = Pool()
po.apply_async(writer, (q,))
po.apply_async(reader, (q,))
po.close()
po.join()
网友评论