美文网首页
Python42_多任务之进程

Python42_多任务之进程

作者: jxvl假装 | 来源:发表于2019-09-28 10:56 被阅读0次

    进程

    ps:创建子进程时,会将父进程的代码拷贝一份,但是这个拷贝是“写时拷贝”,即,只有当子进程对数据进程修改时才会单独拷贝一份,不然就是共用一份代码

    进程是拥有资源的最小单位,进程中任务的执行也是通过线程完成的

    fork创建子进程(Linux)

    for创建新进程

    import os
    os.fork()
    print("haha")
    #会发现输出两次“haah“
    

    os.fork()有两个返回值

    import os
    ret = os.fork() #ps:因为os模块与系统相关,所以在windows中没有fork函数,与Linux不同
    #当程序执行到上面一句代码时会创建一个新的进程,这个新的进程也从fork处往下开始走,主进程执行print(2),即返回值大于0(但是每次执行结果可能不一样,为所创建子进程的pid),子进程中的ret==0,执行print(1)
    if ret == 0:
        print(1)
    else:
        print(2)
    #会发现输出结果为2和1,但是也可能输出1和2,(先后顺序问题,这个是由操作系统调度算法决定的)
    #os中,os.getpid()可以获得当前进程的pid,os.getppid()可以获得当前进程父进程的pid(第一个p为parent)
    

    父子进程调度的先后顺序不确定,哪怕是父进程结束后,子进程还是可以继续执行

    全局变量在多进程中不共享

    import os
    import time
    
    g_num = 100
    
    ret = os.fork()
    if ret == 0:
        print("---process 1---")
        g_num += 1
        print("---process 1 g_num = %d---"%g_num)
    else:
        time.sleep(3)   #保证子进程先执行
        print("---process 2---")
        print("---process 2 g_num = %d---"%g_num)   #会发现g_num还是100,即全局变量在多进程中不共享
    

    多次fork

    case1

    import os
    import time
    
    ret = os.fork()
    if ret == 0:
        #子进程
        print("--1--")
    else:
        #父进程
        pirnt("--2--")
    
    #父子进程
    ret = os.fork()
    if ret == 0:
        #孙子进程(上面子进程的子进程)
        #2儿子(父进程又创建的子进程)
        print("--11--")
    else:
        #子进程
        #父进程
        pirnt("--22--")
    #运行结果:1和2各输出1次,11和22各自输出两次
    

    case2

    import os
    import time
    
    ret = os.fork()
    if ret == 0:
        #子进程
        print("--1--")
    else:
        #父进程
        pirnt("--2--")
        ret = os.fork()
        if ret == 0:
            #2儿子(父进程又创建的子进程)
            print("--11--")
        else:
            #父进程
            pirnt("--22--")
    

    ps:fork炸弹

    import os
    while True:
        os.fork()
    

    winodws上的多进程:multiprocessing

    利用multiprocessing中提供的Process类实现,达到跨平台的多进程效果

    multiprocessing:对于windows和Linux,其创建子进程的方法不一样(如:linux是用fork,而windows上不是),所以python提供了multiprocssing,根据安装环境的不同,其会调用对应的系统的方法进程子进程的创建

    caution:用Process创建子进程,主进程要等所有的子进程结束后才结束,这与fork是不同的

    import multiprocessing  #负责多进程的模块
    import time
    
    def run(name):
        print("task: %s"%name)
        time.sleep(2)
    def main():
        p1 = multiprocessing.Process(target=run, args=("P1",))
        p2 = multiprocessing.Process(target=run, args=("P2",))
        p1.start()  #子进程开始执行
        p2.start()
        print(p1.pid, p2.pid)
        print(p1.name, p2.name)
        p1.join()   #等待子进程p1的结束后再往下走:阻塞,可加参数:timeout,超时时间(等待的最长时间),即如果时间在timeout内如果还没有结束,仍然继续向下结束
        p2.join()
        print("finished")
    
    if __name__ == "__main__":  #因为启动一个进程的时候,会再次调用此文件,所以:如果不用此方法调用,则会不断反复调用此模块
        main()
    

    进程的创建:Process([group[, target[, name[, args[, kwargs]]]]])

    • group:大多数情况用不到
    • target:表示这个进程实例所调用的对象(函数)
    • name:为当前进程实例的别名
    • args:表示调用对象的位置参数元组
    • kwargs:表示调用对象的关键字参数

    Process类的常用方法

    • is_alive():判断进程实例是否还在执行
    • join([timeout]):是否等待进程执行结束,或等待多少秒
    • start():启动进程实例
    • run():如果没有给定targe参数,的这个对象调用start()方法时,就将执行对象中的run()方法
    • terminate():不管任务是否完成,立即终止

    Process常用属性

    • name:当前进程实例别名,默认为Process-N,N为从1开始递增的整数
    • pid:当前进程实例的PID值
    • ppid:当前进程父进程的PID值,第一个p表示parent

    进程的创建-Process子类

    创建新的进程还能够使用类的方法,可以自定义一个类,继承Process类,每次实例化这个类的时候,就等同于实例化一个进程对象

    from multiprocessing import Process
    import os,time
    class Process_Class(Process):
        #因为Process类本省有__init__方法,这个子类相当于重写了这个方法
        #但这样就会带来一些问题,我们并没有完全初始化一个Process类,所以就不能使用从这个类继承的一些方法
        #所以需要调用父类的init方法
        def __init__(self, interval):
            super().__init__()
            self.interval = interval
    
        def run(self):  #进程要执行的部分(由start自动调用)
            #重写了Process类的run方法
            print("子进程(%s)开始执行,父进程(%s)"%(os.getpid(), os.getppid()))
            t_start = time.time()
            time.sleep((self.interval))
            t_stop = time.time()
            print("(%s)执行结束,耗时%0.2f秒"%(os.getppid(), t_stop - t_start))
    
    if __name__ == "__main__":
        t_start = time.time()
        print("当前程序进程(%s)"%os.getpid())
        p1 = Process_Class(2)
        p1.start()  #进程开始执行,start会自动调用run方法
        p1.join()
        t_stop = time.time()
        print("(%s)执行结束,耗时%0.2f秒" % (os.getppid(), t_stop - t_start))
    

    进程池

    当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing中的Pool方法

    池的作用:起到一个缓冲的作用。进程池:先创建一堆进程在那里放着,而不用等到用的时候再去创建

    ps:进程池中进程执行如果发生了异常,不会有提示

    from multiprocessing import Pool
    import os, time, random
    
    def worker(msg):
        t_start = time.time()
        print("%s开始执行,进程号为%d" % (msg,os.getpid()))
        # random.random()随机生成0~1之间的浮点数
        time.sleep(random.random()*2)
        t_stop = time.time()
        print(msg,"执行完毕,耗时%0.2f" % (t_stop-t_start))
    
    if __name__ == "__main__":
        po = Pool(3)  # 定义一个进程池,最大进程数3
        print("----start----")
        for i in range(0,10):
            po.apply_async(worker,(i,))  # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))。apply_async:非阻塞方式(异步)
        po.close()  # 关闭进程池,关闭后po不再接收新的请求
        po.join()  # 等待po中所有子进程执行完成,再执行下面的代码,可以设置超时时间join(timeout=)
        print("-----end-----")
    '''运行结果如下:
    ----start----
    0开始执行,进程号为15312
    1开始执行,进程号为2020
    2开始执行,进程号为3148
    1 执行完毕,耗时1.62
    3开始执行,进程号为2020
    2 执行完毕,耗时1.63
    4开始执行,进程号为3148
    0 执行完毕,耗时1.84
    5开始执行,进程号为15312
    4 执行完毕,耗时0.37
    6开始执行,进程号为3148
    5 执行完毕,耗时0.56
    7开始执行,进程号为15312
    7 执行完毕,耗时0.13
    8开始执行,进程号为15312
    8 执行完毕,耗时0.02
    9开始执行,进程号为15312
    9 执行完毕,耗时0.27
    6 执行完毕,耗时1.12
    3 执行完毕,耗时1.78
    -----end-----
    ''' 
    

    多种创建进程方式的比较

    # 方式一:不建议使用(太底层)
    ret = os.fork()
    if ret == 0:
        #子进程
    else:
        #父进程
    
    # 方式二:主进程不会在子进程之前结束,所以可以用主进程做一些其他的事情
    p1 = Process(target = xxx)
    p1.start()
    
    # 方式三:主进程一般用来等待(因为主进程可能在子进程之前结束,所以需要用join),真正的任务都在子进程中执行
    pool = Pool(3)  #进程池中的任务不是越多越好,因为任务数越多,意味着轮一圈的周长越长。根据硬件、系统版本等,得到压力测试的值才合理
    pool.apply_async(xxx)
    

    apply阻塞式添加任务

    基本不用

    需要等到上一个任务执行完毕之后才会继续运行,往池里添加新任务,但是如果这样的话,就不能让多个进程一起执行

    from multiprocessing import Pool
    import os, time, random
    
    def worker(msg):
        t_start = time.time()
        print("%s开始执行,进程号为%d" % (msg,os.getpid()))
        # random.random()随机生成0~1之间的浮点数
        time.sleep(random.random()*2)
        t_stop = time.time()
        print(msg,"执行完毕,耗时%0.2f" % (t_stop-t_start))
    
    if __name__ == "__main__":
        po = Pool(3)  # 定义一个进程池,最大进程数3
        print("----start----")
        for i in range(0,10):
            po.apply(worker,(i,))  # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))。apply_async:非阻塞方式(异步),当一个进程执行完毕之后才继续往里面添加任务
        po.close()
        po.join() join(timeout=)
        print("-----end-----")
    '''#运行结果如下
    ----start----
    0开始执行,进程号为16136
    0 执行完毕,耗时0.38
    1开始执行,进程号为10228
    1 执行完毕,耗时1.02
    2开始执行,进程号为2536
    2 执行完毕,耗时0.89
    3开始执行,进程号为16136
    3 执行完毕,耗时1.69
    4开始执行,进程号为10228
    4 执行完毕,耗时0.56
    5开始执行,进程号为2536
    5 执行完毕,耗时1.57
    6开始执行,进程号为16136
    6 执行完毕,耗时1.29
    7开始执行,进程号为10228
    7 执行完毕,耗时0.44
    8开始执行,进程号为2536
    8 执行完毕,耗时1.33
    9开始执行,进程号为16136
    9 执行完毕,耗时1.79
    -----end-----
    '''
    

    进程间的通信:Queue

    queue(队列):先进先出
    栈:先进后出

    常用方法:

    from multiprocessing import Queue
    
    q = Queue(3)    #创建一个队列对象,容量的大小为3,可以不指定大小,则可以往添加任意多的数据
    q.qsize()   #获得当前队列中的数据数量
    q.put("xxx")    #往队列里面放数据,如果队列已满,默认阻塞。可以往队列里面添加任何形式的数据,哪怕是一个对象也可以
    q.get() #从队列里面取数据,如果队列已空,默认阻塞
    q.empty()   #判断队列是否已空
    q.full()    #判断队列是否已满
    q.get_nowait()  #如果队列已空,不阻塞,但是已异常的方式提示队列已空
    q.put_nowait()  #如果队列已满,不阻塞,但是以异常的方式提示
    

    实例:

    from multiprocessing import Process,Queue
    import os, time,random
    
    def write(q):
        for value in ["A","B","C","D"]:
            print("Put {} to queue".format(value))
            q.put(value)
            time.sleep(random.random())
    
    def read(q):
        while True:
            if not q.empty():
                value = q.get(True)
                print("Get {} from queue".format(value))
                time.sleep(random.random())
            else:
                break
    
    if __name__ == "__main__":
        #父进程创建Queeu,并传递给各个子进程
        q = Queue() #不限制大小
        pw = Process(target=write, args=(q,))
        pr = Process(target=read, args=(q,))
    
        pw.start()   #启动子进程,写入
        pw.join()   #等待子进程结束
    
        pr.start()  #启动pr子进程,读出
        pr.join()
    
    '''运行结果如下:
    Put A to queue
    Put B to queue
    Put C to queue
    Put D to queue
    Get A from queue
    Get B from queue
    Get C from queue
    Get D from queue
    '''
    

    进程池中的Queue

    直接从multiprocessing里面导入的Queue用于直接用Process创建的子进程间的通信,如果是进程池里面进程的通信,则用multiprocessing里面的Manager().Queue(),两种方式得到的队列的使用方式完全一样

    import os
    # 进程池中的Queeu
    from multiprocessing import Manager, Pool
    
    
    def reader(q):
        print("reader启动{},父进程为{}".format(os.getpid(), os.getppid()))
        for _ in range(q.qsize()):
            print("reader从Queue获取信息:{}".format(q.get()))
    
    
    def writer(q):
        print("writer启动{},父进程为{}".format(os.getpid(), os.getppid()))
        for i in "Iterable":
            q.put(i)
    
    
    if __name__ == "__main__":
        print("{} start".format(os.getpid()))
        q = Manager().Queue()  # 用manager里面的queue来完成进程池里面进程的通信
        pl = Pool()
        pl.apply(writer, (q,))  # 阻塞式添加进程,以让进程先往队列里面添加,再取出
        pl.apply(reader, (q,))
        pl.close()  # 关闭进程池,禁止再往里面添加任务
        pl.join()  # 等待进程结束
        print("{} end".format(os.getpid()))
    
    '''运行结果如下:
    9932 start
    writer启动15896,父进程为9932
    reader启动3432,父进程为9932
    reader从Queue获取信息:I
    reader从Queue获取信息:t
    reader从Queue获取信息:e
    reader从Queue获取信息:r
    reader从Queue获取信息:a
    reader从Queue获取信息:b
    reader从Queue获取信息:l
    reader从Queue获取信息:e
    9932 end
    '''
    

    多进程的应用:文件拷贝

    import os
    from multiprocessing import Process
    
    
    def copyFileTask(old_file_name, new_file_name):
        """
        仅负责将旧文件复制到新文件
        :param old_file_name: 旧文件名,包括路径
        :param new_file_path: 新文件名,包括路径
        :return: None
        """
    
        print("复制文件{}中...".format(old_file_name))
        with open(old_file_name, "rb+") as fr:
            content = fr.read()
        with open(new_file_name, "wb+") as fw:
            fw.write(content)
        print("复制文件{}完成!".format(old_file_name))
    
    
    def copyFolderTask(old_folder_name, new_folder_name):
        """
        负责将旧文件夹内的东西复制到新文件夹
        :param old_folder_name: 旧文件夹路径
        :param new_folder_name: 新文件夹路径
        :return: None
        """
        if not os.path.exists(new_folder_name):
            print("目标文件夹不存在,创建中...")
            os.mkdir(new_folder_name)
            print("目标文件夹创建完成!")
        files = os.listdir(old_folder_name)
    
        for file in files:
            old_file = os.path.join(old_folder_name, file)
            new_file = os.path.join(new_folder_name, file)
    
    
            if os.path.isfile(old_file):
                print(file)
                pro = Process(target=copyFileTask, args=(old_file, new_file))   #为了不在主进程中join,使用Process而不是进程池
                pro.start()
            elif os.path.isdir(old_file):
                copyFolderTask(old_file, new_file)
    
    
    if __name__ == "__main__":
        #注意:以下路径均为绝对路径
        old_folder_path = input("请输入要拷贝文件夹路径:")
        new_folder_path = input("请输入新文件夹路径:")
    
        if not os.path.exists(old_folder_path):
            print("目标文件夹不存在,请重试")
            exit()
        else:
            copyFolderTask(old_folder_path, new_folder_path)
    

    ps:print("\r要输出的内容")在行首输出内容(同时会删除本行已有内容),如果要对以上复制文件的程序改进使其显示进度,可以用到

    相关文章

      网友评论

          本文标题:Python42_多任务之进程

          本文链接:https://www.haomeiwen.com/subject/kihuuctx.html