基于官方文档:
https://docs.python.org/zh-cn/3/library/multiprocessing.html
日乐购,刚才看到的一个博客,写的都不太对,还是基于官方的比较稳妥
我就是喜欢抄官方的,哈哈
-
Class Process
通常我们使用Process实例化一个进程,并调用 他的 start() 方法启动它。
这种方法和 Thread 是一样的。
- 一个最贱单的例子:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('傻逼',))
p.start()
p.join()
print("运行结束")
>> hello 傻逼
运行结束
上图中,我写了 p.join() 所以主进程是 等待 子进程执行完后,才执行 print("运行结束")
否则就是反过来了(这个不一定,看你的语句了,顺序其实是随机的)例如:
p = Process(target=f, args=('傻逼',))
p.start()
# time.sleep(1)
print("运行结束")
>> 运行结束
hello 傻逼
主进加个 sleep
结果:
hello 傻逼
运行结束 (大约等了1秒后,才输出 运行结束)
所以不加join() ,其实子进程和主进程是各干各的,谁也不等谁。都执行完后,文件运行就结束了
- 查看子进程和父进程的 id
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid()) # 获取父进程的 pid (process id)
print('process id:', os.getpid())
def f(name):
info('function f') # use info
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
>> main line
module name: __main__
parent process: 14212
process id: 1940
function f
module name: __mp_main__
parent process: 1940
process id: 15020
hello bob
上面我们用了 os.getpid() 和 os.getppid() 获取 当前进程,和父进程的id
下面就讲一下,这两个函数的用法:
os.getpid()
返回当前进程的id
os.getppid()
返回父进程的id。 父进程退出后,unix 返回初始化进程(1)中的一个
windows返回相同的id (可能被其他进程使用了)
这也就解释了,为啥我上面 的程序运行多次, 第一次打印的parentid 都是 14212 了。
而子进程的父级 process id 是调用他的那个进程的 id : 1940
-
上下文和启动子进程
这个内容太多需要,蛮蛮总结。wc。
视频笔记:
多进程:使用大致方法:
-
多进程通信
- queue 管道:队列是线程和进程安全的(不会竞争)
q = mp.Queue()
把q 传给args, 然后在里面 q.put(你的东西)
取值在外面直接 q.get() # 先进先出
普通的
Process 函数用 return,我们也拿不到任何返回值 (智能通过 queue 或者共享内存来交换数据)
pool.map (函数可以有return 也可以共享内存或queue) 结果直接是个列表
def b(x):
# return 20
print("child process")
if __name__ == "__main__":
p =Pool(processes=2)
print(p.map(b, [1,2,3,4]))
>>
child process
child process
child process
child process
[None, None, None, None]
poll.apply_async() (同map,只不过是一个进程,返回结果用 xx.get() 获得)
- 共享内存使用:
multiprocessing.Value("type", value) # 单个值 value 智能是数字类型的,具体看 type 的设置 比如 i是 有符号整形
multiprocessing.Array("type", [single level list])
- Pipe 管道
Pipe()
函数返回一个由管道连接的连接对象
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
返回的两个连接对象
Pipe()
表示管道的两端。每个连接对象都有send()
和recv()
方法
- 资源竞争 枷锁
lock = mp.Lock()
把 lock 传给进程的args 参数
在进程函数里面写 lock.acquire() 枷锁
lock.release() 释放锁
练习过程中出现的问题:
from multiprocessing import Pool
pool = Pool()
def search(a):
# while a<1000000:
# a+=1
print("子进程结束")
return 1
if __name__ == "__main__":
# pool = Pool()
a = time.time()
result = pool.map(search, [1,2,10])
b = time.time()
print(b-a)
报错:
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
参考 :https://blog.csdn.net/xiemanR/article/details/71700531
把 pool = Pool() 放到 if name == "main": 下面初始化搞定。
结果:
子进程结束
子进程结束
子进程结束
0.7355780601501465
这个肯定有解释的
spawn.py 的解释
测试多进程计算效果:
进程池运行:
class aa():
def search(self, a):
while a<100000000:
a+=1
print("子进程结束")
return 1
if __name__ == "__main__":
pool = Pool()
c = aa()
a = time.time()
result = pool.map(c.search, [1,2,10])
b = time.time()
print(b-a)
结果:
子进程结束
子进程结束
子进程结束
16.176724195480347
红框是程序运行的状态
普通计算:
class aa():
def search(self, a):
while a<100000000:
a+=1
print("子进程结束")
return 1
if __name__ == "__main__":
pool = Pool()
c = aa()
a = time.time()
# result = pool.map(c.search, [1,2,10])
c.search(1)
c.search(2)
c.search(10)
b = time.time()
print(b-a)
我们同样传入 1 2 10 三个参数测试:
子进程结束
子进程结束
子进程结束
24.153281211853027
红框为执行计算的过程,看出来很平稳
其实对比下来开始快了一半的;
我们把循环里的数字去掉一个 0;
单进程:
子进程结束
子进程结束
子进程结束
3.1451985836029053
多进程:
子进程结束
子进程结束
子进程结束
2.4246084690093994
两次测试 单进程/进程池 分别为 0.669 和 0.772 几乎成正比的。
问题 二:
视图:
post 视图里面
class xxviews():
def post(request):
music = Music()
result = music.run()
return ...
Music 类:
pool = Pool()
class Music():
def __init__(self):
pass
def run(self):
result = self.search(...)
pass
def search(self, keyword, target_src):
pool.map(....)
直接报错:
好像是说 子进程自己不可以在创建子进程了
写在 类里面也 在函数里用 self.pool 调用也不行,也是相同的错误。
最后 把 pool = Pool 直接写在 search 函数里面,奇迹出现了:
class Music():
def __init__(self):
pass
def run(self):
result = self.search(...)
pass
def search(self, keyword, target_src):
pool = Pool()
pool.map(....)
pass
成功运行了
前台也能显示搜索的音乐结果了
总结一点,进程这个东西,最好 写在 直接运行的函数里面,而不是 一个函数跳来跳去。因为最后可能 是在子进程的子进程运行的,这是不许的,会报错。
还有一点,多进程运行的函数对象,不能是 lambda 函数。也许lambda 虚拟,在内存??
使用 pool.map 子进程 函数报错,导致整个 pool 挂了:
参考:https://blog.csdn.net/hedongho/article/details/79139606
主要你要,对函数内部捕获错误,而不能让异常抛出就可以了。
关于map 传多个函数参数
我一开始,就是正常思维,多个参数,搞个元祖,让参数一一对应不就行了:
class aa():
def search(self, a,b):
while a<10000000:
a+=1
print("子进程结束")
return 1
if __name__ == "__main__":
pool = Pool()
c = aa()
a = time.time()
result = pool.map(c.search, ((1,3),))
报错:
return self._map_async(func, iterable, mapstar, chunksize).get()
File "E:\Program\python\lib\multiprocessing\pool.py", line 657, in get
raise self._value
TypeError: search() missing 1 required positional argument: 'b'
参考:
https://blog.csdn.net/qq_15969343/article/details/84672527
普通的 process 当让可以穿多个参数,map 却不知道咋传的。
apply_async 和map 一样,不知道咋传的。
最简单的方法:
使用 starmap 而不是 map
class aa():
def search(self, a,b):
while a<10000000:
a+=1
print("子进程结束")
return 1
if __name__ == "__main__":
pool = Pool()
c = aa()
a = time.time()
result = pool.starmap(c.search, ((1,3),))
b = time.time()
print(b-a)
结果:
子进程结束
1.8399453163146973
成功拿到结果了
关于map 和 starmap 不同的地方看源码:
def mapstar(args):
return list(map(*args))
def starmapstar(args):
return list(itertools.starmap(args[0], args[1]))
def map(self, func, iterable, chunksize=None):
'''
Apply `func` to each element in `iterable`, collecting the results
in a list that is returned.
'''
return self._map_async(func, iterable, mapstar, chunksize).get()
def starmap(self, func, iterable, chunksize=None):
'''
Like `map()` method but the elements of the `iterable` are
expected to
be iterables as well and will be unpacked as arguments. Hence
`func` and (a, b) becomes func(a, b).
'''
return self._map_async(func, iterable, starmapstar, chunksize).get()
关于apply_async() ,我没找到多参数的方法,大不了用 一个迭代的 starmap 实现。哈哈
关于 上面源码里面有 itertools.starmap
itertools 用法参考:
https://docs.python.org/zh-cn/3/library/itertools.html#itertool-functions
有个问题,多进程最好不要使用全部的 cpu , 因为这样可能影响其他任务,所以 在进程池 添加 process 参数 指定,cpu 个数:
Pool(processes=os.cpu_count()-1)
上面就是预留了 一个cpu 干其他事的
后面直接使用 Queue 遇到这个问题:
RuntimeError: Queue objects should only be shared between processes through inheritance
解决:
Manager().Queue() 代替 Queue()
from multiprocessing import Manager, Queue
queue = Manager().Queue()
# queue = Queue()
因为 queue.get() 是堵塞型的,所以可以提前判断是不是 空的,以免堵塞进程。比如下面这样:
使用 queue.empty() 空为True
current_search = "稍等" if music_current_search.empty() else
music_current_search.get()
网友评论