前言
日常工作中,会遇到一些测试数据库抗压能力的需求,这时候就需要使用到python的多进程模块,能够并行的执行任务。我们先来看看并发和并行的区别。
并发(多线程)
并发(Concurrent),在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行。
并行(多进程)
并行(Parallel),当系统有一个以上CPU时,当一个CPU执行一个进程时,另一个CPU可以执行另一个进程,两个进程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行(Parallel)。
这里面有一个很重要的点,那就是系统要有多个CPU才会出现并行。在有多个CPU的情况下,才会出现真正意义上的『同时进行』。
一个进程之下,可以启动多个线程,线程间共享内存,可以使用全局变量。
image无通信的进程
下面是进程间无通信代码,会发现一个很难受的问题,无法用参数接收到worker函数的返回值和报错信息。当然如果不需要接收子进程的返回值这样用就可以了。
import random
import multiprocessing
def worker(id):
try:
x = id/random.randint(-1, 3)
return {"result": int(x)}
except Exception as e:
return {"error": str(e)}
if __name__ == "__main__":
jobs = []
for i in range(1,11):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
for p in jobs:
p.join()
有通信的进程
进程之间所使用的是不同的内存空间,所以不能使用全局变量。Python的multiprocessing
模块包装了底层的机制,提供了Queue
、Pipes
等多种方式来交换数据。
这样就可以通过主进程来管理子进程和获取返回值了,子进程也可以通过Queue
中的数据,来决定自身的运行。
1.主进程和子进程间的通信
可以方便的汇总子进程的返回值和报错
import random
import multiprocessing
def worker(id, q):
try:
print('process {} start'.format(id))
x = id / random.randint(-1, 3)
q.put({id: {"result": int(x)}})
except Exception as e:
q.put({id: {"error": str(e)}})
if __name__ == "__main__":
q = multiprocessing.Queue()
jobs = []
for i in range(1, 11):
p = multiprocessing.Process(target=worker, args=(i, q))
jobs.append(p)
p.start()
for p in jobs:
p.join()
result_list = []
while not q.empty():
result_list.append(q.get())
print(result_list)
2.子进程间的通信
实现多进程查找乱序数组中某个数的位置,并在某个子进程查找到时,结束其它子进程
import random
import multiprocessing
def worker(id, temp_list, select_number, q):
print('process {} start'.format(id), len(temp_list))
for index, number in enumerate(temp_list):
if q.empty():
if number == select_number:
q.put({'id': id, 'index': index})
else:
break
if __name__ == "__main__":
select_number = 999
list_length = 10000
q = multiprocessing.Queue()
shuffle_number_list = [i for i in range(list_length)]
random.shuffle(shuffle_number_list)
start = 0
jobs = []
for i in range(1, 11):
temp_list = shuffle_number_list[start:i * 1000]
start += 1000
p = multiprocessing.Process(target=worker, args=(i, temp_list, select_number, q))
jobs.append(p)
p.start()
for p in jobs:
p.join()
result = q.get()
print(shuffle_number_list[result['index'] + (result['id'] - 1) * 1000] == select_number)
网友评论