美文网首页并发编程
ReentrantReadWriteLock源码分析

ReentrantReadWriteLock源码分析

作者: 拥抱孤独_to | 来源:发表于2019-05-24 17:20 被阅读0次

并发源码分析篇:

基于上篇文章分析了同步器的作用之后,接下来了解ReentrantReadWriteLock读写锁也就简单多了,ReentrantReadWriteLock比ReentrantLock性能又更佳了一些,针对读操作锁共享,针对写操做锁互斥,当有线程在对数据写入的时候读操作和写操作都会阻塞,都是读操作的时候锁共享,线程不会阻塞。读写锁的基本用法如下

public class ReadWriteLockDemo {
    static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
    static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    static HashMap<String,Object> map = new HashMap<>();
    public static void main(String[] args) {
        new Thread(()->{
            getObject("name");
        }).start();
        new Thread(()->{
           writeObject("name","xiaoming");
        }).start();
    }
    public static void writeObject(String key,Object value) {
        try {
            writeLock.lock();
            map.put(key,value);
        } finally {
            writeLock.unlock();
        }
    }
    public static Object getObject(String key) {
        try {
            readLock.lock();
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }
}

ReadLock和WriteLock的获取是通过ReentrantReadWriteLock类提供的方法获取的,可以看到ReentrantReadWriteLock的构造函数如下

    public ReentrantReadWriteLock() {
        this(false);
    }

    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * the given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
   public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
   public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

通过ReentrantReadWriteLock构造函数初始化了Sync和readerLock,writerLock 属性,此时的Sync的实际对象是NonfairSync。并且ReadLock,WriteLock通过传参构造将ReentrantReadWriteLock本身传递进去而且将
ReadLock,WriteLock类中的Sync也赋值为ReentrantReadWriteLock类中Sync,所以ReadLock,WriteLock中Sync都是NonfairSync。

    protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
    }

接下来可以先看下WriteLock中的lock方法

    public void lock() {
        sync.acquire(1);
    }

这里走的还是AQS中的acquire方法,这里除了tryAcquire的实现有稍微的区别其他的都和ReentrantLock一样,没获取到锁就会被构成链表阻塞。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

继续走tryAcquire方法,该方法实际调用的是ReentrantReadWriteLock类中的重写方法。

        protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

这里,我们分两种情况来分析,先来分析假如有写锁先获取到锁,后面的读操作和写操作会是什么情况。同样线程A表示写入的线程,线程B为读线程,线程C为写入的线程
线程A先执行这段代码,获取到state值为0,所以会执行writerShouldBlock方法,实际上执行的就是NonfairSync的writerShouldBlock方法,直接返回的false标识,所以继续会走下面的CAS将状态改为1,并且独占线程设置为线程A,返回true。线程A就这样成功的获取到锁了,并且可以正常执行逻辑。读线程B此时进来,调用readLock.lock()

        protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;

所以这里就直接返回了-1,读线程B被阻塞。同理线程C进来也将阻塞。 而unlock方法跟ReentrantLock是一样的逻辑,都是将state设置为0并且唤醒链表中的下一个线程获取锁。
从这里可以看出,如果有写线程获取锁,其他线程都将会被阻塞,跟ReentrantLock一样都是互斥的
在来分析如果是读线程先获取到锁,后续将是一个什么情况。同样假设A为读线程,B为读线程,C为写线程
首先,线程A调用lock方法,这里我们就要看ReadLock中的lock方法

        public void lock() {
            sync.acquireShared(1);
        }
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

tryAcquireShared同样掉调用的是ReentrantReadWriteLock中的tryAcquireShared方法,接下来继续分析该方法的实现

        protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

首先是获取state的值,此时state=0,接着获取独占线次数exclusiveCount(c)

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

这里先说明一下,exclusiveCount方法就是用state & (2^16-1)的值就是独占次数,为什么这样就能获得独占次数,其实从下面的compareAndSetState(c, c + SHARED_UNIT))方法就可以看出,共享锁也就是读线程是共享的,每次获取共享锁state的值就会+2^16 这就说明了如果只存在共享锁,exclusiveCount(state)的值一定会是0,所以回过头看写锁中的这段代码

     int c = getState();
        int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;

state !=0 w==0说明只可能是有共享锁,所以获取写锁失败,如果w!=0 继续会判断是否为同一个线程,不是的话也写入失败。
接着来分析A线程,此时独占次数和共享次数都是0,程序接着走readerShouldBlock方法

  final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
  }

该方法的作用就是判断阻塞队列中第一个元素是否是独占线程,也就是写数据线程,此时链表中是没有数据的,所以放回false。所以A线程最终执行下面代码

      if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
      }

最终返回1,获取到了锁。
这是读线程B又调用了lock方法

    protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

这时候c=2^16,exclusiveCount(c)=0,r=1,并且链表中依然是空的,所以代码将走else逻辑

                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;

用HoldCounter对象存储每个读线程获取锁的次数,一个线程读锁也是可以重入的,所以没有完全释放锁的话就获取不到写锁。
如果此时链表中有个写线程阻塞了,!readerShouldBlock()就会为false,代码将走fullTryAcquireShared逻辑

        /**
         * Full version of acquire for reads, that handles CAS misses
         * and reentrant reads not dealt with in tryAcquireShared.
         */
        final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

方法注释就可以知道该方法的作用

读取的完整版本,处理CAS未命中*和tryAcquireShared中未处理的重入读取(直接有道翻译的)。

!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)

其实就是处理这三个状态值分别为false的情况,所以现在是链表中有写线程阻塞,代码会执行

else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }

所以获取读锁失败,读线程会加到链表之后。

总结

这篇文章没有在重复链表中的各个状态,可以先看ReentrantLock源码分析,有详细的AQS分析 ReentrantLock源码分析

ReentrantReadWriteLock锁其实可以简单理解为有写线程占据锁的话,其他线程都阻塞,如果有读线程占据锁的话,写线程阻塞,读线程可以正常获取锁,如过有写线程在阻塞等待锁,读线程也需要被阻塞

相关文章

网友评论

    本文标题:ReentrantReadWriteLock源码分析

    本文链接:https://www.haomeiwen.com/subject/jgluzqtx.html