美文网首页
juc并发组件(四)ReentrantReadWriteLock

juc并发组件(四)ReentrantReadWriteLock

作者: sadamu0912 | 来源:发表于2019-01-12 00:01 被阅读0次

下图是ReentrantReadWriteLock读写锁的UML类图。从图中可以清晰的看见,该类主要有读锁和写锁,以及同步器Sync组成。读锁调用 的是同步器Sync中的共享获取释放state方法tryAcquireShared,tryReleaseShared.当使用读写锁中的写锁时,调用的是tryAcquire,tryRelease,互斥的获取释放state的方法。

读写锁满足的语义:

  • 写锁完成的更新,对后续的读锁可见。
  • 可以由多个线程进入读锁
  • 当有写锁之后,其他线程无法获取写锁和读锁
  • 锁重入特性,读锁和写锁都可以重入。
  • 可以从写锁,降级为读锁
    image.png
    从这个类的整体结构来说,ReentrantReadWriteLock聚合了读锁ReadLock和写锁WriteLock,分别实现Lock接口,写锁在获取锁资源的时候,调用内聚的Sync同步器(AQS的子类)内部的tryAcquire和tryRelease方法,去独占式的获取锁资源。读锁在获取锁资源的时候调用内聚的Sync同步器(AQS的子类)内部的tryAcquireShared和tryReleaseShared方法,去共享式的获取锁资源。
    但是这两种锁资源,存在关系写锁完成的更新,对后续的读锁可见。 **
    那怎么办?
    只能两个锁状态,都放在一个volatile变量上,写锁调用tryAcquire和tryRelease对state进行操作完成。另外的读线程,立马可以感知到。
    这也是读写锁设计巧妙的地方,读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,高16位表示读,低16位表示写。

    image.png
    假设当前同步状态值为S,get和set的操作如下:
    (1)获取写状态:
    S&0x0000FFFF:将高16位全部抹去,也就是和EXCLUSIVE_MASK做与运算。
    (2)获取读状态:
    S>>>16:无符号补0,右移16位
    (3)写状态加1:
    S+1
    (4)读状态加1:
      S+(1<<16)即S + 0x00010000 即S+SHARED_UNIT
    接下来看源码分析:

写锁的获取和释放

protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);   //获取写状态位
            if (c != 0) { 
                if (w == 0 ||                //如果c!=0&&w==0 ,直接return false。 因为读锁状态位!=0,也就是
                 //说有其他线程占用读锁。那么这时候,对共享资源修改,其它读线程可能无法在写线程结束 
               //时,感知到写线程对共享资源的修改
                  current != getExclusiveOwnerThread())  //另外一种情况,假如说w!=0,     
            //current !=   getExclusiveOwnerThread()  ,不是当前线程,写锁是互斥的,所以return false
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire  最后锁重入
                setState(c + acquires);   
                return true;
            }
            //走到这里说明c=0 ,这个时候,可能同步队列中还有线程需要抢占锁资源
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);//最后cas设置同步状态成功,把当前线程设置成独占线程
            return true;
        }

writerShouldBlock判断是否需要阻塞。公平和非公平方式实现不同,在非公平策略下总是不会被阻塞,在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,失败则说明锁被别的线程抢去了,返回false。如果需要阻塞则也返回false。
FairSync公平模式

final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }

NonfairSync非公平模式直接返回false

final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
protected final boolean tryRelease(int releases) {
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

就是state减去入参,就好了,和ReentrantLock很相似。

读锁的获取和释放

protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)   //如果写锁线程数 != 0 ,且独占锁不是当前线程则返 
                                                                           //回失败,因为存在锁降级
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {   ///r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++; //// 当前线程为第一个读线程,表示第一个读锁线程重入
                } else {
                    HoldCounter rh = cachedHoldCounter;  //cachedHoldCounter是上一个线程对应的重入次数
                    //第三个线程进来的时候,rh ,cachedHoldCounter存的是第二个线程的HoldCounter 
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();  //readHolds是个ThreadLocal变量
                    //第二个读线程进来的时候,把当前的HoldCounter对象放入cachedHoldCounter 
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

当compareAndSetState(c, c + SHARED_UNIT)失败的时候调用,以下方法

final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {     //// 无限循环
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

读锁的释放,tryReleaseShared方法

protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {   //// 当前线程为第一个读线程
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;  // 获取缓存的计数器
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {    // 无限循环
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))  
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

相关文章

网友评论

      本文标题:juc并发组件(四)ReentrantReadWriteLock

      本文链接:https://www.haomeiwen.com/subject/fduzrqtx.html