Python multiprocessing and multithreading, mutex locks and semaphores

Author: Cache_wood | Published: 2021-12-05 08:42

    Parallelism and concurrency

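    • Parallelism: at the same instant, multiple instructions execute simultaneously on multiple processors
    • Concurrency: at any given instant only one instruction executes, but several instruction streams are switched rapidly, so at a macro level they appear to run at the same time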

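    Processes and threads

    • Process: the basic unit to which the operating system allocates resources
    • Thread: the basic unit of CPU scheduling and dispatching
    • A running application has at least one process, and every process has at least one thread
    • Multiple threads of the same process can execute concurrently
    • Each process has its own independent memory space during execution, whereas the threads of a process share its memory
    • Multi-process programming therefore has to deal with inter-process communication (IPC)
    • Switching between processes costs considerably more resources than switching between threads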
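    To see the difference in memory sharing, here is a minimal sketch. The names `bump` and `compare_isolation` are illustrative and do not come from the original. The same function increments a module-level global once in a child process and once in a thread, and the parent then checks how much its own copy changed.

```python
import threading
from multiprocessing import Process

counter = 0  # module-level global


def bump():
    """Increment the global counter in whichever process/thread runs this."""
    global counter
    counter += 1


def compare_isolation():
    """Run bump() once in a child process and once in a thread.

    Returns how much each run changed the parent's counter.
    """
    start = counter
    p = Process(target=bump)
    p.start()
    p.join()
    after_process = counter - start  # 0: the child incremented its own copy

    t = threading.Thread(target=bump)
    t.start()
    t.join()
    after_thread = counter - start   # 1: the thread incremented the shared copy
    return after_process, after_thread


if __name__ == "__main__":
    print(compare_isolation())  # (0, 1)
```

    The child process changes only its own copy of `counter`, so the parent sees no change. The thread runs inside the parent process and updates the shared variable.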

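    Multiprocessing

    • Process
      • Process ID: os.getpid()
      • Orphan process: a child that keeps running after its parent has exited. The init process adopts it, acts as its parent, and collects the status information the child leaves behind when it finishes
      • Zombie process
        • If a child exits but the parent never calls wait or waitpid to collect its status, the child's process descriptor remains in the system. Such a child is called a zombie process.
        • A zombie cannot be removed with kill, because it has already exited. It disappears only when its parent reaps it, or when the parent itself exits and init adopts and reaps it
    • The multiprocessing module
      • Starts child processes and runs custom tasks in them
      • Provides key components such as Process, Queue, Pipe and Lock
      • Supports communication and data sharing between processes
      • Provides various forms of synchronization
      • Handles zombie processes (for example, join() reaps a finished child)
    • Process
      • The class used to create processes
      • An instance represents one child process (not yet started)
      • Arguments are passed as keywords
        • target: the callable to invoke, i.e. the task the child runs
        • args: positional arguments for target, as a tuple. A single argument needs a trailing comma, e.g. args=(x,)
        • kwargs: keyword arguments for target, as a dict
        • name: the name of the child process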
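    A zombie can be observed directly on POSIX systems such as Linux and macOS. The sketch below uses `os.fork` and `os.waitpid` instead of `multiprocessing` to illustrate the idea. The function name `reap_child` and the exit status 7 are arbitrary choices. Between the child's exit and the parent's `waitpid` call, the child shows up with state `Z` in `ps`.

```python
import os
import time


def reap_child(exit_status=7):
    """POSIX only: fork a child that exits at once, then reap it with waitpid.

    Between the child's exit and the waitpid() call the child is a zombie:
    it has finished, but its process-table entry (pid, exit status) remains.
    """
    pid = os.fork()
    if pid == 0:
        os._exit(exit_status)  # child: exit immediately, skipping Python cleanup
    time.sleep(0.2)            # parent: meanwhile the exited child is a zombie
    reaped_pid, status = os.waitpid(pid, 0)  # collect the status; the entry is removed
    return reaped_pid == pid, os.WEXITSTATUS(status)


if __name__ == "__main__" and hasattr(os, "fork"):
    print(reap_child())  # (True, 7)
```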

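    On Windows, Process() must be created inside an if __name__ == '__main__': block. Windows starts children with the spawn method, which re-imports the main module in each child. Without the guard, every child would try to create new processes again.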
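    Here is a minimal sketch of the constructor keywords listed above. The names `greet` and `run_child` are hypothetical. Note the one-element tuple `args=(q,)` and that the process is created under the `__main__` guard. The child reports back through a `multiprocessing.Queue`, because any value returned by `target` is discarded.

```python
from multiprocessing import Process, Queue


def greet(q, who="world", punctuation="!"):
    """Child task: send a greeting back to the parent through the queue."""
    q.put("hello, " + who + punctuation)


def run_child():
    q = Queue()
    p = Process(
        target=greet,                                  # the task the child runs
        args=(q,),                                     # one positional argument: note the trailing comma
        kwargs={"who": "Python", "punctuation": "?"},  # keyword arguments for greet
        name="greeter",                                # available later as p.name
    )
    p.start()
    message = q.get()  # read before join(): joining first can deadlock if the queued data is large
    p.join()
    return p.name, message


# Required when the start method is spawn (the default on Windows and macOS):
# the child re-imports this module, and the guard stops it from starting processes again.
if __name__ == "__main__":
    print(run_child())  # ('greeter', 'hello, Python?')
```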

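    • p.start()
      • Starts the process. The child then calls p.run()
    • p.run()
      • The method executed when the process starts. By default it calls target, and a custom Process subclass must override it
    • p.terminate()
      • Forcibly terminates p without any cleanup
      • If p has created child processes, they are not terminated and become orphan processes
      • If p holds a lock, the lock is not released, which can deadlock other processes
      • Generally not recommended
    • p.is_alive()
      • Returns True if p is still running
    • p.join([timeout])
      • The main process waits until p terminates (the main process blocks while p keeps running)
      • timeout is an optional timeout in seconds
      • Only a process started with start() can be joined. Calling run() directly executes the task in the current process, so there is no child to join
    • p.daemon
      • Defaults to False
      • Can be set to True, but only before p.start(). p then runs in the background as a daemon process: when its parent process exits, p is terminated as well, and p is not allowed to create child processes of its own
    • p.name
      • The name of the process
    • p.pid
      • The process ID
    • p.exitcode
      • None while the process is still running, 0 after a normal exit
      • A value of -N means the process was killed by signal N
    • p.authkey
      • The authentication key of the process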
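    The sketch below exercises several of these members: `join`, `join(timeout)`, `is_alive`, `daemon`, `terminate` and `exitcode`. It assumes a POSIX system, where `terminate()` sends SIGTERM, so the exit code becomes `-15`. The sleep durations are arbitrary.

```python
import signal
import time
from multiprocessing import Process

# On POSIX, exitcode -N means "killed by signal N"; terminate() sends SIGTERM.
KILLED_BY_SIGTERM = -signal.SIGTERM


def sleeper(seconds):
    """Child task: just sleep."""
    time.sleep(seconds)


def lifecycle_demo():
    short = Process(target=sleeper, args=(0.1,), name="short")
    short.start()
    short.join()                                    # block until the child exits
    finished = (short.is_alive(), short.exitcode)   # (False, 0): normal exit

    long_ = Process(target=sleeper, args=(60,), name="long")
    long_.daemon = True                 # must be set before start()
    long_.start()
    long_.join(timeout=0.2)             # gives up after 0.2 s; the child keeps running
    still_running = long_.is_alive()    # True
    long_.terminate()                   # forced stop, no cleanup in the child
    long_.join()                        # reap it so exitcode is filled in
    return finished, still_running, long_.exitcode


if __name__ == "__main__":
    print(lifecycle_demo())  # on POSIX: ((False, 0), True, -15)
```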

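    Example: creating a process with Process

    The module-level print appears twice in the output below. Under the spawn start method (used on Windows), the child re-imports the main module and runs its top-level code again. With the fork start method it is printed only once.

    from multiprocessing import Process
    print('main process start')  # top-level code: executed again in the child under spawn
    def run():
        pass
    if __name__=='__main__':
        p=Process(target=run)  # without this guard, each child would recursively create new processes
        p.start()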
    
    main process start
    main process start
    

    Example: subclassing Process

    import time
    import random
    from multiprocessing import Process
    
    n=0
    
    def task(name):
        print("task of {} starts".format(name))
        time.sleep(random.randrange(50,100))
        print("task of {} ends".format(name))
    
    class Task(Process):
        def __init__(self,name):
            super().__init__(name=name)  # required: sets up the Process machinery and stores the name
    
        def run(self):  # required: this is the code the child process executes
            print('task {} starts with pid {}...'.format(self.name,self.pid))
            global n
            n=random.random()  # changes only this child's copy of the global n
            print("n={} in process {}".format(n,self.name))
            # a list of up to 10 million ints (several hundred MB) so per-process memory use is visible in a task manager
            l=list(range(int(n*10000000)))
            time.sleep(random.randrange(20,50))
            print('task {} ends with pid {}...'.format(self.name,self.pid))
    
    if __name__=='__main__':
        # alternative without subclassing: pass a function as target
        # for i in range(0,4):
        #     p=Process(target=task,args=('task {}'.format(i),))
        #     p.start()
        #     print('pid:{}'.format(p.pid))
        plist=[]
        for i in range(0,4):
            p=Task('process-{}'.format(i+1))
            plist.append(p)
        for p in plist:
            p.start()
        for p in plist:
            # start all children first, then join them one by one; joining right after start() would make them run serially
            # join() still blocks until p1 finishes, but p2, p3, ... keep running meanwhile, so their joins return quickly afterwards
            # the total time spent in join() equals the running time of the slowest child, which is what we want
            p.join()
        print('main')
        print("n={} in main".format(n))  # still 0: the children modified their own copies
    

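    First start() all the child processes, then join() them in a separate loop. join() may block while waiting for p1, but that does not stop the other children from running.

    If join() is called right after each start(), the main process blocks on p1 before p2 has even been started, so the children effectively run one after another ("serially").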

    task process-1 starts with pid 13044...
    n=0.17599680720372746 in process process-1
    task process-2 starts with pid 3368...
    n=0.2165234151520603 in process process-2
    task process-3 starts with pid 11588...
    task process-4 starts with pid 14920...
    n=0.8302889804652497 in process process-3
    n=0.7734786605773896 in process process-4
    task process-4 ends with pid 14920...
    task process-1 ends with pid 13044...
    task process-2 ends with pid 3368...
    task process-3 ends with pid 11588...
    main
    n=0 in main
    

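    Using join()

    • First start all child processes in the main process
    • Then join each child process in the main process

    • Join only after all children have started. Joining right after each start() makes execution serial

    • join() still waits for p1 to finish, but the other children (p2, p3, ...) keep running in the meantime. By the time p1 ends, p2, p3, ... may have finished too, so their join() calls return almost immediately

    • The total time spent in join() is therefore the running time of the slowest child process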
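    A rough way to check the claim about join order is to time both patterns. The sketch below assumes each child simply sleeps as a stand-in for real work. The absolute numbers depend on how expensive it is to start a process on your machine.

```python
import time
from multiprocessing import Process


def work(seconds):
    time.sleep(seconds)  # stand-in for real work


def start_then_join(n, seconds):
    """Start all children first, then join them: they run concurrently."""
    t0 = time.perf_counter()
    procs = [Process(target=work, args=(seconds,)) for _ in range(n)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return time.perf_counter() - t0


def start_and_join_each(n, seconds):
    """Join each child right after starting it: they run one after another."""
    t0 = time.perf_counter()
    for _ in range(n):
        p = Process(target=work, args=(seconds,))
        p.start()
        p.join()
    return time.perf_counter() - t0


if __name__ == "__main__":
    print("concurrent: {:.2f}s".format(start_then_join(4, 0.5)))    # about 0.5 s plus start-up cost
    print("serial:     {:.2f}s".format(start_and_join_each(4, 0.5)))  # about 2 s plus start-up cost
```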

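    Synchronous and asynchronous

    • Synchronous
      • Tasks execute one after another in a predetermined order
      • After a call, the caller waits for the result to return
      • Synchronization is a means of ensuring that multiple processes access contended resources safely
    • Asynchronous
      • The opposite of synchronous processing
      • The current process is not blocked waiting for the work to complete. Subsequent operations may proceed, and completion is reported through a callback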
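    `multiprocessing.Pool` offers both styles, which makes the contrast concrete. In this sketch, `square` and `sync_vs_async` are illustrative names. `apply` blocks until the result is ready. `apply_async` returns an `AsyncResult` immediately and can notify a callback when the value arrives.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def sync_vs_async():
    collected = []  # filled by the callback, which runs in the parent process
    with Pool(processes=2) as pool:
        sync_result = pool.apply(square, (3,))  # synchronous: blocks until 9 is ready
        async_result = pool.apply_async(square, (4,), callback=collected.append)  # returns at once
        # ... the parent could do other work here ...
        value = async_result.get(timeout=10)  # wait only when the value is actually needed
    return sync_result, value, collected


if __name__ == "__main__":
    print(sync_vs_async())  # (9, 16, [16])
```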

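    Critical sections

    • Critical resource: a resource that only one process/thread may use at a time
    • Critical section
      • The piece of code that accesses a critical resource
      • When several processes request entry to a free critical section, only one process/thread is admitted at a time
      • If a process/thread is already inside the critical section, others trying to enter must wait
      • A process/thread inside the critical section must leave it within a finite time
    • Mutex (mutual exclusion): a variable that is always in one of two states, unlocked or locked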
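    Here is a minimal illustration of a critical section guarded by a mutex. It uses `threading.Lock` because threads share memory directly. The shared counter is the critical resource and `counter += 1` is the critical section. Without the lock, concurrent read-modify-write sequences can lose updates. The function name `count_with_lock` is illustrative.

```python
import threading


def count_with_lock(n_threads=4, n_increments=100_000):
    counter = 0                # the critical resource
    lock = threading.Lock()    # the mutex: either locked or unlocked

    def worker():
        nonlocal counter
        for _ in range(n_increments):
            with lock:         # enter the critical section: one thread at a time
                counter += 1   # read-modify-write on the shared value
            # leaving the with-block unlocks the mutex

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


print(count_with_lock())  # 400000
```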

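    Multiprocessing

    • Process synchronization
      • Mechanisms
        • Mutex lock: Lock
        • Semaphore
        • Event
        • Condition
      • Characteristics
        • Data can be shared through a file (as in the ticket example below)
        • A lock ensures that when several processes modify the same data, only one of them modifies it at a time, i.e. modifications are serialized. The locking has to be done explicitly in your own code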
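    Besides a file, `multiprocessing.Value` can hold data in shared memory. By default it comes with its own lock, exposed via `get_lock()`. The sketch below assumes a simple integer balance; `deposit` and `run_deposits` are hypothetical names.

```python
from multiprocessing import Process, Value


def deposit(balance, times):
    for _ in range(times):
        with balance.get_lock():  # serialize the read-modify-write across processes
            balance.value += 1


def run_deposits(n_procs=4, times=1000):
    balance = Value("i", 0)  # a C int in shared memory, created with its own lock
    procs = [Process(target=deposit, args=(balance, times)) for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return balance.value


if __name__ == "__main__":
    print(run_deposits())  # 4000
```

    `balance.value += 1` reads and then writes, so it is not atomic by itself. Holding `get_lock()` around it turns it into a critical section.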
    Mutex: Lock
    from multiprocessing import Process
    from multiprocessing import Lock
    import json
    import random
    import time
    
    tf='ticks.json'  # shared ticket data stored in a file
    
    def info():
        with open(tf) as f:
            ticks=json.load(f)
        print("ticks from {} to {} left {}".format(ticks['origin'],ticks['dest'],ticks['count']))
    
    def buy(pname):
        with open(tf) as f:
            ticks=json.load(f)
        time.sleep(random.random())  # simulate network delay
        if ticks['count']>0:
            ticks['count']-=1
            time.sleep(random.random())
            with open(tf,'w') as f:
                json.dump(ticks,f)
            print('{} buy one tick from {} to {}!'.format(pname,ticks['origin'],ticks['dest']))
        else:
            print('oh....no ticks :(')
    
    # without a lock: several clients can read the same count and oversell
    def task(pname):
        info()
        buy(pname)
    
    # with a lock: reading and updating the file form one critical section
    def lock_task(name,lock):
        with lock:  # acquire() on entry, release() on exit, even if an exception is raised
            info()
            buy(name)
    
    if __name__=='__main__':
        # initialize the file with 10 tickets from bj to cq, matching the output below
        with open(tf,'w') as f:
            json.dump({'origin':'bj','dest':'cq','count':10},f)
        lock=Lock()
        clients=[]
        for i in range(20):
            name='client-{}'.format(i+1)
            #p=Process(target=task,name=name,args=(name,))
            p=Process(target=lock_task,name=name,args=(name,lock))
            clients.append(p)
        for p in clients:
            p.start()
        for p in clients:
            p.join()
        print("all clients finished...")
    

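    With the lock, only one process at a time can be inside the check-and-buy critical section. The others wait until it releases the lock.

    The first 10 clients to acquire the lock get tickets and the remaining 10 get none. Only one client buys a ticket at any moment.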

    ticks from bj to cq left 10
    client-9 buy one tick from bj to cq!
    ticks from bj to cq left 9
    client-4 buy one tick from bj to cq!
    ticks from bj to cq left 8
    client-16 buy one tick from bj to cq!
    ticks from bj to cq left 7
    client-14 buy one tick from bj to cq!
    ticks from bj to cq left 6
    client-6 buy one tick from bj to cq!
    ticks from bj to cq left 5
    client-15 buy one tick from bj to cq!
    ticks from bj to cq left 4
    client-1 buy one tick from bj to cq!
    ticks from bj to cq left 3
    client-8 buy one tick from bj to cq!
    ticks from bj to cq left 2
    client-11 buy one tick from bj to cq!
    ticks from bj to cq left 1
    client-5 buy one tick from bj to cq!
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    all clients finished...
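    In the locked run above, every client that cannot get the lock simply waits in `acquire()`. `multiprocessing.Lock.acquire(block=True, timeout=None)` also accepts a timeout and returns `False` if the lock could not be obtained in time. In the small sketch below, the parent holds the lock while a child tries to get it. `try_lock` and `probe` are illustrative names.

```python
from multiprocessing import Lock, Process, Queue


def try_lock(lock, q):
    got_it = lock.acquire(timeout=0.5)  # wait at most 0.5 s instead of blocking forever
    if got_it:
        lock.release()
    q.put(got_it)


def probe(hold_lock):
    lock = Lock()
    q = Queue()
    if hold_lock:
        lock.acquire()  # the parent sits inside the critical section
    p = Process(target=try_lock, args=(lock, q))
    p.start()
    result = q.get()
    p.join()
    if hold_lock:
        lock.release()
    return result


if __name__ == "__main__":
    print(probe(True), probe(False))  # False True
```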
    

    Without a Lock, you can get the same result by starting each process and joining it immediately, instead of starting all of them first. This works because only one client process runs at a time. The price is that the whole program becomes serial, including the parts that do not touch the shared file.

    from multiprocessing import Process
    import json
    import random
    import time
    
    tf='ticks.json'
    
    def info():
        with open(tf) as f:
            ticks=json.load(f)
        print("ticks from {} to {} left {}".format(ticks['origin'],ticks['dest'],ticks['count']))
    
    def buy(pname):
        with open(tf) as f:
            ticks=json.load(f)
        time.sleep(random.random())
        if ticks['count']>0:
            ticks['count']-=1
            time.sleep(random.random())
            with open(tf,'w') as f:
                json.dump(ticks,f)
            print('{} buy one tick from {} to {}!'.format(pname,ticks['origin'],ticks['dest']))
        else:
            print('oh....no ticks :(')
    
    def task(pname):
        info()
        buy(pname)
    
    if __name__=='__main__':
        # initialize the file with 10 tickets from bj to cq
        with open(tf,'w') as f:
            json.dump({'origin':'bj','dest':'cq','count':10},f)
        clients=[]
        for i in range(20):
            name='client-{}'.format(i+1)
            p=Process(target=task,name=name,args=(name,))
            clients.append(p)
        for p in clients:
            p.start()
            p.join()  # wait for this client to finish before starting the next one
        print("all clients finished...")
    
    ticks from bj to cq left 10
    client-1 buy one tick from bj to cq!
    ticks from bj to cq left 9
    client-2 buy one tick from bj to cq!
    ticks from bj to cq left 8
    client-3 buy one tick from bj to cq!
    ticks from bj to cq left 7
    client-4 buy one tick from bj to cq!
    ticks from bj to cq left 6
    client-5 buy one tick from bj to cq!
    ticks from bj to cq left 5
    client-6 buy one tick from bj to cq!
    ticks from bj to cq left 4
    client-7 buy one tick from bj to cq!
    ticks from bj to cq left 3
    client-8 buy one tick from bj to cq!
    ticks from bj to cq left 2
    client-9 buy one tick from bj to cq!
    ticks from bj to cq left 1
    client-10 buy one tick from bj to cq!
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    ticks from bj to cq left 0
    oh....no ticks :(
    all clients finished...
    
    Semaphore
    from multiprocessing import Process
    from multiprocessing import Semaphore
    from multiprocessing import current_process
    import time
    import random
    
    def get_connections(s):
        with s:  # take one of the 5 slots; released on exit, even after an exception
            print(current_process().name+' acquires a connection')
            time.sleep(random.randint(1,2))
            print(current_process().name+' finishes its job and returns the connection')
    
    if __name__=='__main__':
        connections=Semaphore(5)  # at most 5 processes may hold a connection at the same time
        workers=[]
        for i in range(20):
            p=Process(target=get_connections,args=(connections,),name='worker:'+str(i+1))
            workers.append(p)
        for p in workers:
            p.start()
        for p in workers:
            p.join()
        print("all workers exit")
    

    The semaphore is initialized to 5, so at most 5 child processes can hold a connection at the same time. A new one can connect only after another releases its connection.
    For example, workers 3, 1, 4, 2 and 11 connect first. When worker 3 finishes and releases its connection, worker 7 connects, and so on.

    worker:3 acquires a connection
    worker:1 acquires a connection
    worker:4 acquires a connection
    worker:2 acquires a connection
    worker:11 acquires a connection
    worker:3 finishes its job and returns the connection
    worker:7 acquires a connection
    worker:1 finishes its job and returns the connection
    worker:12 acquires a connection
    worker:2 finishes its job and returns the connection
    worker:17 acquires a connection
    worker:11 finishes its job and returns the connection
    worker:8 acquires a connection
    worker:12 finishes its job and returns the connection
    worker:5 acquires a connection
    worker:4 finishes its job and returns the connection
    worker:10 acquires a connection
    worker:7 finishes its job and returns the connection
    worker:14 acquires a connection
    worker:5 finishes its job and returns the connection
    worker:15 acquires a connection
    worker:10 finishes its job and returns the connection
    worker:18 acquires a connection
    worker:17 finishes its job and returns the connection
    worker:9 acquires a connection
    worker:8 finishes its job and returns the connection
    worker:13 acquires a connection
    worker:15 finishes its job and returns the connection
    worker:16 acquires a connection
    worker:9 finishes its job and returns the connection
    worker:6 acquires a connection
    worker:13 finishes its job and returns the connection
    worker:19 acquires a connection
    worker:14 finishes its job and returns the connection
    worker:20 acquires a connection
    worker:16 finishes its job and returns the connection
    worker:18 finishes its job and returns the connection
    worker:6 finishes its job and returns the connection
    worker:19 finishes its job and returns the connection
    worker:20 finishes its job and returns the connection
    all workers exit
    

    If start() and join() are called together in the same loop, execution effectively becomes serial:

    worker:1 acquires a connection
    worker:1 finishes its job and returns the connection
    worker:2 acquires a connection
    worker:2 finishes its job and returns the connection
    worker:3 acquires a connection
    worker:3 finishes its job and returns the connection
    worker:4 acquires a connection
    worker:4 finishes its job and returns the connection
    worker:5 acquires a connection
    worker:5 finishes its job and returns the connection
    worker:6 acquires a connection
    worker:6 finishes its job and returns the connection
    worker:7 acquires a connection
    worker:7 finishes its job and returns the connection
    worker:8 acquires a connection
    worker:8 finishes its job and returns the connection
    worker:9 acquires a connection
    worker:9 finishes its job and returns the connection
    worker:10 acquires a connection
    worker:10 finishes its job and returns the connection
    worker:11 acquires a connection
    worker:11 finishes its job and returns the connection
    worker:12 acquires a connection
    worker:12 finishes its job and returns the connection
    worker:13 acquires a connection
    worker:13 finishes its job and returns the connection
    worker:14 acquires a connection
    worker:14 finishes its job and returns the connection
    worker:15 acquires a connection
    worker:15 finishes its job and returns the connection
    worker:16 acquires a connection
    worker:16 finishes its job and returns the connection
    worker:17 acquires a connection
    worker:17 finishes its job and returns the connection
    worker:18 acquires a connection
    worker:18 finishes its job and returns the connection
    worker:19 acquires a connection
    worker:19 finishes its job and returns the connection
    worker:20 acquires a connection
    worker:20 finishes its job and returns the connection
    all workers exit
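    To check that the semaphore really caps concurrency, the sketch below records the highest number of simultaneous holders in a shared `Value`. The limit of 3, the 10 workers and the 0.05 s hold time are arbitrary choices for illustration. `use_connection` and `peak_concurrency` are hypothetical names.

```python
import time
from multiprocessing import Process, Semaphore, Value


def use_connection(sem, active, peak):
    with sem:  # at most `limit` processes get past this point at once
        with active.get_lock():
            active.value += 1
            peak.value = max(peak.value, active.value)  # highest concurrency seen so far
        time.sleep(0.05)  # hold the "connection" briefly
        with active.get_lock():
            active.value -= 1


def peak_concurrency(limit=3, workers=10):
    sem = Semaphore(limit)
    active = Value("i", 0)  # current number of holders
    peak = Value("i", 0)    # only written while active's lock is held
    procs = [Process(target=use_connection, args=(sem, active, peak)) for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return peak.value


if __name__ == "__main__":
    print(peak_concurrency())  # never more than 3
```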
    
    Event
    from multiprocessing import Process
    from multiprocessing import Event
    import time
    import random
    
    def car(event,name):
        while True:
            if event.is_set():
                time.sleep(random.random())
                print("car {} passes...".format(name))
                break
            else:
                print("car {} waits...".format(name))
                event.wait()  # block until the event is set (the light turns green)
    
    def light(event):
        while True:
            if event.is_set():
                event.clear()
                print("red light")
                time.sleep(random.random())
            else:
                event.set()
                print("green light")
                time.sleep(random.random())
    
    if __name__=='__main__':
        event=Event()
        event.clear()  # start on red (a new Event is already clear; this just makes it explicit)
        l=Process(target=light,args=(event,))
        l.daemon=True  # the light loops forever; as a daemon it is terminated when the main process exits
        l.start()
        cars=[]
        for i in range(10):
            c=Process(target=car,args=(event,'c_'+str(i+1)))
            cars.append(c)
        for c in cars:
            c.start()
        for c in cars:
            c.join()
        print("all cars passed...")
    

    The output differs from run to run.

    car c_1 waits...
    car c_2 waits...
    green light
    car c_3 passes...
    car c_1 passes...
    car c_4 passes...
    car c_7 passes...
    car c_8 passes...
    car c_6 passes...
    car c_10 passes...
    car c_5 passes...
    red light
    car c_9 passes...
    green light
    car c_2 passes...
    all cars passed...
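    `Event.wait()` also accepts a timeout and returns whether the event was set. This lets a waiter give up instead of blocking forever. Here is a sketch with hypothetical names (`switch_on`, `wait_for_green`):

```python
import time
from multiprocessing import Event, Process


def switch_on(event, delay):
    time.sleep(delay)
    event.set()  # wake up everyone blocked in event.wait()


def wait_for_green(delay, timeout):
    event = Event()  # starts cleared ("red")
    p = Process(target=switch_on, args=(event, delay))
    p.start()
    turned_green = event.wait(timeout)  # True if set within timeout, False otherwise
    p.join()
    return turned_green


if __name__ == "__main__":
    print(wait_for_green(0.1, 5))    # True
    print(wait_for_green(1.0, 0.1))  # False
```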
    
    Producer-consumer model
    from multiprocessing import Process, Queue
    import time
    import random
    import os
    
    def produce(q):
        time.sleep(random.random())
        q.put('car_by_{}'.format(os.getpid()))
        print("{} produces a car...".format(os.getpid()))
    
    def buy(q):
        car=q.get()  # blocks until an item is available
        if car is None:  # None is the end signal sent by the main process
            print("no car. {} ends.".format(os.getpid()))
            return
        else:
            time.sleep(random.random())
            print("{} buy the car {}".format(os.getpid(),car))
    
    if __name__=='__main__':
        q=Queue()
        producers=[]
        consumers=[]
        for i in range(0,10):
            p=Process(target=produce,args=(q,))
            producers.append(p)
        for i in range(0,50):
            c=Process(target=buy,args=(q,))
            consumers.append(c)
        for p in producers:
            p.start()
        for p in producers:
            p.join()
        for c in consumers:
            c.start()
        for c in consumers:
            q.put(None)  # the main process signals the end; one None must be provided for every consumer
        print('main')
        for c in consumers:
            c.join()  # wait for all consumers before the main process exits
    
    14764 produces a car...
    17672 produces a car...
    8124 produces a car...
    18736 produces a car...
    17312 produces a car...
    1460 produces a car...
    6200 produces a car... 
    18568 produces a car...
    14964 produces a car...
    18560 produces a car...
    main
    7740 buy the car car_by_14764
    15340 buy the car car_by_14964
    no car. 16420 ends.
    no car. 8736 ends.
    no car. 11540 ends.
    15176 buy the car car_by_17672
    13416 buy the car car_by_8124
    no car. 1208 ends.
    no car. 14524 ends.
    no car. 8036 ends.
    no car. 12192 ends.
    no car. 10984 ends.
    no car. 13016 ends.
    7540 buy the car car_by_17312
    10980 buy the car car_by_1460
    no car. 12132 ends.
    no car. 17976 ends.
    no car. 12704 ends.
    no car. 18572 ends.
    20156 buy the car car_by_18568
    no car. 17216 ends.
    no car. 2492 ends.
    no car. 17628 ends.
    no car. 8396 ends.
    no car. 13528 ends.
    no car. 12440 ends.
    16108 buy the car car_by_18736
    no car. 16200 ends.
    14096 buy the car car_by_6200
    no car. 9296 ends.
    no car. 2464 ends.
    no car. 19956 ends.
    13672 buy the car car_by_18560
    no car. 11244 ends.
    no car. 2832 ends.
    no car. 5484 ends.
    no car. 11108 ends.
    no car. 1648 ends.
    no car. 14492 ends.
    no car. 11720 ends.
    no car. 18240 ends.
    no car. 18936 ends.
    no car. 7676 ends.
    no car. 5408 ends.
    no car. 12232 ends.
    no car. 18080 ends.
    no car. 14404 ends.
    no car. 15296 ends.
    no car. 19800 ends.
    no car. 4744 ends.
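    In the code above each consumer handles exactly one item, so 50 consumer processes are started for 10 cars. A common variation, sketched below as an alternative rather than the author's code, uses a few long-lived consumers that loop until they receive a sentinel. The main process reads all results before joining the consumers. A process that has put items on a queue may not exit until those items have been flushed to the underlying pipe, so joining first could hang. The names `producer`, `consumer` and `run` are hypothetical.

```python
import os
from multiprocessing import Process, Queue

SENTINEL = None  # end-of-work marker, one per consumer


def producer(q, n):
    for i in range(n):
        q.put("car_{}_by_{}".format(i, os.getpid()))


def consumer(q, results):
    while True:
        car = q.get()
        if car is SENTINEL:  # no more cars: stop this consumer
            break
        results.put((os.getpid(), car))


def run(n_producers=3, cars_each=4, n_consumers=2):
    q, results = Queue(), Queue()
    producers = [Process(target=producer, args=(q, cars_each)) for _ in range(n_producers)]
    consumers = [Process(target=consumer, args=(q, results)) for _ in range(n_consumers)]
    for p in producers + consumers:
        p.start()
    for p in producers:
        p.join()          # every car is now in the queue
    for _ in consumers:
        q.put(SENTINEL)   # then one stop signal per consumer, behind all the cars
    bought = [results.get() for _ in range(n_producers * cars_each)]
    for c in consumers:
        c.join()
    return bought


if __name__ == "__main__":
    for pid, car in run():
        print("{} buys {}".format(pid, car))
```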
    

        Link: https://www.haomeiwen.com/subject/zildxrtx.html