美文网首页python自学
Python线程锁的实现

Python线程锁的实现

作者: zhaoxg_cat | 来源:发表于2018-09-07 20:01 被阅读64次

    Python 线程锁的实现

    Lock 的实现

    锁只有两种状态,锁定或者未锁定

    Lock = _allocate_lock
    
    _allocate_lock = thread.allocate_lock
    

    thread.allocate_lock 是用C代码实现的,代码位置 Python/thread_pthread.h

    假设我们的系统支持 POSIX semaphores

    首先看下 sem_init 的原型

    #include <semaphore.h>
    int sem_init(sem_t *sem, int pshared, unsigned int value);
    

    pshared决定了这个信号量是在进程中共享还是在线程中共享。

    • pshared 为 非零值,那么不同进程中都可以共享
    • pshared 为 零值,那么在当前进程的线程中共享。

    https://svn.python.org/projects/python/trunk/Python/thread_pthread.h

    PyThread_type_lock
    PyThread_allocate_lock(void)
    {
        ...
        /* 申请内存 */
        lock = (sem_t *)malloc(sizeof(sem_t));
    
        if (lock) {
            /*
            初始化
            value 为1,表明这个锁是 unlocked,被该进程的所有线程共享
            */
            status = sem_init(lock,0,1);
            CHECK_STATUS("sem_init");
            ....
        }
        ...
    }
    

    Acquire

    // waitflag 默认为 true
    int
    PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
    {
        int success;
        sem_t *thelock = (sem_t *)lock;
        int status, error = 0;
    
        dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag));
    
        do {
            if (waitflag)
                //默认执行到这里
                status = fix_status(sem_wait(thelock));
            else
                status = fix_status(sem_trywait(thelock));
        } while (status == EINTR); /* Retry if interrupted by a signal */
    
        if (waitflag) {
            CHECK_STATUS("sem_wait");
        } else if (status != EAGAIN) {
            CHECK_STATUS("sem_trywait");
        }
    
        success = (status == 0) ? 1 : 0;
    
        dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success));
        return success;
    }
    

    Release

    void
    PyThread_release_lock(PyThread_type_lock lock)
    {
        sem_t *thelock = (sem_t *)lock;
        int status, error = 0;
    
        dprintf(("PyThread_release_lock(%p) called\n", lock));
        // sem_post 是关键,释放锁
        status = sem_post(thelock);
        CHECK_STATUS("sem_post");
    }
    

    RLock 的实现

    RLock表示的是 reentrant lock,如果该锁已经被获取,那么acquire 可以被同一个线程(进程)多次无阻塞调用。但是 release 必须被匹配的使用。

    下面可以看到 RLock 不过是一个浅包装

    def RLock(*args, **kwargs):
        return _RLock(*args, **kwargs)
    

    RLock 内部保存了一个普通的锁(thread.allocate_lock 生成),同时保存了 这个锁的 owner,

    class _RLock():
        def __init__(self):
            # 内部使用的 一个锁
            self.__block = _allocate_lock()
            # __owner 用来保存 acquire 成功时的线程 id
            self.__owner = None
            # acquire被重复调用的次数
            self.__count = 0
    

    python3 的实现

    python3 会判断系统是否支持 reentrant lock,如果支持则用系统的,否则用 python 代码实现一个。


    下面我们将看到,如何只用一个 Lock来实现其他的同步机制, Condition, Event, Semaphore等

    Condition 的实现

    多个线程可以用 condition 来等待同一个事件的发生,当一个事件发生后,所有等待的线程都可以得到通知。

    一个 Condition 总是和一个锁关联在一起的。可以传递一个锁,也可以由 构造函数自己创建一个。

    先看下如何使用

    import logging
    import random
    import threading
    import time
    
    logging.basicConfig(level=logging.DEBUG,
                        format='(%(threadName)-9s) %(message)s',)
    
    queue = []
    
    
    def consumer(cv, q):
        logging.debug('Consumer thread started ...')
        while True:
            with cv:
                while not q:
                    logging.debug("Nothing in queue, consumer is waiting")
                    cv.wait()
                num = q.pop(0)
                logging.debug("Consumed %s", num)
                time.sleep(random.randint(1,3))
    
    
    def producer(cv, q):
        logging.debug('Producer thread started ...')
        while True:
            with cv:
                nums = range(5)
                num = random.choice(nums)
                q.append(num)
                logging.debug("Produced %s", num)
                cv.notify_all()
    
    
    if __name__ == '__main__':
        condition = threading.Condition()
        for i in range(10):
            threading.Thread(name='consumer%s' % i, target=consumer, args=(condition, queue)).start()
        pd = threading.Thread(name='producer', target=producer, args=(condition, queue))
        pd.start()
    

    下面看如何实现

    class _Condition:
        def __init__(self, lock=None, verbose=None):
            # 必须关联一个 Lock,如果没有的话,则自己创建一个 RLock
            if lock is None:
                lock = RLock()
            self.__lock = lock
            # 可以在 Condition上调用 acquire() and release() 方法,实际是调用的是内部锁的方法
            self.acquire = lock.acquire
            self.release = lock.release
            # 如果锁定义了 _release_save _acquire_restore _is_owned 方法,那么使用之,否则用自己定义的
            #......
            # 这个很重要,保存了等待在这个Condition上的信息
            self.__waiters = []
    

    下面看 wait方法,为了篇幅,省略了部分代码

        def wait(self, timeout=None):
            # 必须先成功调用acquire方法,才能调用wait
            if not self._is_owned():
                raise RuntimeError("cannot wait on un-acquired lock")
            # 生成一个锁,并调用 acquire,使得它处于 locked 状态
            # 这个锁代表一个waiter
            waiter = _allocate_lock()
            waiter.acquire()
            # 保存起来
            self.__waiters.append(waiter)
            saved_state = self._release_save()
            try:    # restore state no matter what (e.g., KeyboardInterrupt)
                if timeout is None:
                    # 再次调用 acquire 方法,等待锁被释放
                    waiter.acquire()
                    if __debug__:
                        self._note("%s.wait(): got it", self)
                else:
                    # 。。。。。。
            finally:
                # 必须恢复锁原来的状态,这个方法很重要
                self._acquire_restore(saved_state)
    

    再看下 notify方法

        def notify(self, n=1):
            # 同样,必须得调用 acquire成功,才可以调用本方法
            if not self._is_owned():
                raise RuntimeError("cannot notify on un-acquired lock")
    
            __waiters = self.__waiters
            waiters = __waiters[:n]
            if not waiters:
                if __debug__:
                    self._note("%s.notify(): no waiters", self)
                return
            self._note("%s.notify(): notifying %d waiter%s", self, n,
                       n!=1 and "s" or "")
            for waiter in waiters:
                # 调用 锁上的 release 方法,使得等待者可以继续
                waiter.release()
                try:
                    __waiters.remove(waiter)
                except ValueError:
                    pass
    

    寻工作

    本人正在找工作。地点深圳。请联系我微信 sunfriend

    相关文章

      网友评论

        本文标题:Python线程锁的实现

        本文链接:https://www.haomeiwen.com/subject/svkhgftx.html