美文网首页君临天下-Java
ReentrantReadWriteLock源码解析

ReentrantReadWriteLock源码解析

作者: 老荀 | 来源:发表于2020-04-04 23:38 被阅读0次

主要成员

读写锁也是区分了公平和非公平,除此之外还区分了读锁和写锁

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {

    private final ReentrantReadWriteLock.ReadLock readerLock;
   
    private final ReentrantReadWriteLock.WriteLock writerLock;
   
    final Sync sync;

    static final class NonfairSync extends Sync {
        ...
    }
    static final class FairSync extends Sync {
        ...
    }
    public static class ReadLock implements Lock, java.io.Serializable {
        private final Sync sync;
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
        ...
    }
    public static class WriteLock implements Lock, java.io.Serializable {
        private final Sync sync;
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
        ...
    }
}

构造器

同样默认也是非公平的

public ReentrantReadWriteLock() {
    this(false);
}
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

AQS中的state

读写锁把AQS中的一个state字段当成了两个字段来使用
高16位记录当前共享锁数量,低16位记录当前排他锁数量

private volatile int state;

读写锁中的Sync相对于重入锁多了以下一些方法和常量字段

abstract static class Sync extends AbstractQueuedSynchronizer {
       
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
    // 获取共享锁的数量
    static int sharedCount(int c)    { 
        return c >>> SHARED_SHIFT; 
    }

    // 获取排他锁的数量
    static int exclusiveCount(int c) { 
        return c & EXCLUSIVE_MASK; 
    }
    ...
}

lock

这里先提前剧透下,读写锁中的写锁几乎和ReentrantLock没有区别,感兴趣的可以直接看WriteLock中的实现,以及公平和非公平的区别也和ReentrantLock中没区别,感兴趣的可以直接看我上一篇ReentrantLock源码解析
所以这里直接看读锁的实现

public void lock() {
    // 是直接委托AQS去实现的
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        // 当前有写并发,并且不是自己,走排队流程
        return -1;
    // 获取当前共享锁数量
    int r = sharedCount(c);
    // 刚开始的时候readerShouldBlock肯定是false,因为AQS队列还没初始化
    // 返回true的话就是队列中第一个节点是写请求
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        // CAS对共享锁数量+1
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 修改成功才会进入
        if (r == 0) {
            // 设置一些成员表明,自己是第一个读锁请求线程
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 这里是为了可重入做判断
            firstReaderHoldCount++;
        } else {
            // 如果是后来的其他读锁请求
            // 会通过cachedHoldCounter把读锁数量缓存起来,方便之后unlock的时候计算,当前的共享锁数量
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        // return 1就是获取锁成功了
        return 1;
    }
    // 修改共享锁数量失败
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    // 这里也是用死循环来确保共享锁修改成功
    for (;;) {
        // 前半段和上面一个方法是一样的
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        // 判断下最大count,由于读写锁分离了高低16位,所以读并发或者读的可重入数量是不能超过65535
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 修改共享锁数量成功
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

排队流程

private void doAcquireShared(int arg) {
    // 往AQS队列末尾添加一个共享节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 如果自己是队列的第一个
                // 尝试去获取锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 修改成功,将会连锁唤醒下一个共享节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 和ReentrantLock一样
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 和ReentrantLock一样
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 这里就是Propagate(传播)的含义了
            // 会唤醒下一个共享节点,并且这个节点也会尝试唤醒下一个共享节点,以此类推
            doReleaseShared();
    }
}

unlock

读锁在解锁的时候略有不同

public void unlock() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // 当由第一个线程解锁的时候,会把这些成员重置
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // 把缓存读锁数量逐步减1
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 设置共享锁数量成功后,要判断是不是清0了,清0成功才会走唤醒流程
            return nextc == 0;
    }
}

我觉得和ReentrantLock没什么区别,最后都是调用LockSupport的unpark去唤醒队列中的第一个节点

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 这里要把head节点的状态设置为0,设置成功了才会去唤醒
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

关于读写锁的疑问

  • 记录读锁相关的一些成员,其实这些成员以我拙见是没什么用的,jdk注释也说了,意思是相当于提供一个数据提供的入口,但是读锁的数量包括是否最终release还是以state的高16位来判断的
private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

总结

ReentrantReadWriteLock相较于之前的ReentrantLock来说,最大的特点就是读操作的几乎是可以同时进行的(除了修改共享锁数量时的CAS操作,因为CAS操作只可能有一个线程成功)。

  • 第一个获取锁的线程是读操作,在他释放锁之前,中间时间的所有读操作都是可以同时进行的,写操作是需要在AQS中排队的。
  • 第一个获取锁的线程是写操作,那么后续所有的线程都需要进AQS队列排队,但当这个写操作释放锁之后,如果又被读操作获取到了锁,那么在这个读操作之后排队的其他连续(以AQS链表顺序为准)读操作均会被唤醒进而可以读并发。

相关文章

网友评论

    本文标题:ReentrantReadWriteLock源码解析

    本文链接:https://www.haomeiwen.com/subject/hnuephtx.html