Python-多进程
1 创建一个进程
1.1 用 Process
创建一个进程
from multiprocessing import Process
import time
import os
def foo(name):
print('foo--start', os.getpid())
time.sleep(5)
print('%s--父进程' % name, os.getppid()) # 传入参数
print('foo--end', os.getpid())
if __name__ == '__main__':
print('main--start', os.getpid())
p = Process(target=foo, args=('task-foo',)) # 注册进行,并且传入参数
p.start() # 开启了一个进程
# print('main--父进程号', os.getppid()) # 查看当前的进程的父进程
print('main--end', os.getpid()) # 开启当前进程的进程号
输出结果:
# 子进程的父进程
main--start 19140
main--end 19140
# 子进程(pid相同)
foo--start 19141
task-foo--父进程 19140
foo--end 19141
1.2 进程 Process
介绍
class Process(object):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
self.name = ''
self.daemon = False
self.authkey = None
self.exitcode = None
self.ident = 0
self.pid = 0
self.sentinel = None
def run(self):
pass
def start(self):
pass
def terminate(self):
pass
def join(self, timeout=None):
pass
def is_alive(self):
return False
1.3 进程的生命周期
进程的生命周期主要是注意两种情况:
- 主进程需要大量的执行时间
- 子进程执行过程中需要大量的执行时间
主进程(执行时间) >
子进程的(执行时间)
def foo(name):
print('foo--start', os.getpid())
time.sleep(3)
print('%s--父进程' % name, os.getppid()) # 传入参数
print('foo--end', os.getpid())
if __name__ == '__main__':
print('main--start', os.getpid())
p = Process(target=foo, args=('task-foo',)) # 注册进行,并且传入参数
p.start() # 开启了一个进程
time.sleep(5)
print('main--end', os.getpid()) # 开启当前进程的进程号
# print('main--父进程', os.getppid()) # 查看当前的进程的父进程
输出:
main--start 19526 # 主进程执行时间过长
foo--start 19527
task-foo--父进程 19526
foo--end 19527
main--end 19526
子进程(执行时间) >
主进程的(执行时间)
from multiprocessing import Process
import time
import os
def foo(name):
print('子进程foo--start')
print(name)
time.sleep(3)
print('子进程foo--end')
if __name__ == '__main__':
print('主进程开始')
p = Process(target=foo, args=('foo',))
p.start()
print('主进程结束')
输出结果
# ---- 主进程 ----
主进程开始
主进程结束
# ---- 子进程 ----
子进程foo--start
foo
子进程foo--end
1.4 join 方法
- 参数
timeout
默认值None
如果当可选参数 timeout
为默认值 None
的时候,则改方法将堵塞,直到调用其方法的主进程join
终止才终止.当传入参数的时候,超时则停止阻塞,顺序执行.
from multiprocessing import Process
import time
import os
def func(x, y):
print('func--start')
print(x, y)
time.sleep(15)
print('func--end')
if __name__ == '__main__':
print('main-start')
p = Process(target=func, args=(1, 2))
print('开启子进程')
p.start()
print('main--等待子进程返回结果')
p.join() # 阻塞主进程,等待一个子进程的结束而顺序执行,将异步的程序改为同步,
print('子进程结束')
print('main-end')
# 输出结果
main-start
开启子进程
main--等待子进程--foo返回结果
func--start
1 2
func--end
子进程结束
main-end
# 总结
如果在主进程中没有join()的时候,不会等待子进程的结束而继续执行,但是当我们在主进程中使用了join(),则主进程会等待子进程的结束而继续向下执行.
注意:
join(timeout)
如果传入参数timeout
,那么当延时超过则取消主程序的阻塞,继续向下执行.
2 开启多进程
2.1 创建多个进程
import os
import time
from multiprocessing import Process
def foo(x, y):
print('foo-start--%s' % os.getpid())
print(x, y)
time.sleep(3)
print('foo-end--%s' % os.getpid())
if __name__ == '__main__':
print('主进程开始--%s' % os.getpid())
p = Process(target=foo, args=(1, 3)) # 注册第一个进程
p.start() # 启动第一个进程
p1 = Process(target=foo, args=(10, 10)) # 注册第一个进程
p1.start() # 启动第一个进程
print('主进程结束--%s' % os.getpid())
# 输出结果
主进程开始--20307
主进程结束--20307
foo-start--20308 # 第一个进程启动
1 3
foo-start--20309 # 第二个进程启动
10 10
foo-end--20309
foo-end--20308
2.2 多进程使用 join()
的方法
def foo(x, y):
print('foo-start--%s' % os.getpid())
print(x, y)
time.sleep(3)
print('foo-end--%s' % os.getpid())
if __name__ == '__main__':
print('主进程开始--%s' % os.getpid())
p = Process(target=foo, args=(1, 3)) # 注册第一个进程
p.start() # 启动第一个进程
p.join() # 阻塞当前进程,等待子进程 p 执行结束继续向下执行
p1 = Process(target=foo, args=(10, 10)) # 注册第一个进程
p1.start() # 启动第一个进程
p1.join() # 同上
print('主进程结束--%s' % os.getpid())
# 输出结果
主进程开始--20325
foo-start--20326 # 子进程 p 开始
1 3
foo-end--20326 # 子进程 p 结束
foo-start--20327 # 子进程 p1 开始
10 10
foo-end--20327 # 子进程 p1 结束
主进程结束--20325
# 总结
当多进程使用了 join 无疑就是将本来异步的程序变成同步,
但是他们是同时执行在同一时间段上实现了同时运行。
区别于分别调用foo执行速度还是比较快的。
2.3 关于多进程 join 的位置
2.3.1 位于循环体内
def foo(x, y):
print(str(os.getpid()) + '--start')
time.sleep(3)
print(str(os.getpid()) + '--end')
if __name__ == '__main__':
print('主进程开始')
for i in range(5):
p = Process(target=foo, args=(5 * i, 5 * i))
p.start()
p.join() # 等待 第一个子进程 执行回来执行第二个
print('主进程结束')
# 输出结果
20387--start
20387--end
20388--start
20388--end
20389--start
20389--end
20390--start
20390--end
20391--start
20391--end
主进程结束
# 简易的时序图
主进程开始
17750--start
17750--end
17751--start
17751--end
17752--start
17752--end
17753--start
17753--end
17754--start
17754--end
主进程结束
--------------------------------------------------------------> t
# 总结
当我们把join位于循环体内的时候,实际上相当于循环体中调用函数,千万不要这样做。
2.3.2 位于循环体外
def foo(x, y):
# print(str(os.getpid()) + '--start')
print('*'*x)
time.sleep(3)
print('*' * y)
# print(str(os.getpid()) + '--end')
if __name__ == '__main__':
print('主进程开始')
for i in range(5):
p = Process(target=foo, args=(5 * i, 5 * i))
p.start()
p.join() # 此时表示最后一个进程结束后执行 print('主进程结束'),所以主进程结束会有不确定性。
print('主进程结束')
# 输出结果
主进程开始
********************
*****
**********
***************
*****
******************** # 最后的一个进程
主进程结束
**********
***************
# 总结
- 当位于循环体外的时候,实际上是等待最后的一个进程结束而向下执行,简单点说它只是阻塞最后的一个进程。当最后一个进程执行完毕依次顺序执行。当出现这种情况的时候不确定性是非常大的。
2.3.3 循环遍历 join
def foo(x, y):
print(str(os.getpid()) + '--start')
time.sleep(3)
print(x, y)
print(str(os.getpid()) + '--end')
if __name__ == '__main__':
print('主进程开始')
p_list = []
for i in range(5):
p = Process(target=foo, args=(5 * i, 5 * i))
p_list.append(p)
p.start()
[p.join() for p in p_list]
print('主进程依赖所有子进程返回结果才能执行以下任务')
print('主进程结束')
# 输出结果
主进程开始
20416--start
20417--start
20418--start
20419--start
20415--start
15 15
20418--end
10 10
20417--end
5 5
20416--end
20 20
20419--end
0 0
20415--end
主进程依赖所有子进程返回结果才能执行以下任务
主进程结束
# 简易图
主进程开始
20662--start
20659--start
20660--start
20661--start
20663--start
20659--end
20662--end
20660--end
20661--end
20663--end
主进程结束
---------------------------------------------------> t
2.3.4 使用类创建进程
参考源码
class Process(object):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
self.name = ''
self.daemon = False
self.authkey = None
self.exitcode = None
self.ident = 0
self.pid = 0
self.sentinel = None
通过继承 Process 类的方式开启多进程
import time
import os
from multiprocessing import Process
class MyProcess(Process):
def run(self):
print(self.pid)
print(self.name)
if __name__ == '__main__':
p = MyProcess()
p.start()
p1 = MyProcess()
p1.start()
那么我们应该如何传入参数那?
class MyProcess(Process):
def __init__(self,arg):
super().__init__()
self.arg = arg # 传入参数
def run(self):
print(self.pid)
print(self.name)
if __name__ == '__main__':
p = MyProcess(1)
p.start()
p1 = MyProcess(2)
p1.start()
# 注意
-当我们传入参数的时候应该继承Process的__init__()
2.3.5 进程之间数据隔离
import os
from multiprocessing import Process
def foo():
print('子进程-foo-开始')
global n
n = 0
print('pid:%s' % os.getpid(), n)
print('子进程-foo-结束')
if __name__ == '__main__':
print('主进程-start')
n = 100
p = Process(target=foo)
p.start()
print(os.getpid(), n)
print('主进程-end')
3 什么是守护进程
概念:随着主进程的结束而结束
3.1 没有开启守护进程
import os
import time
from multiprocessing import Process
def foo():
print('子进程-start')
while True:
time.sleep(0.5)
flag = p.is_alive()
print('子进程-是否正在执行', flag)
print('子进程-end')
if __name__ == '__main__':
print('主程序-开始')
p = Process(target=foo)
p.start()
n = 0
while n < 5:
print('我是socket server %s' % n)
time.sleep(1)
n += 1
print('主程序-结束')
# 输出结果
主程序-开始
我是socket server 0
子进程-start
子进程-是否正在执行 True
我是socket server 1
子进程-是否正在执行 True
子进程-是否正在执行 True
我是socket server 2
子进程-是否正在执行 True
子进程-是否正在执行 True
我是socket server 3
子进程-是否正在执行 True
子进程-是否正在执行 True
我是socket server 4
子进程-是否正在执行 True
子进程-是否正在执行 True
主程序-结束
子进程-是否正在执行 True
子进程-是否正在执行 True
子进程-是否正在执行 True
...... # 由于循环的原因 子进程会继续执行
说明:
- 在上面的例子中主进程结束后,那么子进程不会跟随子进程继续执行。
3.2 开启守护进程 daemon
主进程创建守护进程
- 守护进程会在主进程代码执行结束后就终止
- 守护进程内无法再开启子进程,否则抛出异常:
AssertionError: daemonic processes are not allowed to have children
def foo():
print('子进程-start')
while True:
time.sleep(0.5)
flag = p.is_alive()
print('子进程-是否正在执行', flag)
print('子进程-end')
if __name__ == '__main__':
print('主程序-开始')
p = Process(target=foo)
p.daemon = True # 设置 p 为守护进程
p.start()
n = 0
while n < 5:
print('我是socket server %s' % n)
time.sleep(1)
n += 1
print('主程序-结束')
输出结果
主程序-开始
我是socket server 0
子进程-start
子进程-是否正在执行 True
我是socket server 1
子进程-是否正在执行 True
子进程-是否正在执行 True
我是socket server 2
子进程-是否正在执行 True
子进程-是否正在执行 True
我是socket server 3
子进程-是否正在执行 True
子进程-是否正在执行 True
我是socket server 4
子进程-是否正在执行 True
子进程-是否正在执行 True # 子进程到最后直接跳出循环,随着主进程的结束而结束
主程序-结束
【总结】进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
4 进程锁
4.1 锁 Lock
如果在进程中没有锁会引发的问题
from multiprocessing import Process, Lock
import os
import time
db = {
'ticket': 1
}
def check_ticket():
return print('余票:%s' % db['ticket'])
def get_ticket(i):
if db['ticket'] > 0:
db['ticket'] -= 1
print('\033[32m%s买到票了\033[0m' % i)
else:
print('\033[31m%s没买到票\033[0m' % i)
if __name__ == '__main__':
for i in range(10):
p = Process(target=check_ticket)
p.start()
time.sleep(1)
for i in range(10):
p = Process(target=get_ticket, args=(i,))
p.start()
# 输出结果
余票:1
余票:1
余票:1
余票:1
余票:1
余票:1
余票:1
余票:1
余票:1
余票:1
5买到票了
6买到票了
1买到票了
2买到票了
3买到票了
7买到票了
8买到票了
9买到票了
0买到票了
4买到票了
# 总结
根据打印的输出结果我们可以看出实际上只有一张票,但是却都买到了票,针对这种情况就是对不同的任务抢占同一个资源的场景.
针对上述的例子我们就要用到了锁的概念
剩余票的数量 ticket.txt
{"ticket": 1}
对多个进程修改同一个资源进行安全处理 加锁
def check_ticket(i):
with open('/opt/base_python/library_python/_multilprocess/Lock/ticket.txt') as f:
dic = json.load(f)
print('余票: %s' % dic['ticket'])
# return print(i,'查询余票数量:%s' % db['ticket'])
def get_ticket(i, lock):
lock.acquire() # 给资源加锁
with open('/opt/base_python/library_python/_multilprocess/Lock/ticket.txt') as f:
dic = json.load(f)
time.sleep(0.1)
if dic['ticket'] > 0:
dic['ticket'] -= 1
print('\033[32m%s买到票了\033[0m' % i)
else:
print('\033[31m%s没买到票\033[0m' % i)
time.sleep(0.1)
with open('/opt/base_python/library_python/_multilprocess/Lock/ticket.txt', 'w') as f:
json.dump(dic, f)
lock.release() # 释放资源锁
总结:
- 注意锁的位置。
4.2 信号量 Semaphore
概念:Lock
属于互斥锁,也就是一把钥匙配备一把锁,同时只允许锁住某一个数据。而信号量则是多把钥匙配备多把锁,也就是说同时允许锁住多个数据。
生活中常见的例子
'''
在该例子中银行共计5个窗口,而实际办理业务的人员共计有20个。所以区别于锁的概念来说信号量就是基于锁的每次只能操作一个资源的扩展变成设置同时操作多个资源。
'''
from multiprocessing import Process
from multiprocessing import Semaphore
import os, random, time
def bank(i, sem): # 银行业务窗口
sem.acquire() # 被叫号 办理业务
print('%s正在办理业务--start' % i)
time.sleep(random.randint(1, 5)) # 每一次办理业务的时候 时间不定
print('%s结束业务--end' % i)
sem.release()
if __name__ == '__main__':
sem = Semaphore(4) # 表示共有4个柜台的办理业务
for i in range(20): # 表示共计有20个业务员进行等待办理业务。
p = Process(target=bank, args=(i, sem))
p.start()
'''
1正在办理业务--start
0正在办理业务--start
2正在办理业务--start
3正在办理业务--start # 可以同时保证开启四个进程 执行任务 与 进程锁区别的是因为 进程锁只有一个 但是他可以执行多个
0结束业务--end
4正在办理业务--start
2结束业务--end
5正在办理业务--start
1结束业务--end
14正在办理业务--start
4结束业务--end
15正在办理业务--start
3结束业务--end
13正在办理业务--start
5结束业务--end
10正在办理业务--start
14结束业务--end
6正在办理业务--start
13结束业务--end
7正在办理业务--start
6结束业务--end
8正在办理业务--start
15结束业务--end
9正在办理业务--start
7结束业务--end
12正在办理业务--start
10结束业务--end
16正在办理业务--start
8结束业务--end
11正在办理业务--start
11结束业务--end
17正在办理业务--start
12结束业务--end
9结束业务--end
18正在办理业务--start
19正在办理业务--start
16结束业务--end
17结束业务--end
18结束业务--end
19结束业务--end
'''
4.3 事件 Event
常用的属性和方法的介绍
from multiprocessing import Event
e = Event() # 创建事件
e.is_set() # 查看是否为阻塞状态 默认阻塞 False
e.set() # 设置 e.is_set() True
# 注意 e.set() 只能设置 e.is_set() 为True
e.clear() # 设置 e.is_set() False
'''
set() 和 clear() 是相对立的
set -- True
clear -- False
# 示例代码
In [3]: e.is_set()
Out[3]: False
In [4]: e.set()
In [5]: e.is_set()
Out[5]: True
In [6]: e.clear()
In [7]: e.is_set()
Out[7]: False
'''
e.wait()
# 根据 e.is_set() 决定是否执行阻塞
# False -- 阻塞
# True 不执行阻塞
-
is_set()--False
&&e.wait()
e = Event()
print('start....')
print(e.is_set())
e.is_set() # False
e.wait() # 执行阻塞
print('end...')
# 输出结果
# start....
# False
-
is_set()--False
&&e.wait()
e = Event()
print('start....')
print(e.is_set())
e.is_set() # False
e.set() # 设置为True
print(e.is_set()) # True
e.wait() # 执行阻塞
print('end...')
# 输出结果
# start....
# False
# end...
【总结】:对于这个地方我总是混淆,所以写的比较详细一些。
(初始)False ----> 阻塞 -- e.set() -- 非阻塞
/ \ /
is_set() wati() --->
\ / \
True ----> 非阻塞 -- e.clear -- 阻塞
注意区分
is_set() 是下达命令
wait 是执行命令
红绿灯事件
# 红绿灯事件
import time
import random
from multiprocessing import Event, Process
def cars(e, i):
if not e.is_set(): # if not false(阻塞状态)
print('car%i在等待' % i)
e.wait() # 阻塞 当 is_set() 为 false 执行阻塞
print('\033[0;32;40mcar%i通过\033[0m' % i)
def light(e):
while True:
if e.is_set():
e.clear() # 将其设置为 False
print('\033[31m红灯亮了\033[0m')
else:
e.set()
print('\033[32m绿灯亮了\033[0m')
time.sleep(2)
if __name__ == '__main__':
e = Event()
traffic = Process(target=light, args=(e,))
traffic.start()
for i in range(20):
car = Process(target=cars, args=(e, i))
car.start()
time.sleep(random.random())
# 输出结果
car0在等待
绿灯亮了
car0通过
car1通过
car2通过
car3通过
car4通过
红灯亮了
car5在等待
car6在等待
car7在等待
绿灯亮了
car5通过
car6通过
car7通过
car8通过
car9通过
car10通过
car11通过
红灯亮了
car12在等待
5 进程之间的通信-队列
5.1 队列的常用方法介绍
In [2]: from multiprocessing import Queue
In [3]: q = Queue(3)
In [4]: dir(q)
...
'cancel_join_thread',
'close', # 关闭后则不能操作
'empty',
'full',
'get', # 注意参数 block timeout
'get_nowait', # 如果队列中空则会抛出异常
'join_thread',
'put', # block 默认为True 如果为False 抛异常
# In [19]: q.full()
# Out[19]: True
# In [20]:
# In [20]: q.put(1,block=False)
'put_nowait', # 如果队列满了则会跑出异常
'qsize' # 查看队列的数量
....
# 压入数据 put 验证是否满了 full
In [9]: q = Queue(3)
In [10]: q.put(1)
In [11]: q.put(2)
In [12]: q.put(3)
In [13]: q.full()
Out[13]: True
In [14]: q.put(4) # 阻塞状态 直到队列中有值被取出
# 取出数据 get 验证是否空了 empty
In [6]: q.put(1)
In [7]: q.get()
Out[7]: 1
In [8]: q.empty()
Out[8]: True
In [9]: q.get() # 阻塞状态 直到有值压入
5.2 进程中使用 Queue
数据传输
5.2.1 主进程-子进程之间数据通信
from multiprocessing import Queue, Process
import time
def _put(q):
q.put('hello word')
if __name__ == '__main__':
q = Queue()
p = Process(target=_put, args=(q,))
p.start()
time.sleep(1)
print(q.get())
# 输出结果
子进程-put
主进程-get
hello word
5.2.1 子进程-子进程之间数据通信
def _put(q):
print('子进程-put')
q.put('hello word')
def _get(q):
print('子进程-get')
print(q.get())
if __name__ == '__main__':
q = Queue()
p_put = Process(target=_put, args=(q,))
p_get = Process(target=_get, args=(q,))
p_put.start()
p_get.start()
# 输出结果
子进程-get
子进程-put
hello word
5.3 队列之生成者消费者
5.3.1 生产者 Queue
def _put(q, worker):
for i in range(1, 11):
time.sleep(1)
ball = '%s生产的第%s个球' % (worker, i)
q.put(ball)
def _get(q):
while True:
ball = q.get()
print('运动员购买了%s' % (ball))
if __name__ == '__main__':
q = Queue(20)
p_put_1 = Process(target=_put, args=(q, 'Bob'))
p_put_2 = Process(target=_put, args=(q, 'Kevin'))
p_get = Process(target=_get, args=(q,))
p_put_1.start()
p_put_2.start()
p_get.start()
# 输出结果
运动员购买了Bob生产的第1个球
运动员购买了Kevin生产的第1个球
运动员购买了Bob生产的第2个球
运动员购买了Kevin生产的第2个球
运动员购买了Bob生产的第3个球
运动员购买了Kevin生产的第3个球
运动员购买了Bob生产的第4个球
运动员购买了Kevin生产的第4个球
运动员购买了Bob生产的第5个球
运动员购买了Kevin生产的第5个球
运动员购买了Bob生产的第6个球
运动员购买了Kevin生产的第6个球
运动员购买了Bob生产的第7个球
运动员购买了Kevin生产的第7个球
运动员购买了Bob生产的第8个球
运动员购买了Kevin生产的第8个球
运动员购买了Kevin生产的第9个球
运动员购买了Bob生产的第9个球
运动员购买了Kevin生产的第10个球
运动员购买了Bob生产的第10个球 ## 由于消费者是while..True..的循环体所以程序运行到此处的时候会产生阻塞 ##
如何避免上述情况
from multiprocessing import Queue, Process
import time, random
def _put(q, worker):
for i in range(1, 11):
time.sleep(1)
ball = '%s生产的第%s个球' % (worker, i)
q.put(ball)
def _get(q, player):
while True:
ball = q.get()
if ball is None:
break
print('运动员%s购买了%s' % (player, ball))
time.sleep(1)
if __name__ == '__main__':
q = Queue(20)
# 生产者
p_put_1 = Process(target=_put, args=(q, 'Bob'))
p_put_2 = Process(target=_put, args=(q, 'Kevin'))
# 消费者
p_get_1 = Process(target=_get, args=(q, 'xx'))
p_get_2 = Process(target=_get, args=(q, 'oo'))
p_put_1.start()
p_put_2.start()
p_get_1.start()
p_get_2.start()
p_put_1.join() # 主进程等待生产者生产完毕
p_put_2.join() # 主进程等待生产者生产完毕
q.put(None) # 当有多个消费者的时候,此时需要压入对应的None
# 原因是在生产者和消费者模型中,当有多个进程的时候只有一个数据的时候此时谁先拿到优先拥有操作权
# 在该例子中我们有两个消费者,如果不压入 两个 None 值则会导致其中一个跳出循环,其余没有拿到None
# 进程依然会阻塞。
q.put(None)
# 为什么不使用
# q.empty() q.qsize() q.get_nowait() 等方法
# 主要是因为压入数据和获取数据中间存在时间差,那么在该时间差内队列中可能是空的。当消费者进行统计的时候并反馈的时候,
# 可能在反馈结果的时候期间又有新的数据压入。导致结果不可靠
5.3.2 生成者消费者中的 JoinableQueue
from multiprocessing import JoinableQueue, Process
import time, random
def _put(q, worker):
for i in range(1, 11):
time.sleep(1)
ball = '%s生产的第%s个球' % (worker, i)
q.put(ball)
q.join() #
def _get(q, player):
while True:
ball = q.get()
print('运动员%s购买了%s' % (player, ball))
time.sleep(1)
q.task_done() #
if __name__ == '__main__':
q = JoinableQueue(20)
# 生产者
p_put_1 = Process(target=_put, args=(q, 'Bob'))
p_put_2 = Process(target=_put, args=(q, 'Kevin'))
# 消费者
p_get_1 = Process(target=_get, args=(q, 'xx'))
p_get_2 = Process(target=_get, args=(q, 'oo'))
p_put_1.start()
p_put_2.start()
# 1 设置消费者 为 守护进程
p_get_1.daemon = True
p_get_2.daemon = True
p_get_1.start()
p_get_2.start()
p_put_1.join() # 主进程等待生产者生产完毕
p_put_2.join() # 主进程等待生产者生产完毕
# 1 第一步设置消费者为守护进程
# 1.1 设置消费者等待
# p_put_1.join() # 主进程等待生产者生产完毕
# p_put_2.join() # 主进程等待生产者生产完毕
# 总结:
# 使用JoinQueue的时候的效果
# 1. 生产者阻塞设置join后,表示生产完数据就立即执行主代码程序
# 2. 但是在队列中设置了 JoinableQueue 的 join 表示 等待消费者彻底消费完才算结束。
# 3. 当生产者经历了 第一轮的 队列 JoinableQueue 的 join 此时消费者已经完成了全部数据的处理
# 4. 当生产者经历了 第二轮的 进程 Process 的 join 完成主进程的后续代码此时消费者是守护进程 随其一起结束
#
# 2 监控消费者是否将队列中的数据全出处理完成 调用task_done()方法
# 3 队列 使用 join 的方法使得生产完数据后等待消费者彻底处理完数据处理
# 总结 JoinableQueue 的两个阻塞
# 1 第一个阻塞 来自 JoinableQueue 使用task_done()查看队列是否完成等待其彻底完成。
# 2 第二个阻塞 来自 Process 使用将 消费者设置守护进程,此时消费者已经彻底的完成了任务,处理了全部队列中的数据, 一同结束该次任务。
【总结】
-
第一个阻塞
JoinableQueue
使用task_done()
判断队列是否完成,如果没有完成等待其彻底完成。 -
第二个阻塞
Process
将消费者设置守护进程(此时消费者已经彻底的完成了任务,处理了全部队列中的数据) 一同结束该次任务。 -
延长生产者的阻塞周期,使消费者彻底完成队列数据处理,达到供需平衡。
6 管道 Pipe
6.1 管道的基本认识
In [1]: from multiprocessing import Pipe
# left --- right #
In [2]: left,right = Pipe()
In [3]: left.send('basketball')
In [4]: right.recv()
Out[4]: 'basketball'
# right --- left #
In [5]: right.send('basketball')
In [6]: left.recv()
Out[6]: 'basketball'
6.2 进程中使用管道
from multiprocessing import Pipe, Process
def foo(children):
children.send('给我转生活费')
if __name__ == '__main__':
children, father = Pipe()
Process(target=foo, args=(children,)).start()
print(father.recv())
6.3 管道引发异常 EOFError
from multiprocessing import Process, Pipe
def foo(left, right):
left.close()
while True:
try:
res = right.recv()
print(res)
except EOFError:
right.close()
break
if __name__ == '__main__':
left, right = Pipe()
Process(target=foo, args=(left, right)).start()
right.close()
for i in range(20):
left.send('basketball-%s' % i)
left.close()
# 总结:
主进程中和子进程中如果对管道关闭互相不受影响。
left(close) 主进程 right(close)
\-----------------------/
**********Pipe*************
/-----------------------\
left(close) 子进程 right(异常EOFError)
6.4 管道进程之资源抢占 Lock
7 进程池
进程池与信号量的区别
假设我们有100个任务
- 信号量开启一百个进程,每次只能对指定数量的任务进行处理,执行完的进程立即销毁,进程等待任务
- 进程池开启指定数量的进程池,任务等待进程去执行,每个进程执行完当前的任务在去执行下一个任务,达到了进程的循环使用,节约了资源.
7.1 进程池的性能比较
from multiprocessing import Pool, Process
import time
def foo(n):
for i in range(100):
print(n + 1)
if __name__ == '__main__':
start = time.time()
pool = Pool(5)
pool.map(foo, range(100))
t = time.time() - start
start = time.time()
p_lst = list()
for i in range(100):
p = Process(target=foo, args=(i,))
p_lst.append(p)
p.start()
for p in p_lst: p.join()
tt = time.time() - start
print(t, tt)
# 输出结果
100
100
100
100
0.11333441734313965 0.30838918685913086
7.2 进程池的同步与异步提交
7.2.1 进程池的同步提交
from multiprocessing import Process, Pool
import time, os
def foo(n):
print('foo-start-%s' % n, os.getpid())
time.sleep(1)
print('foo-end-%s' % n, os.getpid())
if __name__ == '__main__':
p = Pool(5)
for i in range(10):
p.apply(foo, args=(i,))
p.close() # 结束进程池接收任务,才能使用join进行感知任务彻底执行完毕.
p.join() # 等待进程池中的任务彻底执行结束
# 进程池中的进程一直都会处于活跃的状态,只有任务才会被执行完毕.
# 输出结果
foo-start-0 20243
foo-end-0 20243
foo-start-1 20244
foo-end-1 20244
foo-start-2 20245
foo-end-2 20245
foo-start-3 20246
foo-end-3 20246
foo-start-4 20242
foo-end-4 20242
foo-start-5 20243
foo-end-5 20243
foo-start-6 20244
foo-end-6 20244
foo-start-7 20245
foo-end-7 20245
foo-start-8 20246
foo-end-8 20246
foo-start-9 20242
foo-end-9 20242
7.2.2 进程池的异步提交
from multiprocessing import Process, Pool
import time, os
def foo(n):
print('foo-start-%s' % n, os.getpid())
time.sleep(1)
print('foo-end-%s' % n, os.getpid())
if __name__ == '__main__':
p = Pool(5)
for i in range(10):
# p.apply(foo, args=(i,))
p.apply_async(foo, args=(i,))
p.close() # 结束进程池接收任务,才能使用join进行感知任务彻底执行完毕.
p.join() # 等待进程池中的任务彻底执行结束
# 进程池中的进程一直都会处于活跃的状态,只有任务才会被执行完毕.
# 输出结果
foo-start-0 20260
foo-start-1 20257
foo-start-2 20258
foo-start-3 20259
foo-start-4 20256
### 同时开启了5个进程 ###
foo-end-0 20260
foo-start-5 20260
foo-end-1 20257
foo-start-6 20257
foo-end-2 20258
foo-start-7 20258
foo-end-3 20259
foo-start-8 20259
foo-end-4 20256
foo-start-9 20256
foo-end-5 20260
foo-end-6 20257
foo-end-9 20256
foo-end-8 20259
foo-end-7 20258
7.3 进程池的返回值
### 同步提交
from multiprocessing import Pool
import os
def foo(i):
return i * i
if __name__ == '__main__':
print('主进程-start-%s' % os.getpid())
p = Pool(5)
for i in range(10):
res = p.apply(foo, args=(i,)) # apply的结果就是foo的返回值
print(res)
print('主进程-end-%s' % os.getpid())
# 输出结果
主进程-start-34980
0
1
4
9
16
主进程-end-34980
### 异步提交
def foo(i):
return i * i
if __name__ == '__main__':
print('主进程-start-%s' % os.getpid())
p = Pool(5)
for i in range(10):
res = p.apply_async(foo, args=(i,))
print(res.get()) # 阻塞等待输出
print('主进程-end-%s' % os.getpid())
主进程-start-33024
0
1
4
9
16
25
36
49
64
81
主进程-end-33024
# 总结:
- p.apply_async(foo, args=(i,))为异步提交.
- 但是 res.get() 等待结果为同步.换句话说可能都已经执行完毕但是等待结果需要阻塞输出.
# 异步执行
if __name__ == '__main__':
print('主进程-start-%s' % os.getpid())
p = Pool(5)
rlt = []
for i in range(10):
# res = p.apply(foo, args=(i,))
res = p.apply_async(foo, args=(i,))
rlt.append(res)
for res in rlt:
print(res.get())
print('主进程-end-%s' % os.getpid())
# map执行异步
if __name__ == '__main__':
print('主进程-start-%s' % os.getpid())
p = Pool(5)
ret = p.map(foo,range(10))
print(ret)
print('主进程-end-%s' % os.getpid())
# 输出结果
主进程-start-29644
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
主进程-end-29644
7.4 进程池的回调函数
from multiprocessing import Pool
import os
def foo(n):
print('foo-running-%s' % os.getpid())
return n * n
def bar(x):
print('bar-running-%s' % os.getpid())
print(x)
if __name__ == '__main__':
print('主进程-%s' % os.getpid())
p = Pool(5)
# for i in range(10):
p.apply_async(foo, args=(10,), callback=bar)
p.close()
p.join()
# 回调函数
主进程-37752
foo-running-12472
bar-running-37752
100
7.5 进程之间的数据共享
from multiprocessing import Manager, Process, Lock
def bar(d, lock):
lock.acquire()
d['count'] -= 1
lock.release()
if __name__ == '__main__':
m = Manager()
l = Lock()
d = m.dict({'count': 100})
lst = []
for i in range(50):
p = Process(target=bar, args=(d, l))
p.start()
lst.append(p)
for i in lst:
i.join()
print('主进程', d)
进程池开启soket-server端
网友评论