美文网首页python自学
Python线程锁的实现

Python线程锁的实现

作者: zhaoxg_cat | 来源:发表于2018-09-07 20:01 被阅读64次

Python 线程锁的实现

Lock 的实现

锁只有两种状态,锁定或者未锁定

Lock = _allocate_lock

_allocate_lock = thread.allocate_lock

thread.allocate_lock 是用C代码实现的,代码位置 Python/thread_pthread.h

假设我们的系统支持 POSIX semaphores

首先看下 sem_init 的原型

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

pshared决定了这个信号量是在进程中共享还是在线程中共享。

  • pshared 为 非零值,那么不同进程中都可以共享
  • pshared 为 零值,那么在当前进程的线程中共享。

https://svn.python.org/projects/python/trunk/Python/thread_pthread.h

PyThread_type_lock
PyThread_allocate_lock(void)
{
    ...
    /* 申请内存 */
    lock = (sem_t *)malloc(sizeof(sem_t));

    if (lock) {
        /*
        初始化
        value 为1,表明这个锁是 unlocked,被该进程的所有线程共享
        */
        status = sem_init(lock,0,1);
        CHECK_STATUS("sem_init");
        ....
    }
    ...
}

Acquire

// waitflag 默认为 true
int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
    int success;
    sem_t *thelock = (sem_t *)lock;
    int status, error = 0;

    dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag));

    do {
        if (waitflag)
            //默认执行到这里
            status = fix_status(sem_wait(thelock));
        else
            status = fix_status(sem_trywait(thelock));
    } while (status == EINTR); /* Retry if interrupted by a signal */

    if (waitflag) {
        CHECK_STATUS("sem_wait");
    } else if (status != EAGAIN) {
        CHECK_STATUS("sem_trywait");
    }

    success = (status == 0) ? 1 : 0;

    dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success));
    return success;
}

Release

void
PyThread_release_lock(PyThread_type_lock lock)
{
    sem_t *thelock = (sem_t *)lock;
    int status, error = 0;

    dprintf(("PyThread_release_lock(%p) called\n", lock));
    // sem_post 是关键,释放锁
    status = sem_post(thelock);
    CHECK_STATUS("sem_post");
}

RLock 的实现

RLock表示的是 reentrant lock,如果该锁已经被获取,那么acquire 可以被同一个线程(进程)多次无阻塞调用。但是 release 必须被匹配的使用。

下面可以看到 RLock 不过是一个浅包装

def RLock(*args, **kwargs):
    return _RLock(*args, **kwargs)

RLock 内部保存了一个普通的锁(thread.allocate_lock 生成),同时保存了 这个锁的 owner,

class _RLock():
    def __init__(self):
        # 内部使用的 一个锁
        self.__block = _allocate_lock()
        # __owner 用来保存 acquire 成功时的线程 id
        self.__owner = None
        # acquire被重复调用的次数
        self.__count = 0

python3 的实现

python3 会判断系统是否支持 reentrant lock,如果支持则用系统的,否则用 python 代码实现一个。


下面我们将看到,如何只用一个 Lock来实现其他的同步机制, Condition, Event, Semaphore等

Condition 的实现

多个线程可以用 condition 来等待同一个事件的发生,当一个事件发生后,所有等待的线程都可以得到通知。

一个 Condition 总是和一个锁关联在一起的。可以传递一个锁,也可以由 构造函数自己创建一个。

先看下如何使用

import logging
import random
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-9s) %(message)s',)

queue = []


def consumer(cv, q):
    logging.debug('Consumer thread started ...')
    while True:
        with cv:
            while not q:
                logging.debug("Nothing in queue, consumer is waiting")
                cv.wait()
            num = q.pop(0)
            logging.debug("Consumed %s", num)
            time.sleep(random.randint(1,3))


def producer(cv, q):
    logging.debug('Producer thread started ...')
    while True:
        with cv:
            nums = range(5)
            num = random.choice(nums)
            q.append(num)
            logging.debug("Produced %s", num)
            cv.notify_all()


if __name__ == '__main__':
    condition = threading.Condition()
    for i in range(10):
        threading.Thread(name='consumer%s' % i, target=consumer, args=(condition, queue)).start()
    pd = threading.Thread(name='producer', target=producer, args=(condition, queue))
    pd.start()

下面看如何实现

class _Condition:
    def __init__(self, lock=None, verbose=None):
        # 必须关联一个 Lock,如果没有的话,则自己创建一个 RLock
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # 可以在 Condition上调用 acquire() and release() 方法,实际是调用的是内部锁的方法
        self.acquire = lock.acquire
        self.release = lock.release
        # 如果锁定义了 _release_save _acquire_restore _is_owned 方法,那么使用之,否则用自己定义的
        #......
        # 这个很重要,保存了等待在这个Condition上的信息
        self.__waiters = []

下面看 wait方法,为了篇幅,省略了部分代码

    def wait(self, timeout=None):
        # 必须先成功调用acquire方法,才能调用wait
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # 生成一个锁,并调用 acquire,使得它处于 locked 状态
        # 这个锁代表一个waiter
        waiter = _allocate_lock()
        waiter.acquire()
        # 保存起来
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                # 再次调用 acquire 方法,等待锁被释放
                waiter.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # 。。。。。。
        finally:
            # 必须恢复锁原来的状态,这个方法很重要
            self._acquire_restore(saved_state)

再看下 notify方法

    def notify(self, n=1):
        # 同样,必须得调用 acquire成功,才可以调用本方法
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")

        __waiters = self.__waiters
        waiters = __waiters[:n]
        if not waiters:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        self._note("%s.notify(): notifying %d waiter%s", self, n,
                   n!=1 and "s" or "")
        for waiter in waiters:
            # 调用 锁上的 release 方法,使得等待者可以继续
            waiter.release()
            try:
                __waiters.remove(waiter)
            except ValueError:
                pass

寻工作

本人正在找工作。地点深圳。请联系我微信 sunfriend

相关文章

  • python多线程

    python基础之多线程锁机制 GIL(全局解释器锁) GIL并不是Python的特性,它是在实现Python解析...

  • Python线程锁的实现

    Python 线程锁的实现 Lock 的实现 锁只有两种状态,锁定或者未锁定 thread.allocate_lo...

  • 浅析python的GIL

    Python中的GIL锁 在Python中,可以通过多进程、多线程和多协程来实现多任务。 在多线程的实现过程中,为...

  • [Python系列]Python多线程

    背景:说到多线程,我们会想到的是:异步编程、同步(锁)、共享变量、线程池等等,那么Python里面多线程是如何实现...

  • python 解决多核处理器算力浪费的现象

    我们都知道python因为其GIL锁导致每一个线程被绑定到一个核上,导致python无法通过线程实现真正的平行计算...

  • python之多线程与多进程入门

    python之多线程与多进程 关键词: GIL锁,IO繁忙,线程安全,线程同步,进程池,进程通信,队列 GIL锁;...

  • iOS开发经验(19)-多线程

    目录 pthread NSThread GCD NSOperation 线程锁 线程通讯 | 多线程实现方案 ...

  • GIL线程全局锁

    线程全局锁(Global Interpreter Lock),即Python为了保证线程安全而采取的独立线程运行的...

  • java并发

    1.并发编程中的锁 并发编程中的各种锁java高并发锁的3种实现Java并发机制及锁的实现原理 2.线程池核心线程...

  • ReentrantLock实现机制(CLH队列锁)

    如何实现一个锁 实现一个锁,主要需要考虑2个问题 如何线程安全的修改锁状态位? 得不到锁的线程,如何排队? 带着这...

网友评论

    本文标题:Python线程锁的实现

    本文链接:https://www.haomeiwen.com/subject/svkhgftx.html