美文网首页python并发Python 核心Python/Go开发
[python] 线程间同步之条件变量Condition

[python] 线程间同步之条件变量Condition

作者: StormZhu | 来源:发表于2018-05-04 02:01 被阅读0次

    为什么需要条件变量

    有了前面提到的互斥锁,为什么还需要条件变量呢,当然是由于有些复杂问题互斥锁搞不定了。Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquirerelease方法外,还提供了waitnotify方法。

    先看一个互斥锁解决不了的场景,假设两个智能聊天机器人(小米的小爱和天猫的天猫精灵)对话,

    天猫精灵:小爱同学

    小爱:在

    天猫精灵:我们来对古诗吧

    小爱:好啊

    天猫精灵:我住长江头

    小爱:不聊了,再见

    假设小爱和天猫精灵分别是两个线程,先使用互斥锁来实现一下:

    import threading
    
    class XiaoAi(threading.Thread):
        def __init__(self, lock):
            super().__init__(name="小爱")
            self.lock = lock
    
        def run(self):
            self.lock.acquire()
            print("{} : 在".format(self.name))
            self.lock.release()
    
            self.lock.acquire()
            print("{} : 好啊".format(self.name))
            self.lock.release()
    
    class TianMao(threading.Thread):
        def __init__(self, lock):
            super().__init__(name="天猫精灵")
            self.lock = lock
    
        def run(self):
            self.lock.acquire()
            print("{} : 小爱同学".format(self.name))
            self.lock.release()
    
            self.lock.acquire()
            print("{} : 我们来对古诗吧".format(self.name))
            self.lock.release()
    
    if __name__ == "__main__":
        lock = threading.Lock()
        xiaoai = XiaoAi(lock)
        tianmao = TianMao(lock)
    
        tianmao.start()
        xiaoai.start()
        
    # 运行结果如下:
    # 天猫精灵 : 小爱同学
    # 天猫精灵 : 我们来对古诗吧
    # 小爱 : 在
    # 小爱 : 好啊
    

    可以看到,输出结果并不是预期的对话顺序,这是因为天猫精灵的线程说完“小爱同学”之后,cpu的控制权还没有交出去,继续获取了互斥锁,又执行了“我们来对古诗吧”,所以不能得到预期结果。

    先自己想一下解决办法,理论上应该A线程在等待中,B线程在干活,干活完毕之后通知A线程活干完了,B线程进入等待,而A线程得到了通知之后,不再继续等待,开始干活,看完之后通知B线程,如此循环,直到结束。

    比较粗糙的想法:假设有一个全局变量active_user,为0表示该A线程执行,1表示B线程执行,对于A线程,先实现wait方法:就是while循环判断是否active_user == 0(必须保证这个变量在两个线程中使用的是同一个),notify方法:将active_user赋值为1。对于B线程,实现方式相反。代码如下:

    import threading
    
    class XiaoAi(threading.Thread):
        def __init__(self, lock, active_user):
            super().__init__(name="小爱")
            self.lock = lock
            self.active_user = active_user
    
        def wait(self):
            while(1):
                self.lock.acquire()
                user = self.active_user[0]
                self.lock.release()
                if user == 1:
                    break
    
        def notify(self):
            self.lock.acquire()
            self.active_user[0] = 0
            self.lock.release()
    
        def run(self):
            self.wait()
            print("{} : 在".format(self.name))
            self.notify()
    
            self.wait()
            print("{} : 好啊".format(self.name))
            self.notify()
    
    class TianMao(threading.Thread):
        def __init__(self, lock, active_user):
            super().__init__(name="天猫精灵")
            self.lock = lock
            self.active_user = active_user
    
        def wait(self):
            while(1):
                self.lock.acquire()
                user = self.active_user[0]
                self.lock.release()
                if user == 0:
                    break
    
        def notify(self):
            self.lock.acquire()
            self.active_user[0] = 1
            self.lock.release()
    
    
        def run(self):
            self.wait()
            print("{} : 小爱同学".format(self.name))
            self.notify()
    
            self.wait()
            print("{} : 我们来对古诗吧".format(self.name))
            self.notify()
    
    if __name__ == "__main__":
        # 0表示天猫执行, 1表示小爱
        # 为了保证两个线程修改active_user之后,互相是可见的,所以传了一个List,而不是整数
        active_user = [0] 
        lock = threading.Lock()
        xiaoai = XiaoAi(lock, active_user)
        tianmao = TianMao(lock, active_user)
    
        tianmao.start()
        xiaoai.start()
    # 运行结果如下:可得到预期结果
    # 天猫精灵 : 小爱同学
    # 天猫精灵 : 我们来对古诗吧
    # 小爱 : 在
    # 小爱 : 好啊
    
    

    由上面的例子可知,由互斥锁是可以实现互相通知的需求的。但是上面的代码效率不高,一直在while循环中判断,还要自己维护一个全局变量,很麻烦,在复杂场景下不能胜任。于是python就给我们封装好了Condition类。

    条件变量Condition

    构造方法:

    import threading
    # 可传入一个互斥锁或者可重入锁
    cond = threading.Condition()
    

    实例方法:

    acquire([timeout])/release(): 调用关联的锁的相应方法。 
    wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。
        使用前线程必须已获得锁定,否则将抛出异常。 
    notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用
        acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会
        释放锁定。使用前线程必须已获得锁定,否则将抛出异常。 
    notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池
        尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
    

    主要使用方法和前面自己实现的差不多,主要调用waitnotify方法,将上面的方法改写为使用条件变量:

    import threading
    class XiaoAi(threading.Thread):
        def __init__(self, cond):
            super().__init__(name="小爱")
            self.cond = cond
    
        def run(self):
            self.cond.acquire()
    
            self.cond.wait()
            print("{} : 在".format(self.name))
            self.cond.notify()
    
            self.cond.wait()
            print("{} : 好啊".format(self.name))
            self.cond.notify()
    
            self.cond.wait()
            print("{} : 不聊了,再见".format(self.name))
            self.cond.notify()
    
            self.cond.release()
    
    
    class TianMao(threading.Thread):
        def __init__(self, cond):
            super().__init__(name="天猫精灵")
            self.cond = cond
    
        def run(self):
            self.cond.acquire()
    
            print("{} : 小爱同学".format(self.name))
            self.cond.notify()
            self.cond.wait()
    
            print("{} : 我们来对古诗吧".format(self.name))
            self.cond.notify()
            self.cond.wait()
    
            print("{} : 我住长江头".format(self.name))
            self.cond.notify()
            self.cond.wait()
    
            self.cond.release()
    
    if __name__ == "__main__":
        cond = threading.Condition()
        xiaoai = XiaoAi(cond)
        tianmao = TianMao(cond)
        
        tianmao.start()
        xiaoai.start()
    
    # 执行结果
    # 天猫精灵 : 小爱同学
    

    运行之后会发现天猫精灵说出了“小爱同学”之后就没有了响应,这就是在使用条件变量的时候需要注意的点。仔细观察主函数中的线程启动顺序,tianmao先启动了,假设tianmao已经启动完成,并打印了“小爱同学”,执行notify之后,xiaoai才刚刚启动,成功执行完self.cond.acquire()之后,开始执行wait语句,但此时会陷入死循环!原因是 wait()只能被notify()唤醒,而notify()已经被另一个线程执行过了,注意:只能是一个线程执行过了wait(),在被阻塞过程中,另一个线程执行了notify()才可以。不然就像上面一下陷入死循环。因此,需要将上面的main方法改写:

    if __name__ == "__main__":
        cond = threading.Condition()
        xiaoai = XiaoAi(cond)
        tianmao = TianMao(cond)
        
        # 启动顺序很重要
        xiaoai.start()
        tianmao.start()
        
    # 执行结果
    # 天猫精灵 : 小爱同学
    # 小爱 : 在
    # 天猫精灵 : 我们来对古诗吧
    # 小爱 : 好啊
    # 天猫精灵 : 我住长江头
    # 小爱 : 不聊了,再见
    

    可以看到,改完启动顺序运行结果对了,其实这样并不能完全保证xiaoai会先启动,如果xiaoairun方法中有个1s延时,就算先执行xiaoai.start()tianmao也会先执行notify(),具体这种情况下应该怎么办,暂时还不清楚。。。

    源码分析

    大致实现思路描述:Codition有两层锁,一把底层锁会在进入wait方法的时候释放,离开wait方法的时候再次获取,上层锁会在每次调用wait时分配一个新的锁,并放入condition的等待队列中,而notify负责释放这个锁。可能理解起来不是很直观,直接看源码:

    init方法

    init方法.png

    先看源码Condition类的说明,这是一个实现了条件变量的类,允许一个或多个线程等待其他线程的通知。在__init__方法中,有一个参数lock,默认为None。有两种用法:

    1. 如果lock是非None,也就是说用户想自己设置参数,必须传递LockRLock对象。
    2. 如果lockNone__init__方法中默认使用可重入锁RLock

    这个lock作为底层维护的锁 underlying lock,条件变量实现的关键。

    __init__函数中另一个比较重要的步骤是,建立了一个双端队列,存储所有在等待中的锁,self._waiters = _deque()

    wait方法

    wait方法.png

    这里有一个疑惑,第二次的waiter.acquire()没有找到对应的release方法?虽然感觉不会影响结果,一种可能是在从队列中移除这个锁的时候尝试了释放这个锁。

    notify方法

    notify方法.png

    简单总结,A线程阻塞在wait方法时,只有B线程执行了notifywait的时候(也有可能B线程执行了notify,而C线程执行了wait),A线程的wait方法才能执行完毕,而此时B线程会阻塞在wait方法中。

    总结

    • 条件变量提供了对复杂线程同步问题的支持。
    • 条件变量也是使用互斥锁实现的,主要是两层锁结构。

    参考

    1. python 线程之 Condition
    2. Python3高级编程和异步IO并发编程

    相关文章

      网友评论

        本文标题:[python] 线程间同步之条件变量Condition

        本文链接:https://www.haomeiwen.com/subject/jeyrrftx.html