什么是进程?
进程是程序运行时,代码+代码运行时用到的资源(如网络带宽、声卡、显卡等),它是操作系统分配资源的基本单位。
多进程会对代码进行写时拷贝,是一种比较耗费资源的多任务形式。它比较适用于计算密集型的程序。
进程实现多任务的例子
用多进程实现多任务和多线程的实现方式基本相同,唯一区别在于将threading
模块替换为multiprocessing
模块。如下所示:
import multiprocessing
from time import sleep
def test1(nums):
for i in range(nums):
print("%d in test1"%i)
sleep(0.5)
def test2(nums):
for i in range(nums):
print("%d in test2"%i)
sleep(0.5)
def main():
p1 = multiprocessing.Process(target=test1, args=(10,))
p2 = multiprocessing.Process(target=test2, args=(10,))
p1.start()
p2.start()
if __name__ == "__main__":
main()
进程与线程的区别联系
进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,它是系统进行资源分配的单位;线程是进程的一个实体,是CPU调度和分派的基本单位,线程自己基本上不占用系统资源,只拥有一点在运行中必不可少的资源(如程序计数器、一组寄存器和栈)。
进程和线程的关系:
- 一个程序至少有一个进程,一个进程里至少有一个线程
- 线程的划分尺度小于进程,使得多线程程序的并发性高
- 进程在执行过程中拥有独立的内存单元,而一个进程中的多个线程共享内存
- 线程不能独立执行,而是必须依赖进程
此外,在python中,线程只能分布在一个计算机的不同CPU上,而进程是可以通过分布在不同设备上运行,通过网络进行数据通信的。
进程间的通信
在python中,可以通过multiprocessing
中的队列(Queue)和管道(Pipe)来完成进程间的数据交换。
使用Queue实现进程间的通信
Queue提供了多个进程之间实现通信的方法。
简单的理解 Queue 实现进程间通信的方式,就是使用了操作系统给开辟的一个队列空间,各个进程可以把数据放到该队列中,当然也可以从队列中把自己需要的信息取走。
下表列出了Queue类的一些常用方法
方法名 | 功能 |
---|---|
put(obj[,block=True[,timeout=None]]) | 将 obj 放入队列,其中当 block 参数设为 True 时,一旦队列被写满,则代码就会被阻塞,直到有进程取走数据并腾出空间供 obj 使用。timeout 参数用来设置阻塞的时间,即程序最多在阻塞 timeout 秒之后,如果还是没有空闲空间,则程序会抛出 queue.Full 异常 |
put_nowait(obj) | 该方法的功能等同于 put(obj, False) |
get([block=True[, timeout=None]]) | 从队列中取数据并返回,当 block 为 True 且 timeout 为 None 时,该方法会阻塞当前进程,直到队列中有可用的数据。如果 block 设为 False,则进程会直接做取数据的操作,如果取数据失败,则抛出 queue.Empty 异常(这种情形下 timeout 参数将不起作用)。如果手动 timeout 秒数,则当前进程最多被阻塞 timeout 秒,如果到时依旧没有可用的数据取出,则会抛出 queue.Empty 异常 |
get_nowait() | 该方法的功能等同于 get(False) |
empty() | 判断当前队列空间是否为空,如果为空,则该方法返回 True;反之,返回 False |
一个用Queue实现进程间通信的例子如下:
import multiprocessing
import random
def data_downloader(q):
"""模拟数据下载"""
# 创建数据,模拟下载的数据
print("----start data downloading----")
mat = list()
num_batch = 10
batch_size = 5
for i in range(num_batch):
tmp = list()
for j in range(batch_size):
tmp.append(round(random.random(), 2))
mat.append(tmp)
# 将数据推入队列
for row in mat:
q.put_nowait(row)
if q.full():
break
print("----finish data downloading----")
def data_processor(q):
"""模拟数据处理,打印每一行的数字和平均值"""
print("----start data processing----")
while not q.empty():
tmp = q.get_nowait()
print("Data in the row: ", tmp, " average: ", sum(tmp)/len(tmp))
print("----finish data processing----")
def main():
"""用Queue类,实现多进程间的通信"""
q = multiprocessing.Queue(5) # 最多可以容纳5条信息
# 创建进程
p1 = multiprocessing.Process(target=data_downloader, args=(q,))
p2 = multiprocessing.Process(target=data_processor, args=(q,))
# 启动线程
p1.start()
p2.start()
if __name__ == '__main__':
main()
这里要注意,虽然看起来似乎什么容器都可以用这样的操作传入进程中,用来进行数据交换,但是实际上在Python里,其他原生容器,如list, dict等都是不能从进程中得到数据的。
使用Pipe实现进程间的通信
Pipe常用于实现两个进程间的通信,这两个进程分别位于管道的两端,一端用来发送数据,另一端用来接收数据。创建管道的语法格式如下:
conn_1, conn_2 = multiprocessing.Pipe(duplex=True)
当设置参数duplex为True时,管道可以双向通信。
Pipe
中的方法如下:
方法名 | 功能 |
---|---|
send(obj) | 发送一个 obj 给管道的另一端,另一端使用 recv() 方法接收。需要说明的是,该 obj 必须是可序列化的,如果该对象序列化之后超过 32MB,则很可能会引发 ValueError 异常 |
recv() | 接收另一端通过 send() 方法发送过来的数据 |
close() | 关闭连接 |
poll([timeout]) | 返回连接中是否还有数据可以读取 |
send_bytes(buffer[,offset[,size]]) | 发送字节数据。如果没有指定 offset、size 参数,则默认发送 buffer 字节串的全部数据;如果指定了 offset 和 size 参数,则只发送 buffer 字节串中从 offset 开始、长度为 size 的字节数据。通过该方法发送的数据,应该使用 recv_bytes() 或 recv_bytes_into 方法接收 |
recv_bytes([maxlength]) | 接收通过 send_bytes() 方法发送的数据,maxlength 指定最多接收的字节数。该方法返回接收到的字节数据 |
recv_bytes_into(buffer[,offset]) | 功能与 recv_bytes() 方法类似,只是该方法将接收到的数据放在 buffer 中 |
上面那个例子用Pipe可以改写为:
import multiprocessing
import random
def data_downloader(conn):
"""模拟数据下载"""
# 创建数据,模拟下载的数据
print("----start data downloading----")
mat = list()
num_batch = 10
batch_size = 5
for i in range(num_batch):
tmp = list()
for j in range(batch_size):
tmp.append(round(random.random(), 2))
mat.append(tmp)
# 将数据推入管道
conn.send(mat)
print("----finish data downloading----")
def data_processor(conn):
"""模拟数据处理,打印每一行的数字和平均值"""
print("----start data processing----")
for tmp in conn.recv():
print("Data in the row: ", tmp, " average: ", sum(tmp)/len(tmp))
print("----finish data processing----")
def main():
"""用Pipe类,实现多进程间的通信"""
conn_in, conn_out = multiprocessing.Pipe()
# 创建进程
p1 = multiprocessing.Process(target=data_downloader, args=(conn_in,))
p2 = multiprocessing.Process(target=data_processor, args=(conn_out,))
# 启动线程
p1.start()
p2.start()
if __name__ == '__main__':
main()
进程池
为什么需要进程池?
- 减少手动创建大量进程的工作量
需要创建的子进程不多时,可以用multiprocessing
中的Process
动态生成多个进程,但如果需要成千上百个子进程时,手动创建工作量过大,此时可以用到multiprocessing
模块中的Pool
方法。当任务数不能确定时,Pool
也是很好的解决方案。
- 降低创建和销毁进程的开销
此外,进程的创建和销毁是需要占用时间和资源的,因此当任务数很多时,为了减少创建销毁进程占用的时间和资源,我们可以用一个进程池保持一定数量的进程,复用这些进程去轮流执行任务。这样也可以提高程序效率。
进程池示例
当初始化Pool
时,可以定下一个最大进程数,当有新的请求提交到Pool
时,如果有空闲的进程,那么就创建一个新的进程来执行该请求;但是如果池中的进程数已达到指定的最大值,那么请求就会等待,直到池中有进程执行完任务,进入空闲状态,那么新的可用进程就被分派来执行请求。
注意通过进程池创建的进程,主进程不会等待其执行完毕再结束,因此需要用poolObj.join()
来显示等待进程池中进程的结束。
下面是一个用进程池执行多任务的例子:
from multiprocessing import Pool
import random, time, os
def worker(msg):
"""模拟一个任务"""
t_start = time.time()
print("%s 开始执行,进程号为%d"%(msg, os.getpid()))
time.sleep(random.random()*5)
t_stop = time.time()
print(msg, " 执行完毕,耗时%0.3f秒"%(t_stop - t_start))
def main():
po = Pool(3)
# 创建10个任务
for i in range(10):
# Pool().apply_async(func, args=(,)) 运行func指定的任务,并且是非阻塞的,不会阻塞主进程运行
po.apply_async(worker,args=(i,))
print("----start----")
po.close() # 结束进程池,此后不会再有任务进入进程池
po.join() # 等待进程池中的进程执行完毕,必须在close()之后使用,否则会报错
print("----end----")
if __name__ == "__main__":
main()
有一点需要格外注意的时,当进程池中的进程执行任务遇到错误时,并不会返回异常值。如下例所示:
import multiprocessing
def worker(cnt):
for i in range(cnt):
print("Now in worker")
raise IOError # 在进程内手动报错
if __name__ == '__main__':
pool = multiprocessing.Pool(5)
for i in range(100):
pool.apply_async(worker, args=(10,))
pool.close()
尽管我们手动raise IOError
,但是上面这个程序运行时是不会产生输出的。
Pool模块的常用函数
下面更深入探讨下Pool
类中的常用方法:
进程池的创建
Pool(processes, initializer, initargs, maxtasksperchild)
参数:
- processes: 进程池中工作进程个数,默认为
None
,代表机器的cpu个数cpu_count()
- initializer: 工作进程初始化时调用的函数,如果不为空,那么每个进程在初始化的时候会调用一次
initializer(*initargs)
- maxtasksperchild: 每个工作进程在结束之前需要完成的工作任务数,如果设置了一个数量,那么工作进程完成相应次数的工作任务后就会结束。默认为
None
,代表工作进程存活的时间与Pool
相同,即不会自动结束。
利用这个函数,可以创建pool
类,下面就可以用它的方法对任务进行操作了。
任务提交
apply(func[,args[,kwds]])
单任务同步提交,将 func 函数提交给进程池处理。其中 args 代表传给 func 的位置参数,kwds 代表传给 func 的关键字参数。该方法会被阻塞直到 func 函数执行完成。
apply_async(func[,args[,kwds[,callback[,error_callback]]]])
这是apply
方法的异步版本,该方法不会被阻塞。新增的两个参数callback
代表func
函数执行完成后的回调函数,error_callback
指定func
函数出错之后的回调函数。
map(func, iterable[, chunksize])
多任务同步提交,用新进程对iterable中的每个元素执行func
函数。
map_async(func, iterable[, chunksize[, callback[, error_callback]]])
map
方法的异步版本,callback
和error_callback
的含义与上面相同。
starmap(func, iterable[,chunksize] )
功能类似于 map() 方法,但该方法要求 iterable 的元素也是 iterable 对象,程序会将每一个元素解包之后作为 func 函数的参数。
等待和结束任务
close()
关闭进程池。在调用该方法之后,该进程池不能再接收新任务,它会把当前进程池中的所有任务执行完成后再关闭自己。在实际使用中可以用with
关键字,用上下文管理器自动关闭进程池。
terminate()
立即中止进程池
join()
等待所有进程完成
网友评论