美文网首页
ReetrantReadWriteLock源码分析

ReetrantReadWriteLock源码分析

作者: 九点半的马拉 | 来源:发表于2020-05-31 17:24 被阅读0次

    在该篇文章中我将开始介绍ReentrantReadWriteLock有关源码分析。

    我先简单介绍下有关变量的含义,再分析读锁和写锁的获取和释放过程。

    有关变量

    1.png

    ReadLock -- 控制读锁的有关逻辑
    WriteLock -- 控制写锁的有关逻辑
    Thread firstReader -- 第一次调用读锁的线程
    int firstReaderHoldCount -- 第一次调用读锁的线程的读的次数
    HoldCounter cachedHoldCounter -- 用于缓存,记录最后一次获取读锁的线程的读可重入数
    ThreadLocalHolderCounter readHolds -- 用一个ThreadLocal记录当前线程持有的读可重入数

    static final class HoldCounter {
           int count = 0;
           // Use id, not reference, to avoid garbage retention
           final long tid = getThreadId(Thread.currentThread());
    }
    
    static final class ThreadLocalHoldCounter
                extends ThreadLocal<HoldCounter> {
                public HoldCounter initialValue() {
                    return new HoldCounter();
                }
    }
    

    读写状态控制

    在控制锁逻辑的Sync中,可以定义使用公平锁还是非公平锁。默认是非公平的。

    public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
    }
    

    其中,ReadLock使用了共享模式,WriteLock使用了独占模式;

    同一个AQS实例可以同时使用共享模式和独占模式,WriteLockReadLock两个锁维护了同一个同步队列,同步队列中只有一个int类型的state变量来表示当前的同步状态,那内部是如何把两个读写状态分开,并且达到控制线程的目的呢?

    实际上就是将state分成两个部分,其中高16位表示读状态低16位表示写状态。 所以能够表示的最大次数就是2^16 - 1 = 65535次。

    那具体如何计算的呢?

    static final int SHARED_SHIFT   = 16;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    //计算读锁数量
    // 就是右移16位,保留了高16位
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    // 就是c和0x0000FFFF这个值做与操作
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    

    写锁的获取和释放相对简单些,先介绍下这个。

    写锁的获取和释放

    写锁的获取

    写锁加锁时会调用:

    public void lock() {
                sync.acquire(1);
    }
    // 如果获取写锁失败,则放入到同步等待队列中
    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {     
           Thread current = Thread.currentThread();
           int c = getState();
           // 获取当前的写锁数量,当不为0时,表明已经有线程拿到了写锁
           int w = exclusiveCount(c);
           if (c != 0) {
                 // 两种情况, 拒绝获取写锁
                 // 1) w == 0 表明没有线程使用写锁,而c != 0 表明存在读锁
                 // 2)如果存在写锁但是不是当前线程
                 if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                 // 当写锁数量超过最大值2^16 - 1,时,抛出异常
                 if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                 // 这里不需要CAS,知道这里的,只能是写锁重入
                 setState(c + acquires);
                 return true;
           }
           // 之前没有线程获取读写锁,
           // CAS修改state失败,返回,添加到同步队列中
           if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
           setExclusiveOwnerThread(current);
           return true;
    }
    

    在公平锁中:

    // 当前线程之前有排队的线程,这时就不能获取写锁
    final boolean writerShouldBlock() {
                return hasQueuedPredecessors();
    }
    

    在非公平锁中:

    // 不需要判断前面是否有等待的线程,可直接参与抢占
    final boolean writerShouldBlock() {
          return false; 
    }
    

    写锁的释放

    public void unlock() {
           sync.release(1);
    }
    
    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
    }
    
    protected final boolean isHeldExclusively() {
         return getExclusiveOwnerThread() == Thread.currentThread();
    }
    
    protected final boolean tryRelease(int releases) {
                // 如果当前线程不是持有写锁的线程
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                // 减去写锁值
                int nextc = getState() - releases;
                // 写锁数量为0
                boolean free = exclusiveCount(nextc) == 0;
                if (free)
                    setExclusiveOwnerThread(null);
                setState(nextc);
                // 当所有的写锁释放完了才会返回true,唤醒后续节点
                return free;
    }
    

    读锁的获取和释放

    读锁是共享锁

    public void lock() {
           sync.acquireShared(1);
    }
     
    public final void acquireShared(int arg) {
            // 返回值小于0表明没有获取到读锁,
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
    }
    
    protected final int tryAcquireShared(int unused) {
                Thread current = Thread.currentThread();
                int c = getState();
                // 如果存在写锁并且写锁的持有者不是当前线程
                // 拒绝尝试获得读锁
                // 所以当当前线程持有写锁的时候,是有可能拥有读锁的,这个到后面讲解
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                // 获取当前的读锁数量
                int r = sharedCount(c);
                // 具体的readerShuoldBlock逻辑在后面讲解,
                // 当能进入该方法,并CAS成功后
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    // 当前的读锁数量是0,说明当前是第一次读
                    if (r == 0) {
                        // 将将当前线程设置为第一次读的线程
                        firstReader = current;
                        // 第一次读的线程的读数量
                        firstReaderHoldCount = 1;
                    // 支持可重入,当第一次读的线程又进入后
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        // 用于缓存最后一个获取读锁的线程
                        HoldCounter rh = cachedHoldCounter;
                        // 如果缓存的不是当前线程,则通过ThreadLocal获取记录当前线程的值,并修改
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
    }
    

    readerShouldBlock在公平锁和非公平锁中的实现方法不一样

    在公平锁中:
    如果前面有等待的队列的话,返回true,那么就不能直接获取锁

    final boolean readerShouldBlock() {
          return hasQueuedPredecessors();
    }
    

    在非公平锁中:
    如果阻塞队列中head的第一个后继节点是写锁的话,则当前线程则不能尝试获取锁

    final boolean readerShouldBlock() {
          return apparentlyFirstQueuedIsExclusive();
    }
    
    // 如果头节点的后驱节点是独占模式,返回true
    final boolean apparentlyFirstQueuedIsExclusive() {
            Node h, s;
            return (h = head) != null &&
                (s = h.next)  != null &&
                !s.isShared()         &&
                s.thread != null;
    }
    
    final int fullTryAcquireShared(Thread current) {
                HoldCounter rh = null;
                for (;;) {
                    int c = getState();
                    // 当存在写锁并且持有者不是当前线程
                    if (exclusiveCount(c) != 0) {
                        if (getExclusiveOwnerThread() != current)
                            return -1;
                    // 如果不存在写锁,或者写锁持有者是当前线程
                    // 是公平锁时,前面存在等待的队列
                    // 非公平锁,等待队列的第一个后继节点是写锁
                    } else if (readerShouldBlock()) {
                        // 如果是第一次的线程在,则不处理
                        if (firstReader == current) {
                            // assert firstReaderHoldCount > 0;
                        // 如果是其他读的线程,则获取当前线程的读数量
                        // 如果是0,则从ThreadLOcalMap中删除
                        // 并拒绝尝试获得读锁
                        } else {
                            if (rh == null) {
                                rh = cachedHoldCounter;
                                if (rh == null || rh.tid != getThreadId(current)) {
                                    rh = readHolds.get();
                                    if (rh.count == 0)
                                        readHolds.remove();
                                }
                            }
                            if (rh.count == 0)
                                return -1;
                        }
                    }
                    // 读锁超出限制,抛出异常
                    if (sharedCount(c) == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // CAS更新状态
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        // 这里的逻辑和之前的差不多,就不仔细讲了
                        if (sharedCount(c) == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null)
                                rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                            cachedHoldCounter = rh; // cache for release
                        }
                        return 1;
                    }
                }
    }
    

    当获取失败后会调用doAcquireShared方法。

    主要逻辑是先将节点添加到同步等待队列中,然后进入for循环
    如果当前节点的前驱节点是头结点,则调用tryAcquireShared尝试获得读锁
    具体的逻辑就不讲了,大家可以看看AQS的共享锁机制。

    private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
     }
    

    读锁的释放

    public void unlock() {
           sync.releaseShared(1);
    }
    
    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
    }
    
    protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                // 如果要释放的线程是当前线程
                if (firstReader == current) {
                    // 表明当前线程将要没有读锁,则重置为null
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    // 根据ThreadLocal获取当前线程所持有的读锁数量
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    int count = rh.count;
                    // 当前线程的可重入次数将要没有,则从ThreadLocalMap中删除
                    if (count <= 1) {
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                // CAS重置state,等于0才能执行唤醒后续节点任务
                for (;;) {
                    int c = getState();
                    // 将去读锁的状态值
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                        // 读锁和写锁都没有了
                        return nextc == 0;
                }
    }
    

    当上述的方法返回true后,会调用doReleaseShared方法。

    private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
    }
    

    唤醒当前节点的后续节点,如果后续节点为null,或者后续节点状态大于0,说明被取消,所以从尾节点向前查找,查找最早的不被取消的节点,再进行唤醒

    private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    实际上在读写锁中还有一个锁降级的机制,具体的就不展开讲了,有兴趣的可以阅读下该文章:
    Java并发编程之锁机制之ReentrantReadWriteLock(读写锁)

    这种在拥有写锁得到情况下,再获取读锁,随后释放写锁的过程 ,称之为锁降级。

    那为什么当线程获取写锁,修改数据完成后,要先获取读锁,而不是直接释放写锁呢?

    如果当前线程直接释放写锁,那么这个时候如果有其他线程获取了写锁,并修改了数据,那么对于当前释放的线程来说是无法感知数据变化的。先获取读锁的目的是保证没有其他线程来修改数据

    相关文章

      网友评论

          本文标题:ReetrantReadWriteLock源码分析

          本文链接:https://www.haomeiwen.com/subject/ayrrzhtx.html