进程
ps:创建子进程时,会将父进程的代码拷贝一份,但是这个拷贝是“写时拷贝”,即,只有当子进程对数据进程修改时才会单独拷贝一份,不然就是共用一份代码
进程是拥有资源的最小单位,进程中任务的执行也是通过线程完成的
fork创建子进程(Linux)
for创建新进程
import os
os.fork()
print("haha")
#会发现输出两次“haah“
os.fork()有两个返回值
import os
ret = os.fork() #ps:因为os模块与系统相关,所以在windows中没有fork函数,与Linux不同
#当程序执行到上面一句代码时会创建一个新的进程,这个新的进程也从fork处往下开始走,主进程执行print(2),即返回值大于0(但是每次执行结果可能不一样,为所创建子进程的pid),子进程中的ret==0,执行print(1)
if ret == 0:
print(1)
else:
print(2)
#会发现输出结果为2和1,但是也可能输出1和2,(先后顺序问题,这个是由操作系统调度算法决定的)
#os中,os.getpid()可以获得当前进程的pid,os.getppid()可以获得当前进程父进程的pid(第一个p为parent)
父子进程调度的先后顺序不确定,哪怕是父进程结束后,子进程还是可以继续执行
全局变量在多进程中不共享
import os
import time
g_num = 100
ret = os.fork()
if ret == 0:
print("---process 1---")
g_num += 1
print("---process 1 g_num = %d---"%g_num)
else:
time.sleep(3) #保证子进程先执行
print("---process 2---")
print("---process 2 g_num = %d---"%g_num) #会发现g_num还是100,即全局变量在多进程中不共享
多次fork
case1
import os
import time
ret = os.fork()
if ret == 0:
#子进程
print("--1--")
else:
#父进程
pirnt("--2--")
#父子进程
ret = os.fork()
if ret == 0:
#孙子进程(上面子进程的子进程)
#2儿子(父进程又创建的子进程)
print("--11--")
else:
#子进程
#父进程
pirnt("--22--")
#运行结果:1和2各输出1次,11和22各自输出两次
case2
import os
import time
ret = os.fork()
if ret == 0:
#子进程
print("--1--")
else:
#父进程
pirnt("--2--")
ret = os.fork()
if ret == 0:
#2儿子(父进程又创建的子进程)
print("--11--")
else:
#父进程
pirnt("--22--")
ps:fork炸弹
import os
while True:
os.fork()
winodws上的多进程:multiprocessing
利用multiprocessing中提供的Process类实现,达到跨平台的多进程效果
multiprocessing:对于windows和Linux,其创建子进程的方法不一样(如:linux是用fork,而windows上不是),所以python提供了multiprocssing,根据安装环境的不同,其会调用对应的系统的方法进程子进程的创建
caution:用Process创建子进程,主进程要等所有的子进程结束后才结束,这与fork是不同的
import multiprocessing #负责多进程的模块
import time
def run(name):
print("task: %s"%name)
time.sleep(2)
def main():
p1 = multiprocessing.Process(target=run, args=("P1",))
p2 = multiprocessing.Process(target=run, args=("P2",))
p1.start() #子进程开始执行
p2.start()
print(p1.pid, p2.pid)
print(p1.name, p2.name)
p1.join() #等待子进程p1的结束后再往下走:阻塞,可加参数:timeout,超时时间(等待的最长时间),即如果时间在timeout内如果还没有结束,仍然继续向下结束
p2.join()
print("finished")
if __name__ == "__main__": #因为启动一个进程的时候,会再次调用此文件,所以:如果不用此方法调用,则会不断反复调用此模块
main()
进程的创建:Process([group[, target[, name[, args[, kwargs]]]]])
- group:大多数情况用不到
- target:表示这个进程实例所调用的对象(函数)
- name:为当前进程实例的别名
- args:表示调用对象的位置参数元组
- kwargs:表示调用对象的关键字参数
Process类的常用方法
- is_alive():判断进程实例是否还在执行
- join([timeout]):是否等待进程执行结束,或等待多少秒
- start():启动进程实例
- run():如果没有给定targe参数,的这个对象调用start()方法时,就将执行对象中的run()方法
- terminate():不管任务是否完成,立即终止
Process常用属性
- name:当前进程实例别名,默认为Process-N,N为从1开始递增的整数
- pid:当前进程实例的PID值
- ppid:当前进程父进程的PID值,第一个p表示parent
进程的创建-Process子类
创建新的进程还能够使用类的方法,可以自定义一个类,继承Process类,每次实例化这个类的时候,就等同于实例化一个进程对象
from multiprocessing import Process
import os,time
class Process_Class(Process):
#因为Process类本省有__init__方法,这个子类相当于重写了这个方法
#但这样就会带来一些问题,我们并没有完全初始化一个Process类,所以就不能使用从这个类继承的一些方法
#所以需要调用父类的init方法
def __init__(self, interval):
super().__init__()
self.interval = interval
def run(self): #进程要执行的部分(由start自动调用)
#重写了Process类的run方法
print("子进程(%s)开始执行,父进程(%s)"%(os.getpid(), os.getppid()))
t_start = time.time()
time.sleep((self.interval))
t_stop = time.time()
print("(%s)执行结束,耗时%0.2f秒"%(os.getppid(), t_stop - t_start))
if __name__ == "__main__":
t_start = time.time()
print("当前程序进程(%s)"%os.getpid())
p1 = Process_Class(2)
p1.start() #进程开始执行,start会自动调用run方法
p1.join()
t_stop = time.time()
print("(%s)执行结束,耗时%0.2f秒" % (os.getppid(), t_stop - t_start))
进程池
当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing中的Pool方法
池的作用:起到一个缓冲的作用。进程池:先创建一堆进程在那里放着,而不用等到用的时候再去创建
ps:进程池中进程执行如果发生了异常,不会有提示
from multiprocessing import Pool
import os, time, random
def worker(msg):
t_start = time.time()
print("%s开始执行,进程号为%d" % (msg,os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random()*2)
t_stop = time.time()
print(msg,"执行完毕,耗时%0.2f" % (t_stop-t_start))
if __name__ == "__main__":
po = Pool(3) # 定义一个进程池,最大进程数3
print("----start----")
for i in range(0,10):
po.apply_async(worker,(i,)) # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))。apply_async:非阻塞方式(异步)
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,再执行下面的代码,可以设置超时时间join(timeout=)
print("-----end-----")
'''运行结果如下:
----start----
0开始执行,进程号为15312
1开始执行,进程号为2020
2开始执行,进程号为3148
1 执行完毕,耗时1.62
3开始执行,进程号为2020
2 执行完毕,耗时1.63
4开始执行,进程号为3148
0 执行完毕,耗时1.84
5开始执行,进程号为15312
4 执行完毕,耗时0.37
6开始执行,进程号为3148
5 执行完毕,耗时0.56
7开始执行,进程号为15312
7 执行完毕,耗时0.13
8开始执行,进程号为15312
8 执行完毕,耗时0.02
9开始执行,进程号为15312
9 执行完毕,耗时0.27
6 执行完毕,耗时1.12
3 执行完毕,耗时1.78
-----end-----
'''
多种创建进程方式的比较
# 方式一:不建议使用(太底层)
ret = os.fork()
if ret == 0:
#子进程
else:
#父进程
# 方式二:主进程不会在子进程之前结束,所以可以用主进程做一些其他的事情
p1 = Process(target = xxx)
p1.start()
# 方式三:主进程一般用来等待(因为主进程可能在子进程之前结束,所以需要用join),真正的任务都在子进程中执行
pool = Pool(3) #进程池中的任务不是越多越好,因为任务数越多,意味着轮一圈的周长越长。根据硬件、系统版本等,得到压力测试的值才合理
pool.apply_async(xxx)
apply阻塞式添加任务
基本不用
需要等到上一个任务执行完毕之后才会继续运行,往池里添加新任务,但是如果这样的话,就不能让多个进程一起执行
from multiprocessing import Pool
import os, time, random
def worker(msg):
t_start = time.time()
print("%s开始执行,进程号为%d" % (msg,os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random()*2)
t_stop = time.time()
print(msg,"执行完毕,耗时%0.2f" % (t_stop-t_start))
if __name__ == "__main__":
po = Pool(3) # 定义一个进程池,最大进程数3
print("----start----")
for i in range(0,10):
po.apply(worker,(i,)) # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))。apply_async:非阻塞方式(异步),当一个进程执行完毕之后才继续往里面添加任务
po.close()
po.join() join(timeout=)
print("-----end-----")
'''#运行结果如下
----start----
0开始执行,进程号为16136
0 执行完毕,耗时0.38
1开始执行,进程号为10228
1 执行完毕,耗时1.02
2开始执行,进程号为2536
2 执行完毕,耗时0.89
3开始执行,进程号为16136
3 执行完毕,耗时1.69
4开始执行,进程号为10228
4 执行完毕,耗时0.56
5开始执行,进程号为2536
5 执行完毕,耗时1.57
6开始执行,进程号为16136
6 执行完毕,耗时1.29
7开始执行,进程号为10228
7 执行完毕,耗时0.44
8开始执行,进程号为2536
8 执行完毕,耗时1.33
9开始执行,进程号为16136
9 执行完毕,耗时1.79
-----end-----
'''
进程间的通信:Queue
queue(队列):先进先出
栈:先进后出
常用方法:
from multiprocessing import Queue
q = Queue(3) #创建一个队列对象,容量的大小为3,可以不指定大小,则可以往添加任意多的数据
q.qsize() #获得当前队列中的数据数量
q.put("xxx") #往队列里面放数据,如果队列已满,默认阻塞。可以往队列里面添加任何形式的数据,哪怕是一个对象也可以
q.get() #从队列里面取数据,如果队列已空,默认阻塞
q.empty() #判断队列是否已空
q.full() #判断队列是否已满
q.get_nowait() #如果队列已空,不阻塞,但是已异常的方式提示队列已空
q.put_nowait() #如果队列已满,不阻塞,但是以异常的方式提示
实例:
from multiprocessing import Process,Queue
import os, time,random
def write(q):
for value in ["A","B","C","D"]:
print("Put {} to queue".format(value))
q.put(value)
time.sleep(random.random())
def read(q):
while True:
if not q.empty():
value = q.get(True)
print("Get {} from queue".format(value))
time.sleep(random.random())
else:
break
if __name__ == "__main__":
#父进程创建Queeu,并传递给各个子进程
q = Queue() #不限制大小
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start() #启动子进程,写入
pw.join() #等待子进程结束
pr.start() #启动pr子进程,读出
pr.join()
'''运行结果如下:
Put A to queue
Put B to queue
Put C to queue
Put D to queue
Get A from queue
Get B from queue
Get C from queue
Get D from queue
'''
进程池中的Queue
直接从multiprocessing里面导入的Queue用于直接用Process创建的子进程间的通信,如果是进程池里面进程的通信,则用multiprocessing里面的Manager().Queue(),两种方式得到的队列的使用方式完全一样
import os
# 进程池中的Queeu
from multiprocessing import Manager, Pool
def reader(q):
print("reader启动{},父进程为{}".format(os.getpid(), os.getppid()))
for _ in range(q.qsize()):
print("reader从Queue获取信息:{}".format(q.get()))
def writer(q):
print("writer启动{},父进程为{}".format(os.getpid(), os.getppid()))
for i in "Iterable":
q.put(i)
if __name__ == "__main__":
print("{} start".format(os.getpid()))
q = Manager().Queue() # 用manager里面的queue来完成进程池里面进程的通信
pl = Pool()
pl.apply(writer, (q,)) # 阻塞式添加进程,以让进程先往队列里面添加,再取出
pl.apply(reader, (q,))
pl.close() # 关闭进程池,禁止再往里面添加任务
pl.join() # 等待进程结束
print("{} end".format(os.getpid()))
'''运行结果如下:
9932 start
writer启动15896,父进程为9932
reader启动3432,父进程为9932
reader从Queue获取信息:I
reader从Queue获取信息:t
reader从Queue获取信息:e
reader从Queue获取信息:r
reader从Queue获取信息:a
reader从Queue获取信息:b
reader从Queue获取信息:l
reader从Queue获取信息:e
9932 end
'''
多进程的应用:文件拷贝
import os
from multiprocessing import Process
def copyFileTask(old_file_name, new_file_name):
"""
仅负责将旧文件复制到新文件
:param old_file_name: 旧文件名,包括路径
:param new_file_path: 新文件名,包括路径
:return: None
"""
print("复制文件{}中...".format(old_file_name))
with open(old_file_name, "rb+") as fr:
content = fr.read()
with open(new_file_name, "wb+") as fw:
fw.write(content)
print("复制文件{}完成!".format(old_file_name))
def copyFolderTask(old_folder_name, new_folder_name):
"""
负责将旧文件夹内的东西复制到新文件夹
:param old_folder_name: 旧文件夹路径
:param new_folder_name: 新文件夹路径
:return: None
"""
if not os.path.exists(new_folder_name):
print("目标文件夹不存在,创建中...")
os.mkdir(new_folder_name)
print("目标文件夹创建完成!")
files = os.listdir(old_folder_name)
for file in files:
old_file = os.path.join(old_folder_name, file)
new_file = os.path.join(new_folder_name, file)
if os.path.isfile(old_file):
print(file)
pro = Process(target=copyFileTask, args=(old_file, new_file)) #为了不在主进程中join,使用Process而不是进程池
pro.start()
elif os.path.isdir(old_file):
copyFolderTask(old_file, new_file)
if __name__ == "__main__":
#注意:以下路径均为绝对路径
old_folder_path = input("请输入要拷贝文件夹路径:")
new_folder_path = input("请输入新文件夹路径:")
if not os.path.exists(old_folder_path):
print("目标文件夹不存在,请重试")
exit()
else:
copyFolderTask(old_folder_path, new_folder_path)
ps:print("\r要输出的内容")
在行首输出内容(同时会删除本行已有内容),如果要对以上复制文件的程序改进使其显示进度,可以用到
网友评论