Python 线程锁的实现
Lock 的实现
锁只有两种状态,锁定或者未锁定
Lock = _allocate_lock
_allocate_lock = thread.allocate_lock
thread.allocate_lock
是用C代码实现的,代码位置 Python/thread_pthread.h
假设我们的系统支持 POSIX
semaphores
首先看下 sem_init
的原型
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
pshared
决定了这个信号量是在进程中共享还是在线程中共享。
-
pshared
为 非零值,那么不同进程中都可以共享 -
pshared
为 零值,那么在当前进程的线程中共享。
https://svn.python.org/projects/python/trunk/Python/thread_pthread.h
PyThread_type_lock
PyThread_allocate_lock(void)
{
...
/* 申请内存 */
lock = (sem_t *)malloc(sizeof(sem_t));
if (lock) {
/*
初始化
value 为1,表明这个锁是 unlocked,被该进程的所有线程共享
*/
status = sem_init(lock,0,1);
CHECK_STATUS("sem_init");
....
}
...
}
Acquire
// waitflag 默认为 true
int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
int success;
sem_t *thelock = (sem_t *)lock;
int status, error = 0;
dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag));
do {
if (waitflag)
//默认执行到这里
status = fix_status(sem_wait(thelock));
else
status = fix_status(sem_trywait(thelock));
} while (status == EINTR); /* Retry if interrupted by a signal */
if (waitflag) {
CHECK_STATUS("sem_wait");
} else if (status != EAGAIN) {
CHECK_STATUS("sem_trywait");
}
success = (status == 0) ? 1 : 0;
dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success));
return success;
}
Release
void
PyThread_release_lock(PyThread_type_lock lock)
{
sem_t *thelock = (sem_t *)lock;
int status, error = 0;
dprintf(("PyThread_release_lock(%p) called\n", lock));
// sem_post 是关键,释放锁
status = sem_post(thelock);
CHECK_STATUS("sem_post");
}
RLock 的实现
RLock表示的是 reentrant lock,如果该锁已经被获取,那么acquire
可以被同一个线程(进程)多次无阻塞调用。但是 release
必须被匹配的使用。
下面可以看到 RLock 不过是一个浅包装
def RLock(*args, **kwargs):
return _RLock(*args, **kwargs)
RLock 内部保存了一个普通的锁(thread.allocate_lock 生成),同时保存了 这个锁的 owner,
class _RLock():
def __init__(self):
# 内部使用的 一个锁
self.__block = _allocate_lock()
# __owner 用来保存 acquire 成功时的线程 id
self.__owner = None
# acquire被重复调用的次数
self.__count = 0
python3 的实现
python3 会判断系统是否支持 reentrant lock,如果支持则用系统的,否则用 python 代码实现一个。
下面我们将看到,如何只用一个 Lock来实现其他的同步机制, Condition, Event, Semaphore等
Condition 的实现
多个线程可以用 condition 来等待同一个事件的发生,当一个事件发生后,所有等待的线程都可以得到通知。
一个 Condition 总是和一个锁关联在一起的。可以传递一个锁,也可以由 构造函数自己创建一个。
先看下如何使用
import logging
import random
import threading
import time
logging.basicConfig(level=logging.DEBUG,
format='(%(threadName)-9s) %(message)s',)
queue = []
def consumer(cv, q):
logging.debug('Consumer thread started ...')
while True:
with cv:
while not q:
logging.debug("Nothing in queue, consumer is waiting")
cv.wait()
num = q.pop(0)
logging.debug("Consumed %s", num)
time.sleep(random.randint(1,3))
def producer(cv, q):
logging.debug('Producer thread started ...')
while True:
with cv:
nums = range(5)
num = random.choice(nums)
q.append(num)
logging.debug("Produced %s", num)
cv.notify_all()
if __name__ == '__main__':
condition = threading.Condition()
for i in range(10):
threading.Thread(name='consumer%s' % i, target=consumer, args=(condition, queue)).start()
pd = threading.Thread(name='producer', target=producer, args=(condition, queue))
pd.start()
下面看如何实现
class _Condition:
def __init__(self, lock=None, verbose=None):
# 必须关联一个 Lock,如果没有的话,则自己创建一个 RLock
if lock is None:
lock = RLock()
self.__lock = lock
# 可以在 Condition上调用 acquire() and release() 方法,实际是调用的是内部锁的方法
self.acquire = lock.acquire
self.release = lock.release
# 如果锁定义了 _release_save _acquire_restore _is_owned 方法,那么使用之,否则用自己定义的
#......
# 这个很重要,保存了等待在这个Condition上的信息
self.__waiters = []
下面看 wait方法,为了篇幅,省略了部分代码
def wait(self, timeout=None):
# 必须先成功调用acquire方法,才能调用wait
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
# 生成一个锁,并调用 acquire,使得它处于 locked 状态
# 这个锁代表一个waiter
waiter = _allocate_lock()
waiter.acquire()
# 保存起来
self.__waiters.append(waiter)
saved_state = self._release_save()
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
# 再次调用 acquire 方法,等待锁被释放
waiter.acquire()
if __debug__:
self._note("%s.wait(): got it", self)
else:
# 。。。。。。
finally:
# 必须恢复锁原来的状态,这个方法很重要
self._acquire_restore(saved_state)
再看下 notify方法
def notify(self, n=1):
# 同样,必须得调用 acquire成功,才可以调用本方法
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
__waiters = self.__waiters
waiters = __waiters[:n]
if not waiters:
if __debug__:
self._note("%s.notify(): no waiters", self)
return
self._note("%s.notify(): notifying %d waiter%s", self, n,
n!=1 and "s" or "")
for waiter in waiters:
# 调用 锁上的 release 方法,使得等待者可以继续
waiter.release()
try:
__waiters.remove(waiter)
except ValueError:
pass
寻工作
本人正在找工作。地点深圳。请联系我微信 sunfriend
网友评论