美文网首页Java 杂谈
ReentrantReadWriteLock

ReentrantReadWriteLock

作者: 囧囧有神2号 | 来源:发表于2018-05-30 16:01 被阅读0次

ReentrantLock是独占锁,只允许一个线程执行;CountDownLatch,Semaphore等是共享锁;它们分别利用了AQS的独占与共享功能;那么如果在读操作远多于写操作的情况下该如何选择?读写锁,之前的文章中介绍了如何自己实现一个读写锁,还实现了重入功能,读读,写写,读写,写读四种重入。现在来看看JUC包下的ReentrantReadWriteLock的实现。
先来大致了解下ReentrantReadWriteLock:

  • 读锁是个共享锁,写锁是个独占锁。读锁同时被多个线程获取,写锁只能被一个线程获取。读锁与写锁不能同时存在。
  • 一个线程可以多次重复获取读锁和写锁
  • 锁降级:获取写锁的线程又获取了读锁,之后释放写锁,就完成了一次锁降级。
  • 锁升级:不支持升级。获取读锁的线程去获取写锁的化会造成死锁。
  • 重入数:读锁和写锁的最大重入数为65535
  • 公平与非公平两种模式

AQS维护了一个int值,表示同步状态;对于ReentrantLock,state会在0与1之间变化,1表示已被占有后续线程入队列等待,0表示free。对于CountDownLatch,会先将state赋予个大于0的值,在该值变为0后唤醒等待队列中的线程。那么如何用它来即表示读锁又表示写锁呢?读锁我们是允许多个线程同步运行的,我们还允许重入,那么拿什么来记录每个线程读锁的重入数?

针对上面两个问题,对于同步状态status,高16位表示所有线程持有的读锁总数,低16位为一个线程的写锁总数,包括重入。

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
       
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

采用ThreadLocal来记录每个线程锁持有的读锁数目。

        static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }

        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        private transient ThreadLocalHoldCounter readHolds;
        private transient HoldCounter cachedHoldCounter;
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;
  • HoldCounter 静态内部类用来记录一个线程的读锁重入数,以及id;
  • ThreadLocalHoldCounter继承了ThreadLocal,实现了initialValue方法,作用是在没有set前调用get的话initialValue会被调用,HoldCounter对象会被存储到Entry里,并返回它。变量名为readHolds,它用来存储/获取线程的读锁数量。因为读锁是共享的,我们利用同步状态的高16位来记录总数,用threadlocal来记录每个线程所持有的读锁数目。对于写锁来说它是独占锁,低16位代表的就是当前线程持有的写锁数目。
  • cachedHoldCounter:它是一种优化的手段,为了避免频繁的调用ThreadLocalHoldCounter的读取,更改甚至删除操作,于是缓存最新一个成功获取锁的线程的HoldCounter,意思是当一个线程需要记录值的时候会先检查自己是否是cachedHoldCounter中缓存的那个线程,是的话就不用再从readHolds中获取了,减少对ThreadLocal的操作。
  • firstReader 与firstReaderHoldCount:代表首个获取读锁的线程与其所持有的读锁数,该读锁数不会存储进readHolds,这是种优化,针对只有一个线程的情况,避免频繁操作readHolds。

ReentrantReadWriteLock继承了ReadWriteLock,这两个方法分别返回读锁与写锁。ReentrantReadWriteLock内部实现了两个类:ReadLock&WriteLock分别实现读锁与写锁。

public interface ReadWriteLock {
    Lock readLock();

    Lock writeLock();
}

同ReentrantLock一样内部实现了非公平与公平两种同步器:NonfairSync &FairSync ,继承自同一同步器Sync。

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

只定义了writerShouldBlock & readerShouldBlock两种方法,它们作用在获取锁的过程中,决定当前线程是否该阻塞。

一 读锁

1.获取读锁

        public void lock() {
            sync.acquireShared(1);
        }

定位到AQS的acquireShared,该方法之前介绍过。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

来看看Sync重写的tryAcquireShared方法

        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

写锁不为零且持有写锁的并非本线程,则返回-1,之后在acquireShared中将线程的节点放入到等待队列中。写锁不为零但是正是本线程持有的,则代表写读重入。之后在readerShouldBlock返回false与CAS操作成功后,更新HoldCounter 的值,这里会对之前提到的firstReader ,firstReaderHoldCount 或cachedHoldCounter进行相应的操作。

如果CAS失败或者readerShouldBlock返回true,则会调用fullTryAcquireShared,该方法会继续尝试获取读锁,可以看成是tryAcquireShared的升级版。

先来看看readerShouldBlock()方法:
公平模式下,根据队列中当前线程之前有没有等待的线程来决定。

        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }

非公平模式

        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }

调用apparentlyFirstQueuedIsExclusive()

    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }
  • 返回true代表等待队列head.next节点是等待写锁的线程,该方法的目的是不让写锁一直等待下去;比如在上一篇自己实现的读写锁中,通过增加一个写请求变量来防止写饥饿,让写锁的优先级高于读锁。这里有相似的目的。
  • 这个方法是不可靠的,因为在检测过程中队列结构是在变化的,;但是我们并不依赖于它的准确表达,它更多是一种探测,一种优化,我们希望它来防止写锁的饥饿;而且并不是该方法返回了true,线程就会被放入阻塞队列退出竞争,来看fullTryAcquireShared的逻辑
        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

之前说fullTryAcquireShared是tryAcquireShared的升级版,它处理了CAS失败和readerShouldBlock返回true的情况;

  1. 先检查写锁是否被其他线程占用;
  2. 调用readerShouldBlock检查当前线程是否应该进入等待队列,返回true也不代表该线程要进入等待队列,我们看它的处理逻辑:如果firstReader == current代表当前只有你一个读线程,那么不用等待可以获取读锁;只有在rh.count == 0(意味着该线程没有持有读锁)的情况下返回-1代表线程要进入等待队列。为什么?持有读锁的线程不能进入同步队列?
  3. 之后便是CAS,成功便更改HoldCounter值返回1,代表获取读锁成功,否则循环再次检查,总之不能轻易的将线程放入等待队列,容易造成死锁。

上面问题的解答:一个线程是不能随便放入队列中等待的,容易造成死锁,看下面两种情况:

  1. 如果一个线程持有读锁,重入失败被放入等待队列,若等待队列中排在它前面的线程里有等待写锁的线程,那么就会造成死锁,因为读锁与写锁是互斥的。
  2. 假设一个线程持有写锁进行“锁降级申请”,被放入同步队列,那么不仅之后的读写线程都会被放入队列,队列中之前有等待线程,无论等待的是读或写锁都将造成死锁。

回到fullTryAcquireShared

  1. 它先判断是否有写锁,如果有且就是本线程就不会进行readerShouldBlock判断,直接CAS,这样便解决了情况2的问题;
  2. 针对情况1线程持有读锁的情况,即使readerShouldBlock返回true,rh.count == 0不符合不会返回-1,也就不会将线程放入队列。

总结就是fullTryAcquireShared,采用for循环方式让线程不断判断与尝试,且只有在一种情况下才会将线程放入队列:readerShouldBlock返回true(原因可能是公平模式或者第一个等待线程(head.next)在等待写锁),当前线程不是第一个读线程且没有持有读锁。

2.释放读锁

        public void unlock() {
            sync.releaseShared(1);
        }

定位到AQS的releaseShared,之前介绍过。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

来看看同步器Syn实现的tryReleaseShared

        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

1.如果是firstReader ,就对它及firstReader进行修改;2.如果不是,就对readHolds进行修改;3. 自旋CAS修改status
返回true代表status == 0,表示既没有读锁也没有写锁。

3. tryLock

        public boolean tryLock() {
            return sync.tryReadLock();
        }

调用了同步器Syn的tryReadLock

        final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

感觉与tryAcquireShared很像,不同在于tryReadLock只尝试获取读锁一次,成功就返回true,否则false;这是由于方法用途不同,所以设计自然不同。

二, 写锁

1,获取写锁

        public void lock() {
            sync.acquire(1);
        }

定位到AQS的acquire中,AQS文章里介绍过。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

来看看同步器Syn重写的tryAcquire方法

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

一 :c != 0下分两种 1. w == 0 代表有读锁;2. w != 0 && current != getExclusiveOwnerThread() 代表有其他写锁;以上两种返回false,线程要进入等待队列。否则就设置status值,返回true,获取写锁成功。
二 :c == 0情况下要看writerShouldBlock的情况,返回false就会去CAS更改同步状态,成功就将AOS里的exclusiveOwnerThread设置位当前线程,最后返回true;

注:情况一用setState更改同步状态,情况二用compareAndSetState?情况一执行到setState这步说明当前线程已持有写锁,是在重入,其他线程都会被排斥不同担心线程安全问题,所以setState就可以,同步状态status是volatile的。情况二里当前即无读锁也无写锁,当前线程始于其他线程在竞争,所以要利用CAS来保证原子性。
持有读锁线程不能申请写锁,即不能升级,从tryAcquire可以看出持有读锁线程一定会返回false,也就是会被放入队列中等待写锁,但是它持有的读锁将不会被释放,那么写锁就不肯能获取到,虽然不影响读锁的获取,但所有写锁都将不能被获取到。

来看看writerShouldBlock方法
非公平模式:直接返回false,也就是写请求可以插队,即写优先级高;

        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }

公平模式:考虑的是当前队列是否有等待的线程

        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }

2,释放写锁

        public void unlock() {
            sync.release(1);
        }

定位到AQS中的release

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

来看看同步器Syn重写的tryRelease

        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

注意返回值:返回true代表写锁个数为0,也就是写锁可用;返回false表示写锁仍被当前线程占着,可能是因为当前线程重入了写锁。

3,tryLock

        public boolean tryLock( ) {
            return sync.tryWriteLock();
        }

定位到同步器Syn

        final boolean tryWriteLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) {
                int w = exclusiveCount(c);
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

tryWriteLock方法看上去跟tryAcquire方法真的很像。唯一的区别在于,tryWriteLock忽略的writerShouldBlock方法;该方法的调用就是去抢写锁,抢不到返回false就行了。

相关文章

网友评论

    本文标题:ReentrantReadWriteLock

    本文链接:https://www.haomeiwen.com/subject/rwbujftx.html