一、进程相关概念
狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。
广义定义:进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是操作系统分配资源的基本单位。它是操作系统动态执行的基本单元,在传统的操作系统中,进程是基本的分配单元。
程序和进程的区别:
进程是程序在计算机上的一次执行活动。当你运行一个程序, 你就启动了一个进程。
程序只是一组指令的有序集合, 它本身没有任何运行的含义, 只是一个静态实体。
而进程, 它是程序在某个数据集上的执行, 是一个动态实体。它因创建而产生, 因调度而运行, 因等待资源或事件而被处于等待状态, 因完成任务而被撤消, 反映了一个程序在一定的数据集上运行的全部动态过程。
多任务:
多任务是指用户可以在同一时间内运行多个应用程序, 每个应用程序被称作一个任务, 一个任务就是一个进程(Process).Linux、windows就是支持多任务的操作系统, 比起单任务系统它的功能增强了许多。比如:你一边聊QQ,一边聊微信,还听着音乐,这就是多任务,有几个任务同时运行。
单核CPU实现多任务的原理:
操作系统轮流让多个任务交替执行 比如QQ执行了2us ,在切换到微信 执行了2us,在切换到其他,表面看起来像是在同时执行,是因为cpu的调度太快。
多核cpu实现多任务的原理:
真正的并行执行多任务只能在多核CPU上实现,但是由于当下任务数量远远多于CPU的核心数量,所以操作系统也会自动把很多任务轮流调度到每个核心上执行。
并发和并行的区别:
并发和并行都可以处理“多任务”, 二者的主要区别在于是否是“同时进行”多个的任务。
并发是指一个处理器同时处理多个任务, 在同一时刻只能有一条指令执行,但多个进程指令被快速的轮换执行,使得在宏观上具有多个进程同时执行的效果,但在微观上并不是同时执行的,只是把时间分成若干段,使多个进程快速交替的执行。
并行是多个处理器同时处理多个不同的任务。
并发是逻辑上的同时发生, 而并行是物理上的同时发生。
并行在多处理器系统中存在, 而并发可以在单处理器和多处理器系统中都存在。
二、 Python中的多进程
multiprocessing模块
multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如一个函数)。multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。Python是通过创建Process对象来生成进程,然后调用它的start()方法启动。
Process类属性和方法
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
参数:
group 值为None, 进程组
target 子进程执行的目标函数,会在run()方法中调用
name 进程名
daemon 是否为守护进程(True/False),默认为None继承父进程
方法:
start() 启动进程,同时调用子进程中的run()方法
run() 子进程启动时调用的方法,在这个方法中执行target目标函数,可以在子类中重写此方法
terminate() 强制终止进程。
is_alive() 返回进程是否处于活动状态。返回boolean
join([timeout]) 进程等待。如果可选参数timeout是None(默认值),则该方法将阻塞,直到被调用join()方法的子进程终止。如果timeout是一个正数,它最多会阻塞timeout秒。
简单的创建子进程实例:
from multiprocessing import Process
def foo(a):
print(a)
if __name__ == "__main__":
p1 = Process(target=foo,args=('test',)) # args必须传入元组
p1.start()
p1.join() # 等待子进程执行完毕在结束主进程
注意:
(1)在windows中Process()必须放到 if __name__ == '__main__':下,目的是防止模块被导入 造成多个进程执行的混乱。
(2)添加进程名.join()
,是因为默认是主进程创建子进程之后继续往下执行 执行完毕主进程结束, 通过调用这个方法主进程等待子进程执行结束之后,主进程在继续向下直到结束。
可以通过两种方式创建子进程:
(1) 通过编写函数
from multiprocessing import Process
import os
def foo(name):
print("进程名为:", name, "进程id为", os.getpid())
if __name__ == '__main__':
p1 = Process(target=foo,args=('zhangsan',)) # args传入的是元素必须加,号
p2 = Process(target=foo,args=('lisi',))
p3 = Process(target=foo,args=('wangwu',))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print('主线程结束了!')
(2) 通过继承Process类
from multiprocessing import Process
import os
class MyClass(Process):
def __init__(self, name):
Process.__init__(self) # 必须初始化父类
self.name= name
def run(self):
print(self.name) # 调用类的属性
print("执行了这里")
if __name__ == '__main__':
p1 = MyClass('zhangsan')
p2 = MyClass('lisi')
p3 = MyClass('wangwu')
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print('主线程结束了!')
进程池
当使用multiprocessing中的Process动态创建多个进程,如果是几个或十几个进程还好,但是如果是上百个进程,手动去管理进程数量非常困难,此时可以使用进程池来管理进程。
from multiprocessing import Pool
import time
def foo():
print(name)
time.sleep(2)
if __name__ == '__main__':
pool = Pool(processes=5) # 进程并发数,创建5个进程,如果不添加参数,默认并发数为核心数
for i in range(5):
name = "name- %d" % i
pool.apply_async(foo, args=(name,)) # 异步非阻塞
# pool.apply(foo, args=(name,)) # 阻塞
print("for 循环中执行~~~~~")
pool.close() # 关闭进程池,不在向进程池加入子进程
pool.join() # 主进程等待子进程 执行完毕在继续向下执行
print("主进程执行到这里了")
注意:
(1)pool.apply(foo, args=(name,)) 阻塞式执行子进程,其每次只能向进程池添加一个进程,然后for循环会被堵塞等待,执行完一个进程,下一个进程才会被执行。
(2)pool.apply_async(foo, args=(name,)) 异步非阻塞执行子进程,是不用等待当前进程执行完毕,可以根据系统调度来进行进程切换。
(3)调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool, join函数等待所有子进程结束
进程池pool.map()方法
如果有大量的数据添加到进程池里 如果使用map会非常的方便,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。
第一个参数传入方法 第二个方法传入迭代器
def foo(x):
time.sleep(2)
print(x)
pool = Pool(2)
myList = ['zhangsan', 'lisi']
pool.map(foo, myList) # 并发执行
pool.close()
pool.join()
print("主进程执行结束了")
获取子进程的返回值
from multiprocessing import Pool
import time
import os
def demo(x):
# print(x)
time.sleep(2)
return x # 将传入进来的参数进行返回
if __name__ == '__main__':
print(os.cpu_count()) # 返回cpu的核心数
pool = Pool()
for i in range(10):
res = pool.apply_async(demo, args=(i,))
print(res) # 返回结果对象
print(res.get()) # 获取返回值
pool.close() # 不在向进程池放进程
pool.join() # 进程等待
三、进程间通信
多进程全局变量是否共享呢?
from multiprocessing import Process
name = 'zhangsan'
def demo():
global name
name = 'lisi'
print("name==", name, "id(name)==", id(name)) # ('name==', 'lisi', 'id(name)==', 139807614774032)
if __name__ == '__main__':
p = Process(target=demo)
p.start()
p.join()
print("name==", name, "id(name)==", id(name)) # ('name==', 'zhangsan', 'id(name)==', 139807614773936)
print("主进程执行结束了")
显然,进程间全局变量不共享,子进程可以获取到主进程的全局变量,但是不能修改,如果修改了全局变量,只是修改了拷贝一份的同名的变量,内存地址是不同的。即进程间内存空间不是共享的。
进程间通信
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块提供两种方法:管道和队列。
管道Pipe
class multiprocessing.Pipe(duplex=True)
返回包含两个Connection对象的元组,默认管道是全双工的,左右两个都可以发送接受。如果创建管道的时候duplex设置为False,左边只能用于接收,右边只能用于发送。
from multiprocessing import Process
from multiprocessing import Pipe
def demo(p1):
print(p1.recv()) # a
p1.send([1,2,3,4])
print(p1.recv()) # ['a','b']
if __name__ == '__main__':
p1, p2 = Pipe()
print(Pipe()) # (<read-write Connection, handle 8>, <read-write Connection, handle 9>)
print(Pipe(False)) # (<read-only Connection, handle 3>, <write-only Connection, handle 4>)
p = Process(target=demo,args=(p1,))
p.start()
p2.send('a')
p2.send(['a','b']) # 发送了两次 需要接受两次
p.join()
print(p2.recv()) # [1,2,3,4]
注意:
(1)p1.recv() 接收p2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
(2)p1.send(obj) 发送一个使用pickle模块序列化obj对象。
队列Queue
class multiprocessing.Queue([maxsize])
maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
方法:
qsize() 返回队列的近似大小,这个数字是不可靠的。
empty() 如果为空返回True。
full() 如果队列已满返回True。
put(obj [, block[, timeout]]) 将obj放入队列中。如果可选参数block是True(默认)并且timeout是None(默认),则会一直阻塞,直到有空余空间可用。如果timeout是一个正数,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间,如果超过指定时间,会抛出Queue.Full异常。如果block为False,但该Queue已满,会立即抛出Queue.Full异常。
get([block[, timeout]]) 从队列中取出一个元素。如果block为True(默认),并且timeout为正值,则在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果block为False,同时Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。
get_nowait() 同q.get(False)。
put_nowait() 同q.put(obj, False)。
实例1:
from multiprocessing import Process,Queue
def demo(que):
print("子进程中:", que.get()) # 获取到父进程传递过来的['a', 'b', 'c']
for i in range(5):
que.put(i)
if __name__ == '__main__':
que = Queue() # 不指定队列长度,默认长度无限制
p1 = Process(target=demo, args=(que,))
p1.start()
que.put(['a', 'b', 'c']) # 在队列中可以存储列表,字典等类型。
p1.join()
print(que.get()) # 0
print(que.get()) # 1
print(que.get()) # 2
实例2:
from multiprocessing import Process,Queue
def read(que):
print("子进程读")
print(que.get()) # [1,2,3,4]
def write(que):
print("子进程写")
que.put([1,2,3,4])
# 两个子进程之间通过队列进行数据传输
if __name__ == '__main__':
que = Queue()
p1 = Process(target=read,args=(que,))
p2 = Process(target=write,args=(que,))
p1.start()
p2.start()
p1.join()
p2.join()
共享数据Manage使用代理对象
使用ListProxy
实例1:
from multiprocessing import Manager
manager = Manager()
myList = manager.list([i for i in range(10)])
print(myList) # <ListProxy object, typeid 'list' at 0x...>
print(str(myList)) # [0, 1, 2, 3, 4, 5,...]
myList.append(100)
print(str(myList)) # [0, 1, 2, 3, 4, 5,...100]
实例2:
from multiprocessing import Manager, Process
def funcList(mylist):
mylist.append('a') # 在列表中追加值
mylist.append('b')
if __name__ == '__main__':
myList = multiprocessing.Manager().list()
myListP = multiprocessing.Process(target=funcList,args=(myList,)
myListP.start()
myListP.join() # 等待进程执行完毕
print(myList) # ['a', 'b']
使用DictProxy
实例:
from multiprocessing import Manager, Process
def funcDict(mydict):
mydict['name'] = 'zs'
mydict['age'] = 1
if __name__ == '__main__':
myDict = multiprocessing.Manager().dict()
myDictP = multiprocessing.Process(target=funcDict,args=(myDict,))
myDictP.start()
myDictP.join() # 等待进程执行完毕
print(myDict) # {'name': 'zs', 'age': 12}
参考文章链接:
http://www.cnblogs.com/lidagen/p/7252247.html
https://docs.python.org/3.6/library/multiprocessing.html
网友评论