python 中的多进程 (工具包:multiprocessing )。
-
我们可以利用multiprocessing.Process对象来创建一个进程对象。这个进程对象的方法和线程对象的方法也有 start(), run(), join() 等方法,其中 Process 进程对象的守护进程是通过设置 daemon 属性来完成的。
-
实现方式一
from multiprocessing import Process
import time
def process1(name):
print('你好 %s %s ' % (name, time.ctime()))
if __name__ == '__main__':
process_list = []
for i in range(5):
p = Process(target=process1, args=('process1',))
p.start()
process_list.append(p)
for i in process_list:
p.join()
执行结果:
image.png
上面我们开启了5个子进程去执行函数,我们可以观察执行时间,是同时打印的。这里实现了真正的并行操作,就是多个 CPU 同时执行任务。进程是python中最小的资源分配单元,也就是进程中间的数据,内存是不共享的,每启动一个进程,都要独立分配资源和拷贝访问的数据,所以进程的启动和销毁的代价是比较大了,所以在实际中使用多进程,要根据服务器的配置来设定。
- 实现方式二
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super(MyProcess,self).__init__()
self.name = name
def run(self):
print('你好 %s %s' % (self.name, time.ctime()))
if __name__ == '__main__':
process_list = []
for i in range(5):
p = MyProcess('MyProcess')
p.start()
process_list.append(p)
for i in process_list:
p.join()
执行结果:
image.png
效果和第一种方式一样的,Python 多进程的实现方式和多线程的实现方式几乎一样。
Process类的其他方法
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
- group: 线程组
- target: 要执行的方法
- name: 进程名
- args/kwargs: 要传入方法的参数
实例方法: - is_alive():返回进程是否在运行,bool类型。
- join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定timeout(可选参数)。
- start():进程准备就绪,等待CPU调度
- run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
- terminate():不管任务是否完成,立即停止工作进程
属性: - daemon:和线程的setDeamon功能一样
- name:进程名字
- pid:进程号
python 多进程通信
- 进程是系统独立调度核分配系统资源(CPU、内存)的基本单位,进程之间是相互独立的,每启动一个新的进程相当于把数据进行了一次克隆,子进程里的数据修改无法影响到主进程中的数据,不同子进程之间的数据也不能共享,这是多进程在使用中与多线程最明显的区别。当然,python 也提供了多种方法实现了多进程中间的通信和数据共享。
进程队列 Queue
- Queue 常在生成者消费者模式中使用,是线程安全的,是生产者和消费者中间的数据管道。在 python 多进程中,它其实就是进程之间的数据管道,实现进程通信。
from multiprocessing import Process,Queue
def fun1(queue,i):
print('子进程 %s 开始 put 数据' %i)
queue.put('子进程 %s 通过 Queue 通信' %i)
if __name__ == '__main__':
queue = Queue()
process_list = []
for i in range(3):
p = Process(target=fun1,args=(queue,i,))
p.start()
process_list.append(p)
for i in process_list:
p.join()
print('主进程获取Queue数据')
print(queue.get())
print(queue.get())
print(queue.get())
print('结束')
执行结果:
image.png
Managers
- Queue 只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。需要用到 Managers。
from multiprocessing import Process, Manager
def fun1(dic,index):
dic[str(index)] = index
print(dic[str(index)])
if __name__ == '__main__':
with Manager() as manager:
# 注意字典的声明方式,不能直接通过{}来定义
dic = manager.dict()
process_list = []
for i in range(5):
p = Process(target=fun1, args=(dic,i))
p.start()
process_list.append(p)
for p in process_list:
p.join()
print(dic.keys(), dic.values())
执行结果:
image.png
主线中定义了一个字典,在子进程中,可以修改字典中的内容,实现进程间的数据共享,既可以共同修改同一份数据。
进程池
- 进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
- 进程池中有两个方法:
- apply:同步,一般不使用
- apply_async:异步
from multiprocessing import Process,Pool
import os, time, random
def fun1(name):
print('Task %s (%s) start...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s finish %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
# 创建一个6个进程的进程池
pool = Pool(6)
for i in range(10):
pool.apply_async(func=fun1, args=(i,))
pool.close()
pool.join()
print('结束测试')
执行结果:
Pool 对象调用 join() 方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用 close() 之后就不能继续添加新的Process了。
网友评论