进程跟线程调用的基本是同一套 api,使用方法跟线程几乎是相同的
import multiprocessing
import time
def a(title):
print(title,time.ctime())
time.sleep(2)
print(title, 'end')
if __name__ == '__main__':
process = multiprocessing.Process(target=a,args=('python',))
process.start()
process.join()
print('ending -----------------',time.ctime())
输出结果:
python Thu Apr 26 16:43:57 2018
python end
ending ----------------- Thu Apr 26 16:43:59 2018
自定义的类继承 multiprocessing.Process,
import multiprocessing
import time
class MyProcess(multiprocessing.Process):
def __init__(self,title):
multiprocessing.Process.__init__(self)
self.title = title
def run(self):
print('begin to run')
time.sleep(2)
print('hello', self.title, self.name, time.ctime())
if __name__ == '__main__':
process = MyProcess('world')
process.start()
process.join()
print('ending -----------------',time.ctime())
输出结果:
begin to run
hello world MyProcess-1 Thu Apr 26 16:53:50 2018
ending ----------------- Thu Apr 26 16:53:50 2018
关于进程的子进程和父进程
import multiprocessing
import os
import time
def info(title):
print("title:", title)
print('parent process:', os.getppid()) # 获得这个进程的父进程的 pid
print('process id:', os.getpid()) # 获得这个进程的 pid
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main process line')
time.sleep(1)
print("------------------")
p = multiprocessing.Process(target=info, args=('hello world',))
p.start()
p.join()
输出结果:
title: main process line
parent process: 6164 # 这个进程的父进程也就是 pycharm了(我用pycharm运行程序的)
process id: 6004
------------------
title: hello world
parent process: 6004 # 子进程的父进程也就是 6004 了
process id: 12220 # 子进程
进程间通信,用进程队列,Queue, 主要的是数据同步,数据用复制的形式传到另一个进程,
import multiprocessing
import time
def aa(q):
print('begin')
time.sleep(2)
q.put(123)
q.put('hello')
print('son end, ',time.ctime())
if __name__ == '__main__':
q = multiprocessing.Queue() # 进程队列
process = multiprocessing.Process(target=aa,args=(q,))
process.start()
print(q.get())
print(q.get())
print('main end')
输出结果:
begin
son end, Thu Apr 26 20:07:17 2018
123
hello
main end
管道通信
import multiprocessing
import time
def aa(conn):
print('son id = ',id(conn))
time.sleep(1)
conn.send('hello')
print(conn.recv())
if __name__ == '__main__':
p_conn, c_conn = multiprocessing.Pipe() # 双向管道
process = multiprocessing.Process(target=aa,args=(c_conn,))
process.start()
print('main id = ',id(p_conn))
print(p_conn.recv())
p_conn.send('libai')
print('main end')
输出结果:
main id = 1768407138600
son id = 1942925877768
hello
main end
libai
Queue 和 Pipe 只是实现了数据交互, 并没实现数据共享,所谓的数据共享,即一个进程去更改另一个进程的数据。
import multiprocessing
def aa(l, i):
l.append(i)
if __name__ == '__main__':
with multiprocessing.Manager() as manager:
l = manager.list('hello')
process_list = []
for i in range(5):
process = multiprocessing.Process(target=aa, args=(l,i))
process.start()
process_list.append(process)
for i in process_list:
i.join()
print(l)
输出结果:
['h', 'e', 'l', 'l', 'o', 0, 1, 2, 3, 4]
# 实现数据交互
进程同步,类似于线程内锁的概念
import multiprocessing
import time
def aa(lock, l,i):
lock.acquire()
time.sleep(1)
print('hello')
l.append(i)
lock.release()
if __name__ == '__main__':
lock = multiprocessing.Lock()
l = multiprocessing.Manager().list()
process_list = []
for i in range(10):
process = multiprocessing.Process(target=aa,args=(lock,l,i))
process.start()
process_list.append(process)
for p in process_list:
p.join()
print(l)
输出结果:
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
[0, 2, 1, 3, 4, 5, 6, 7, 8, 9]
进程池, Pool
import multiprocessing
import time
def aa(i):
time.sleep(1)
print('hello',i)
if __name__ == '__main__':
pool = multiprocessing.Pool(5) # 不填参数就会以 CPU 核数作为 max
for i in range(10):
# pool.apply(func=aa, args=(i,)) # 同步接口,就是串行执行
pool.apply_async(func=aa, args=(i,)) # 异步接口,并发或并行
pool.close()
pool.join() # close 和 join 顺序是固定的
输出结果:
hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9
回调函数,callback() ,子进程中有时候并不能并行的执行某些操作,例如操作文件打印日志和修改数据,可以交给 callback() ,让主进程串行执行
import multiprocessing
import time
import os
def aa(i):
time.sleep(1)
print('hello',i, os.getpid())
def Bar(arg):
print('bar',os.getpid())
if __name__ == '__main__':
pool = multiprocessing.Pool(5)
print('main ',os.getpid())
for i in range(10):
# 回调函数: 就是某个动作或者函数执行成功后再去执行的函数
# 回调函数是在主进程中调用的,而不是子进程
pool.apply_async(func=aa, args=(i,),callback=Bar)
pool.close()
pool.join()
输出结果:
main 11048
hello 0 1228
bar 11048
hello 1 9348
bar 11048
hello 2 10280
bar 11048
hello 3 8768
bar 11048
hello 4 8544
bar 11048
hello 5 1228
bar 11048
hello 6 9348
bar 11048
hello 7 10280
bar 11048
hello 8 8768
bar 11048
hello 9 8544
bar 11048
# 可以看出,回调函数的 bar 是在主进程中运行的
回调函数,获取返回值
import multiprocessing
import time
def aa(i):
time.sleep(1)
return i # 返回的值被 callback 函数 Bar 接收到,
def Bar(arg):
print('bar get i = ',arg)
if __name__ == '__main__':
pool = multiprocessing.Pool(5)
for i in range(10):
pool.apply_async(func=aa, args=(i,),callback=Bar)
pool.close()
pool.join()
输出结果:
bar get i = 0
bar get i = 1
bar get i = 2
bar get i = 3
bar get i = 4
bar get i = 5
bar get i = 6
bar get i = 7
bar get i = 8
bar get i = 9
生成器,yield,每 next 一次,遇到 yield 就返回
def test():
print('one')
yield 1
print('two')
yield
t = test() # 返回一个生成器对象
print(t)
res1 = t.__next__() # 执行到yield 1 停止
print(res1)
print('------------------------------------------------------')
res2 = next(t) #从上次执行到的 yield 1 后面继续执行, 但是并没有设定返回值
print(res2)
输出结果:
<generator object test at 0x00000202ECBDD6D0>
one
1
------------------------------------------------------
two
None
生成器,yield 传值
def test():
print('one')
s = yield 1
print(s)
print('two')
yield
t = test()
res1 = t.__next__()
print(res1)
print('------------------------------------------------------')
t.send(150) # send 相当于一个 next
输出结果:
one
1
------------------------------------------------------
150
two
协程,协作式,非抢占式的程序,关键在 yield,主要是什么时候切换,解决的也是 IO 操作
协程本质上也就是一个线程
优势:
- 没有切换的消耗
- 没有锁的概念
- 要使用多核,可以和多进程配合
import time
def getSend(name):
while True:
s = yield
time.sleep(1)
print('[%s] get the send is %s, time %s'%(name, s, time.ctime()))
if __name__ == '__main__':
s1 = getSend('one')
s2 = getSend('two')
s1.__next__()
s2.__next__()
n = 0
while n < 4:
time.sleep(3)
s1.send(n)
s2.send(n+1)
n = n + 2
print('ending')
输出结果:
[one] get the send is 0, time Mon Apr 30 19:29:29 2018
[two] get the send is 1, time Mon Apr 30 19:29:30 2018
[one] get the send is 2, time Mon Apr 30 19:29:34 2018
[two] get the send is 3, time Mon Apr 30 19:29:35 2018
ending
greenlet 模块, 简单但不是很实用
import greenlet
def test1():
print('12')
gr2.switch()
print('56')
gr2.switch()
def test2():
print('34')
gr1.switch()
print('78')
gr1 = greenlet.greenlet(test1)
gr2 = greenlet.greenlet(test2)
gr1.switch()
输出结果:
12
34
56
78
gevent 模块, 遇到 IO 就切换
import time
import requests
import gevent
start = time.time()
url_list = ['https://www.baidu.com/',
'http://www.sina.com.cn/',
'https://www.jianshu.com/'
]
def req(url):
requests.get(url)
print('get url',url)
# for i in url_list:
# req(i)
gevent.joinall([
gevent.spawn(req, url_list[0]),
gevent.spawn(req, url_list[1]),
gevent.spawn(req, url_list[2]),
])
print('cost time : ', time.time()-start)
输出结果:
get url https://www.baidu.com/
get url http://www.sina.com.cn/
get url https://www.jianshu.com/
cost time : 0.6288654804229736
# 和串行执行的结果消耗时间相差不多。
更具体的可以看:https://www.cnblogs.com/yuanchenqi/articles/6248025.html
网友评论