背景
最近项目中遇到一个需求,有多个机器学习算法都需要独占一块gpu,如果多个算法同时共用一个gpu可能显存不够用导致程序崩溃。但发货出去的服务器往往只有1~2块gpu,我需要实现一个调度中心,某算法在计算前,先向调度中心申请gpu资源,计算完成后再向调度中心释放资源,使资源可供其它算法使用。
为了保证多进程信息的一致性,这个调度系统必须加锁,进程间通讯我使用的是rabbitmq,使用python中的threading标准库及Queue对象,完成了一个调度系统雏形(玩具版)。在使用Queue对象时,发现它通过threading库中的接口实现了线程安全。之前有看过threading库,但一直没来得及进行总结。后面我将陆续对threading库中的接口源码进行浅析,来印证这次学习。由于本人才疏学浅,有误或不足之处,希望大家指点。
示例
先贴一个Condition使用示例,简单的生产者消费者模型。
# coding:utf8
import threading
import logging
FORMAT = '%(asctime)s %(threadName)s %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)
class Consumer:
def __init__(self, condition):
self.condition = condition
self.event = threading.Event()
def consume(self, interval):
count = 0
with self.condition:
while not self.event.wait(interval):
self.condition.wait()
count += 1
logging.info('I consume a bread, please produce.count: {}'.format(count))
self.condition.notify()
class Productor:
def __init__(self, condition):
self.condition = condition
self.event = threading.Event()
def produce(self, interval):
count = 0
with self.condition:
while not self.event.wait(interval):
count += 1
logging.info('I produce a bread, please consume.count: {}'.format(count))
self.event.wait(1)
self.condition.notify()
self.condition.wait()
if __name__ == '__main__':
condition = threading.Condition()
productor = Productor(condition)
consumer = Consumer(condition)
t1 = threading.Thread(target=productor.produce, args=(1,), name='productor')
t2 = threading.Thread(target=consumer.consume, args=(2,), name='consumer')
t2.start()
t1.start()
输出结果
2019-07-07 15:58:09,981 productor I produce a bread, please consume. count: 1
2019-07-07 15:58:10,987 consumer I consume a bread, please produce. count: 1
2019-07-07 15:58:13,996 productor I produce a bread, please consume. count: 2
2019-07-07 15:58:15,001 consumer I consume a bread, please produce. count: 2
2019-07-07 15:58:18,008 productor I produce a bread, please consume. count: 3
2019-07-07 15:58:19,010 consumer I consume a bread, please produce. count: 3
源码分析
上面代码不用多说,比较简单。主要分析下Condition源码。
- 首先看下初始化方法
def __init__(self, lock=None):
if lock is None:
lock = RLock()
self._lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self._waiters = _deque()
-
Condition实例化时调用者可以给它传一个lock参数,这个参数如果不传则Condition对象的_lock属性被赋值为一个RLock可重入锁,这个参数的作用我后面分析Queue对象的时再结合使用去分析。现在,我把_lock属性简称为Condition的全局锁(一把大锁,会应用于多个线程中)。
-
然后会给实例赋值一些属性,acquire、release、_release_save、_acquire_restore、_is_owned被赋值为那把全局锁RLock对象上的方法,通过这些方法,可以操作全局锁。
-
最后给Condition实例赋值一个_waiters属性,为一个双端队列,用于存放以后针对不同线程的锁(小锁)。
-
Condition对象支持上下文管理,如下方法分别为获取、释放全局锁(大锁),
判断当前线程是否拥有全局锁。
def __enter__(self):
return self._lock.__enter__()
def __exit__(self, *args):
return self._lock.__exit__(*args)
def _release_save(self):
self._lock.release() # No state to save
def _acquire_restore(self, x):
self._lock.acquire() # Ignore saved state
def _is_owned(self):
# Return True if lock is owned by current_thread.
# This method is called only if _lock doesn't have _is_owned().
if self._lock.acquire(0):
self._lock.release()
return False
else:
return True
- 下面我们来分析两个接口方法wait、notify
def wait(self, timeout=None):
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
self._acquire_restore(saved_state)
if not gotit:
try:
self._waiters.remove(waiter)
except ValueError:
pass
- 调用wait方法时,可传入timeout参数,即超时时间。
它首先判断当前线程是否拥有全局锁,如果没有则抛异常RuntimeError("cannot wait on un-acquired lock") - 然后它会生成一把锁(不可重入锁,这把锁是独立针对于不同线程的,每个线程都会有一个这样的锁,用于分别阻塞不同的线程,这里简称为线程锁)赋值给waiter 。获取一下这把线程锁,然后将线程锁追加到self._waiters 等候队列中。
之后释放全局锁self._release_save(),生成gotit flag。 - 尝试再次获取waiter 线程锁,线程会阻塞在这里(因为线程锁之前已经获取过一遍了),直到成功获取线程锁,gotit被赋值为True;或者设置了timeout超时时间,阻塞直到获取线程锁或超时,将获取状态赋值给gotit, 如果timeout < 0,则非阻塞获取线程锁。
返回获取状态gotit - finally语句体中获取全局锁,如果未获得线程锁,则在双端队列中移除线程锁。
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters
waiters_to_notify = _deque(_islice(all_waiters, n))
if not waiters_to_notify:
return
for waiter in waiters_to_notify:
waiter.release()
try:
all_waiters.remove(waiter)
except ValueError:
pass
- 分析下notify方法,参数n=1, 默认唤醒一个等待的线程。
如果当前线程没有获得全局锁,则抛异常RuntimeError("cannot wait on un-acquired lock") - 对存放线程锁的双端队列进行切片,获取要被释放的线程锁,
释放这些线程锁,并将这些线程锁从队列中remove掉。
这样之前调用wait方法,被线程锁所阻塞的线程得以继续执行。 - Condition其它方法如notify_all逻辑是一样的,不再赘述。
启动顺序
- 在使用Condition时会有线程启动顺序的问题,如果调换示例中t1(生产者)、t2(生产者)两线程的启动顺序,先启动生产线程后启动消费线程,输出如下:
2019-07-07 23:01:48,137 productor I produce a bread, please consume. count: 1
- 发现只有一条生产线程的log信息,消费者并没有去消费。
看一下生产者的produce方法,分析下原因
def produce(self, interval):
count = 0
with self.condition:
while not self.event.wait(interval):
count += 1
logging.info('I produce a bread, please consume.count: {}'.format(count))
self.event.wait(1)
self.condition.notify()
self.condition.wait()
- 在生产线程中Condition对象先调用了notify方法,去释放阻塞等待线程的线程锁,然后调用wait方法,释放全局锁,并将生产线程用一个线程锁阻塞住。
def consume(self, interval):
count = 0
with self.condition:
while not self.event.wait(interval):
self.condition.wait()
count += 1
logging.info('I consume a bread, please produce. count: {}'.format(count))
self.condition.notify()
-
在生产线程释放完全局锁后,Condition对象又在消费者线程中获得全局锁,然后消费者线程又被wait方法阻塞住了。
-
原因就在于生产者先调用了notify方法,但此时消费线程还未获得Condition的全局锁,是被阻塞在全局锁,也没生成线程锁。这样生产线程中调用notify等于没释放任何线程锁,随后释放全局锁,将自己用线程锁阻塞。消费线程中获得全局锁后才调用wait方法,释放全局锁,将自己用线程锁阻塞。
此时,生产线程和消费线程都是阻塞在各自线程的线程锁处,没办法再调用notify去释放线程锁。因为调用顺序错误,导致死锁。 -
正确的启动顺序是:先启动消费线程后启动生产线程。让消费线程先释放全局锁并阻塞在线程锁,而后生产线程获得全局锁,释放掉消费线程的线程锁。
这之后生产线程释放全局锁,阻塞在线程锁,再由消费线程去释放生产线程的线程锁。之后这个过程不断循环。
- 之前我第一次使用Condition时还没来得及源码,也是被这个启动顺序困扰了一下。全篇写的比较啰嗦,希望大家不要嫌弃。
本文版权归作者 莫斯克内斯(博客地址:https://www.jianshu.com/p/512f045b2259
)所有,欢迎转载和商用,请在文章页面明显位置给出原文链接并保留此段声明,否则保留追究法律责任的权利,其他事项,可留言咨询。
网友评论