概念
程序是静态的,程序运行起来后的代码+用到的资源(内存、摄像头、网络、硬盘等资源)叫进程,是操作系统分配资源的基本单元
进程的状态
实现多任务
import multiprocessing
import time
def test1():
while True:
print("----1----")
time.sleep(1)
def test2():
while True:
print("----2----")
time.sleep(1)
def main():
p1 = multiprocessing.Process(target=test1)
p2 = multiprocessing.Process(target=test2)
p1.start()
p2.start()
#print(p1.pid) #打印子进程的pid
#print(p2.pid)
if __name__ == '__main__':
main()
>>>----1----
>>>----2----
>>>----1----
>>>----2----
...
也可以继承multiprocessing.Process
:
import multiprocessing
import time
class Process1(multiprocessing.Process):
def run(self):
while True:
print("----1----")
time.sleep(1)
class Process2(multiprocessing.Process):
def run(self):
while True:
print("----2----")
time.sleep(1)
def main():
p1 = Process1()
p2 = Process2()
p1.start()
p2.start()
if __name__ == '__main__':
main()
运行后有三个进程,主进程,子进程1,子进程2。
正常情况,子进程不会修改代码,只会修改内存中的数据,所以只要把资源(内存、摄像头、网络、硬盘等资源)拷贝过来即可,和主进程共享一份代码,节省开销:
如果子进程通过特殊方法修改了代码,这时子进程复制主进程的代码+资源:
和线程的区别
进程是代码+资源的总称,一个进程中的多任务就叫线程。
多个线程使用的是同一份资源,多个进程有各自的资源。
拿着进程的资源去运行的是线程。上图中的红箭头其实都是线程在运行
进程是资源分配的单位,线程是操作系统调度的单位
不同的场景下应用
1、对于io操作,线程优于进程,因为线程切换开销比进程小。GIL遇到io操作会主动释放。
2、对于消耗cpu的操作,进程优于线程,进程可以充分发挥多核的优势,能并行执行。线程由于GIL的限制,同一时刻只有一个线程在一个cpu上执行,线程是并发执行。
进程池概念
创建固定数量的进程(有任务才创建,会有个最大值),进程池循环利用里面的进程去执行多任务,避免了频繁创建进程销毁进程的开销
进程池使用
方式一
from multiprocessing import Pool
import os,time
def run1(msg):
print("%s开始执行,进程号是%d"%(msg,os.getpid()))
time.sleep(1)
if __name__ == "__main__":
po = Pool(2)#最多两个进程,现在不会创建,使用到的时候创建
for i in range(0, 6):#添加了6个任务,虽然超过了进程池大小,不会报错,会先存起来。空闲的子进程会从任务队列中取任务执行。
#添加任务到任务队列
result = po.apply_async(run1, (i,)) #apply_async()返回的是ApplyResult对象,可以通过get()获取返回值(该方法会阻塞主进程一直等待子进程的返回值),还有其他方法可以查看ApplyResult类
print('start')
po.close()#关闭进程池,关闭后po不会再接收新的请求
po.join()#等待po中所有子进程执行完成,必须放在close语句后面
print('end')
>>>start
>>>0开始执行,进程号是7404
>>>1开始执行,进程号是8040
>>>2开始执行,进程号是7404
>>>3开始执行,进程号是8040
>>>4开始执行,进程号是7404
>>>5开始执行,进程号是8040
>>>end
通过进程池创建的任务主进程不会等待子进程执行完成,如果没有调用join()
,主进程执行完会直接结束,子进程的任务也跟着结束了。
通过multiprocessing.Process
创建的子进程,主进程会等待子进程执行完成再退出。
方式二
from concurrent.futures import ProcessPoolExecutor,as_completed
import os,time
def run1(msg):
print("%s开始执行,进程号是%d"%(msg,os.getpid()))
time.sleep(1)
if __name__ == "__main__":
with ProcessPoolExecutor(2) as executor:
all_task = [executor.submit(run1,i) for i in range(0,6)]
for future in as_completed(all_task):
pass
print('end')
>>>0开始执行,进程号是6844
>>>1开始执行,进程号是804
>>>2开始执行,进程号是6844
>>>3开始执行,进程号是804
>>>4开始执行,进程号是6844
>>>5开始执行,进程号是804
>>>end
进程间通信
1、使用队列:
from multiprocessing import Queue
q = Queue(3) #初始化Queue对象,最多可接收三条put消息
q.put(1)
q.put('abc')
print(q.full())
q.put(['a','b','c'])
print(q.full())
try:
q.put_nowait('test') #q.put('test'),如果队列已满,会阻塞,不会像put_nowait()一样抛异常
except:
print('消息队列已满,现有消息数量:%s'%q.qsize())
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait()) #q.get()如果队列为空,会阻塞,不会像get_nowait()一样抛异常
多进程下不支持使用from queue import Queue
,会抛异常:TypeError: can't pickle _thread.lock objects
multiprocessing .Queue
作为中间介质:
import multiprocessing
def download_data(q):
data = [1,2,3,4]
#向队列中写入数据
for temp in data:
q.put(temp)
def deal_data(q):
#从队列中获取数据
resList = list()
while True:
data = q.get()
resList.append(data)
if q.empty():
break
print(resList)
def main():
#1、创建一个队列
q = multiprocessing.Queue()
#2、创建多个进程,将队列的引用传递过去
p1 = multiprocessing.Process(target=download_data,args=(q,))
p2 = multiprocessing.Process(target=deal_data,args=(q,))
p1.start()
p2.start()
if __name__ == "__main__":
main()
>>>[1,2,3,4]
如果在主进程中创建列表,通过共享变量的方式传递给子进程可以实现数据共享吗?
def addData1(tmp_list):
tmp_list.append(1)
print('进程1',tmp_list)
def addData2(tmp_list):
tmp_list.append(2)
print('进程2',tmp_list)
def main():
tmp_list = []
p1 = multiprocessing.Process(target=addData1,args=(tmp_list,))
p2 = multiprocessing.Process(target=addData2,args=(tmp_list,))
p1.start()
p2.start()
time.sleep(2)
print('主进程',tmp_list)
if __name__ == "__main__":
main()
>>>进程1 [1]
>>>进程2 [2]
>>>主进程 []
前面说过一般情况下,多进程共用一份代码,子进程拷贝主进程资源,各自独立
子进程中tmp_list = []
,相当于各自拿同一份代码在执行,互不影响
multiprocessing .Queue在进程池中失效
下面的代码在进程池中使用了多进程的队列,没有输出结果
import multiprocessing
def download_data(q):
data = [1,2,3,4]
#向队列中写入数据
for temp in data:
q.put(temp)
def deal_data(q):
#从队列中获取数据
resList = list()
while True:
data = q.get()
resList.append(data)
if q.empty():
break
print(resList)
def main():
q = multiprocessing.Queue()
po = multiprocessing.Pool(2)
po.apply_async(download_data, (q,))
po.apply_async(deal_data, (q,))
po.close()
po.join()
if __name__ == "__main__":
main()
q = multiprocessing.Queue()
修改成q = multiprocessing.Manager().Queue()
后可以正常输出
2、使用管道
pipe适用于只有两个进程通信的场景,性能高于queue
from multiprocessing import Process,Pipe
def download_data(pipe):
pipe.send([1,2,3])
def deal_data(q):
print(q.recv())
def main():
recv_pipe,send_pipe = Pipe()
p1 = Process(target=download_data,args=(send_pipe,))
p2 = Process(target=deal_data,args=(recv_pipe,))
p1.start()
p2.start()
if __name__ == "__main__":
main()
>>>[1,2,3]
3、Manager()提供的共享变量
将上面的tmp_list修改成multiprocessing.Manager().list()
,可以定义一个进程间共享的list()变量。Manager()还提供了其他的数据类型。
import multiprocessing
import time
def addData1(tmp_list):
tmp_list.append(1)
print('进程1',tmp_list)
def addData2(tmp_list):
tmp_list.append(2)
print('进程2',tmp_list)
def main():
tmp_list = multiprocessing.Manager().list()
p1 = multiprocessing.Process(target=addData1,args=(tmp_list,))
p2 = multiprocessing.Process(target=addData2,args=(tmp_list,))
p1.start()
p2.start()
time.sleep(2)
print('主进程',tmp_list)
if __name__ == "__main__":
main()
>>>进程2 [2]
>>>进程1 [2, 1]
>>>主进程 [2, 1]
网友评论