美文网首页多线程
ReentrantReadWriteLock读写锁性质以及锁降级

ReentrantReadWriteLock读写锁性质以及锁降级

作者: virtual灬zzZ | 来源:发表于2022-01-17 01:57 被阅读0次

ReentrantReadWriteLock 读写锁是一种改进的排他锁,也可以称作共享/排他锁. 允许多个线程同时读取共享数据,但是一次只允许一个线程对共享数据进行更新。

读写锁通过读锁与写锁来完成读写操作. 线程在读取共享数据前必须先持有读锁,该读锁可以同时被多个线程持有,即它是共享的.线程在修改共享数据前必须先持有写锁,写锁是排他的, 一个线程持有写锁时,其他线程无法获得相应的锁。

读锁只是在读线程之间共享,任何一个线程持有读锁时,其他线程都无法获得写锁, 保证线程在读取数据期间没有其他线程对数据进行更新,使得读线程能够读到数据的最新值,保证在读数据期间共享变量不被修改。

读写锁相互的关系如下:

获得条件 排他性 作用
读锁 写锁未被任意线程持有 对读线程是共享的,对写线程是排他的 允许多个读线程可以同时读取共享数据,保证在读共享数据时,没有其他线程对共享数据进行修改。
写锁 该写锁未被其他线程持有,并且相应的读锁也未被其他线程持有 对读线程或者写线程都是排他的 保证写线程以独占的方式修改共享数据。

先查看源码:

ReentrantReadWriteLock:

public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

ReadLock:

以共享的方式获取锁,tryAcquireShared < 0 ,意思就是 不是小于 0 就是获得锁了,否则执行doAcquireShared()方法,可见该方法和ReentrantLock的acquireQueued() 方法是非常相似的,就是入队的操作。

public void lock() {
     sync.acquireShared(1);
}

public final void acquireShared(int arg) {
   if (tryAcquireShared(arg) < 0)
       doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            //1. 如果写锁已经被获取并且获取写锁的线程不是当前线程的话,当前
          // 线程获取读锁失败返回-1
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                //2. 当前线程获取读锁
                compareAndSetState(c, c + SHARED_UNIT)) {
               //3. 下面的代码主要是新增的一些功能,比如getReadHoldCount()方法
              //返回当前获取读锁的次数
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            //4. 处理在第二步中CAS操作失败的自旋已经实现重入性
            return fullTryAcquireShared(current);
        }
tryAcquireShared()方法

有意思的是tryAcquireShared()[ 该方法在AQS中的 ]中的方法:int r = sharedCount(c); 和 int c = getState(); exclusiveCount(c) : 看它们的源码:

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count  */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

int类型是4个字节,32bit,SHARED_SHIFT=16,是个分割点,看源码得知,r变量是读锁获取的计数,使用exclusiveCount(c)计算出来的是写锁获取的计数,读锁获取计数采用高16位,写锁获取采用低16位。如图:


回归正题,在分割点,可以类比它们各自占16bit,由左向右,它们各自的起始点按平常的二进制计算即可,它们各自算出的数采用分割点的话,跟传入的数是一样的,这点无需多虑。

看看获取锁的判断条件:
if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {}
  1. 第一步就是根据公平性质来是否顺序获取的,
  2. 第二步是判断锁有无被写线程占有,
  3. 第三步就是熟悉的cas了,

如果任一不通过,最后还是fullTryAcquireShared(),里头和判断if框柱的内容相似,无非继续自旋 ,读线程继续累加直到成功,个中采用threadLocal,这里不展开。

这里有个非常秒的点,有firstReader(Thread类型), 它代表第一个获得读锁的线程,同时配有一个firstCount计数器(int类型),非第一个获得读锁的线程,就会另外新建HoldCounter,采用的是threadLocal,每个获得读锁都配一个,对其进行计数,释放时也按数目释放。采用firstReader,是为了只有一个读线程的时候,就不需要new HoldCounter 了,也就是不需要用到threadLocal那么复杂了。

要明白HoldCounter就要先明白读锁。前面提过读锁的内在实现机制就是共享锁,对于共享锁其实我们可以稍微的认为它不是一个锁的概念,它更加像一个计数器的概念。一次共享锁操作就相当于一次计数器的操作,获取共享锁计数器+1,释放共享锁计数器-1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。所以HoldCounter的作用就是当前线程持有共享锁的数量,这个数量必须要与线程绑定在一起,否则操作其他线程锁就会抛出异常。

其中关键点:

r < MAX_COUNT 这个判断条件非常秒,因为读写是互斥的,MAX_COUNT是r高位=1时 - 1 的数值,如果r不为0,那就是大于 MAX_COUNT了,证明写锁正在占有锁,就乖乖去排队等锁。

条件 !readerShouldBlock ,注意取反,无非就是判断 该线程 要不要阻塞,分别分为公平和非公平、只有返回-1拿不到锁,才会进入doAcquireShared,还是双向队列,注意addWaiter(Node.Shared),非公平的!s.isShared() 已经不满足,直接外面判断true,直接争锁,公平还是那个熟悉的hasQueuedPredecessors(),AQS中介绍过,就是查看队列有无已经排队的,有就加入,无就争锁,这里不展开。

static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            return apparentlyFirstQueuedIsExclusive();
        }
    }

final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }

writeLock

写锁是排他的,代码中的acquire和ReentrantLock非常相似,Node同样是Exclusive,tryAcquire()方法也被ReentrantReadWriteLock重写。

public void lock() {
     sync.acquire(1);
}

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
展开看看tryAcquire()以排他的方式获取锁

只讲这部分,后续其他和ReentrantLock一样,不展开。

protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }
其中关键点:

拿低16的写锁数值,if (w + exclusiveCount(acquires) > MAX_COUNT)这句判断和读锁一样秒,不展开说,就是重入的意思。

writerShouldBlock代码:

  • 非公平直接了当放回false,外面就靠compareAndSetState能不能抢的锁了,不能就去排队。
  • 公平的话,还是熟悉的hasQueuedPredecessors(),队列没有前辈就抢锁,抢失败去排队,如果有前辈,直接ture,后面不用cas,直接去排队。
static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            return apparentlyFirstQueuedIsExclusive();
        }
    }

当然。它们还有tryLock、tryLock(timeout, timeUnit)、tryLockInterruptibly()、unlock()。基本原理和ReentrantLock差不多,这里就不展开。

锁降级

在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。

在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。

仔细想想,这个设计是合理的:因为当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。

一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁;写锁可以“降级”为读锁;读锁不能“升级”为写锁。

参考:
ReentrantReadWriteLock读写锁详解

相关文章

网友评论

    本文标题:ReentrantReadWriteLock读写锁性质以及锁降级

    本文链接:https://www.haomeiwen.com/subject/vmbqhrtx.html