美文网首页随笔-生活工作点滴
threading库源码浅析系列之Condition

threading库源码浅析系列之Condition

作者: 莫斯克内斯 | 来源:发表于2019-07-07 23:44 被阅读0次

背景

最近项目中遇到一个需求,有多个机器学习算法都需要独占一块gpu,如果多个算法同时共用一个gpu可能显存不够用导致程序崩溃。但发货出去的服务器往往只有1~2块gpu,我需要实现一个调度中心,某算法在计算前,先向调度中心申请gpu资源,计算完成后再向调度中心释放资源,使资源可供其它算法使用。

为了保证多进程信息的一致性,这个调度系统必须加锁,进程间通讯我使用的是rabbitmq,使用python中的threading标准库及Queue对象,完成了一个调度系统雏形(玩具版)。在使用Queue对象时,发现它通过threading库中的接口实现了线程安全。之前有看过threading库,但一直没来得及进行总结。后面我将陆续对threading库中的接口源码进行浅析,来印证这次学习。由于本人才疏学浅,有误或不足之处,希望大家指点。

示例

先贴一个Condition使用示例,简单的生产者消费者模型。

# coding:utf8
import threading
import logging
FORMAT = '%(asctime)s %(threadName)s %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)


class Consumer:

    def __init__(self, condition):
        self.condition = condition
        self.event = threading.Event()

    def consume(self, interval):
        count = 0
        with self.condition:
            while not self.event.wait(interval):
                self.condition.wait()
                count += 1
                logging.info('I consume a bread, please produce.count: {}'.format(count))
                self.condition.notify()


class Productor:

    def __init__(self, condition):
        self.condition = condition
        self.event = threading.Event()

    def produce(self, interval):
        count = 0
        with self.condition:
            while not self.event.wait(interval):
                count += 1
                logging.info('I produce a bread, please consume.count: {}'.format(count))
                self.event.wait(1)
                self.condition.notify()
                self.condition.wait()


if __name__ == '__main__':
    condition = threading.Condition()
    productor = Productor(condition)
    consumer = Consumer(condition)
    t1 = threading.Thread(target=productor.produce, args=(1,), name='productor')
    t2 = threading.Thread(target=consumer.consume, args=(2,), name='consumer')
    t2.start()
    t1.start()

输出结果

2019-07-07 15:58:09,981 productor I produce a bread, please consume. count: 1
2019-07-07 15:58:10,987 consumer I consume a bread, please produce. count: 1
2019-07-07 15:58:13,996 productor I produce a bread, please consume. count: 2
2019-07-07 15:58:15,001 consumer I consume a bread, please produce. count: 2
2019-07-07 15:58:18,008 productor I produce a bread, please consume. count: 3
2019-07-07 15:58:19,010 consumer I consume a bread, please produce. count: 3

源码分析

上面代码不用多说,比较简单。主要分析下Condition源码。

  1. 首先看下初始化方法
 def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self._waiters = _deque()
  • Condition实例化时调用者可以给它传一个lock参数,这个参数如果不传则Condition对象的_lock属性被赋值为一个RLock可重入锁,这个参数的作用我后面分析Queue对象的时再结合使用去分析。现在,我把_lock属性简称为Condition的全局锁(一把大锁,会应用于多个线程中)。

  • 然后会给实例赋值一些属性,acquire、release、_release_save、_acquire_restore、_is_owned被赋值为那把全局锁RLock对象上的方法,通过这些方法,可以操作全局锁。

  • 最后给Condition实例赋值一个_waiters属性,为一个双端队列,用于存放以后针对不同线程的锁(小锁)。

  1. Condition对象支持上下文管理,如下方法分别为获取、释放全局锁(大锁),
    判断当前线程是否拥有全局锁。
    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def _release_save(self):
        self._lock.release()           # No state to save

    def _acquire_restore(self, x):
        self._lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by current_thread.
        # This method is called only if _lock doesn't have _is_owned().
        if self._lock.acquire(0):
            self._lock.release()
            return False
        else:
            return True
  1. 下面我们来分析两个接口方法wait、notify
    def wait(self, timeout=None):
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass
  • 调用wait方法时,可传入timeout参数,即超时时间。
    它首先判断当前线程是否拥有全局锁,如果没有则抛异常RuntimeError("cannot wait on un-acquired lock")
  • 然后它会生成一把锁(不可重入锁,这把锁是独立针对于不同线程的,每个线程都会有一个这样的锁,用于分别阻塞不同的线程,这里简称为线程锁)赋值给waiter 。获取一下这把线程锁,然后将线程锁追加到self._waiters 等候队列中。
    之后释放全局锁self._release_save(),生成gotit flag。
  • 尝试再次获取waiter 线程锁,线程会阻塞在这里(因为线程锁之前已经获取过一遍了),直到成功获取线程锁,gotit被赋值为True;或者设置了timeout超时时间,阻塞直到获取线程锁或超时,将获取状态赋值给gotit, 如果timeout < 0,则非阻塞获取线程锁。
    返回获取状态gotit
  • finally语句体中获取全局锁,如果未获得线程锁,则在双端队列中移除线程锁。
    def notify(self, n=1):
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass
  • 分析下notify方法,参数n=1, 默认唤醒一个等待的线程。
    如果当前线程没有获得全局锁,则抛异常RuntimeError("cannot wait on un-acquired lock")
  • 对存放线程锁的双端队列进行切片,获取要被释放的线程锁,
    释放这些线程锁,并将这些线程锁从队列中remove掉。
    这样之前调用wait方法,被线程锁所阻塞的线程得以继续执行。
  • Condition其它方法如notify_all逻辑是一样的,不再赘述。

启动顺序

  1. 在使用Condition时会有线程启动顺序的问题,如果调换示例中t1(生产者)、t2(生产者)两线程的启动顺序,先启动生产线程后启动消费线程,输出如下:
2019-07-07 23:01:48,137 productor I produce a bread, please consume. count: 1
  1. 发现只有一条生产线程的log信息,消费者并没有去消费。
    看一下生产者的produce方法,分析下原因
def produce(self, interval):
        count = 0
        with self.condition:
            while not self.event.wait(interval):
                count += 1
                logging.info('I produce a bread, please consume.count: {}'.format(count))
                self.event.wait(1)
                self.condition.notify()
                self.condition.wait()
  • 在生产线程中Condition对象先调用了notify方法,去释放阻塞等待线程的线程锁,然后调用wait方法,释放全局锁,并将生产线程用一个线程锁阻塞住。
    def consume(self, interval):
        count = 0
        with self.condition:
            while not self.event.wait(interval):
                self.condition.wait()
                count += 1
                logging.info('I consume a bread, please produce. count: {}'.format(count))
                self.condition.notify()
  • 在生产线程释放完全局锁后,Condition对象又在消费者线程中获得全局锁,然后消费者线程又被wait方法阻塞住了。

  • 原因就在于生产者先调用了notify方法,但此时消费线程还未获得Condition的全局锁,是被阻塞在全局锁,也没生成线程锁。这样生产线程中调用notify等于没释放任何线程锁,随后释放全局锁,将自己用线程锁阻塞。消费线程中获得全局锁后才调用wait方法,释放全局锁,将自己用线程锁阻塞。
    此时,生产线程和消费线程都是阻塞在各自线程的线程锁处,没办法再调用notify去释放线程锁。因为调用顺序错误,导致死锁。

  • 正确的启动顺序是:先启动消费线程后启动生产线程。让消费线程先释放全局锁并阻塞在线程锁,而后生产线程获得全局锁,释放掉消费线程的线程锁。
    这之后生产线程释放全局锁,阻塞在线程锁,再由消费线程去释放生产线程的线程锁。之后这个过程不断循环。

  1. 之前我第一次使用Condition时还没来得及源码,也是被这个启动顺序困扰了一下。全篇写的比较啰嗦,希望大家不要嫌弃。

本文版权归作者 莫斯克内斯(博客地址:https://www.jianshu.com/p/512f045b2259
)所有,欢迎转载和商用,请在文章页面明显位置给出原文链接并保留此段声明,否则保留追究法律责任的权利,其他事项,可留言咨询。

相关文章

网友评论

    本文标题:threading库源码浅析系列之Condition

    本文链接:https://www.haomeiwen.com/subject/xyojhctx.html