美文网首页
多线程-AQS-ReentrantReadWriteLock

多线程-AQS-ReentrantReadWriteLock

作者: 麦大大吃不胖 | 来源:发表于2021-01-28 16:20 被阅读0次

    AQS

    by shihang.mai

    AQS是实现ReentrantReadWriteLock的核心

    ReentrantReadWriteLock

    ReentrantReadWriteLock锁支持读读并发,但读写、写写都是互斥的

    在我们研究ReentrantLock时,就知道AQS是用voliate int state去表示锁状态,int是4字节,32位的.在ReentrantReadWriteLock中需要用这个state表示读和写锁,就用高16位表示读锁,低16位表示写锁。因为无论读锁还是写锁都用了2字节表示,所以可重入次数为2^16-1

    state

    先看如下计算

    0000000000000000   0000000000000000
    ----------------------------------------------
    写入读锁,即在原来的基础上+65535
    0000000000000001   0000000000000000
    读取读锁数量,直接将1右边移16位,1>>>16
    0000000000000000   0000000000000001
    ----------------------------------------------
    写入写锁,即在原来基础上+1
    0000000000000000   0000000000000001
    读取写锁数量,原来的值与(高16位都为0,低16位都为1的值)做&运算
    原值:0000000000000000    0000000000000001
    &
    辅值:0000000000000000    1111111111111111
    结果:0000000000000000    0000000000000001
    

    先来看一下构造方法

    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    public ReentrantReadWriteLock() {
            this(false);
        }
    
        public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }
    

    默认是非公平锁,并且new出了ReadLock和WriteLock,所以我们使用时如下就能获取到读锁和写锁

    ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
    ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
    

    写锁lock

    调用writeLock.lock()时

    public static class WriteLock implements Lock, java.io.Serializable {
       
            public void lock() {
                sync.acquire(1);
            }
    }
    

    继续调用,走到了AQS,这个很熟悉,在ReentranLock中也看到过

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    这里先执行tryAcquire()==false,那么才会继续走acquireQueued(),tryAcquire是ReentrantReadWriteLock 内部类Sync 中实现方法

    abstract static class Sync extends AbstractQueuedSynchronizer {
    protected final boolean tryAcquire(int acquires) {
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) {
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    setState(c + acquires);
                    return true;
                }
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    }
    
    1. 先获取当前线程
    2. 获取volatile int state的值,然后函数exclusiveCount()根据state值获取到写锁的数量
    3. 注意此时调用的是写锁的lock,当有锁,即进入条件if (c != 0)
    • 当w==0,即有线程获取了读锁,那么直接返回false,无论当前线程.说明了ReentrantReadWriteLock不支持读升级写冲入
    • 当w!=0,即有线程获取了写锁,当前线程不是持有锁的线程,那么返回false,即写写互斥
    1. 当然加写锁前需要判断是否超过65535重入,如果第3的条件都不成立,即获取锁成功,设置state+1
    2. 当state==0,那么writerShouldBlock()==false(非公平锁),再CAS设置state+1,并设置占有锁线程为当前线程,然后true

    当tryAcquire()返回false时,就将执行acquireQueued()这里不多说,在ReentranLock中已详细说明,下面为概括图

    ypVnUI.png

    读锁lock

    调用readLock.lock()时

    public static class ReadLock implements Lock, java.io.Serializable {
    public void lock() {
                sync.acquireShared(1);
            }
    }
    

    继续调用,走到了AQS的acquireShared(),意为获取共享锁

    public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    

    这里先调用tryAcquireShared(),如果返回值<0,那么执行doAcquireShared()
    先来看看tryAcquireShared(),tryAcquire是ReentrantReadWriteLock 内部类Sync 中实现方法

    abstract static class Sync extends AbstractQueuedSynchronizer {
    protected final int tryAcquireShared(int unused) {
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }
    }
    
    1. 获取当前线程
    2. 获取state值
    3. 如果有线程持有写锁,并且当前读线程不是持有写锁的线程,那么返回-1(读写互斥,并支持写降级读重入)
    4. 获取读锁的数量
    5. readerShouldBlock()
    • 如果队列为空或者前面没有写锁在排队时,则返回false
    • 如果头结点不为空,下一个结点也不为空并且是写锁时,返回true,表示要规规矩矩的排队
    1. 当readerShouldBlock()返回false时,继续判断读锁数量是否少于最大重入数65535,再CAS设置读锁+1,设置成功后
    • 当还没设置读锁+1时,读锁为0,那么久不熬时当前线程就是第一个上读锁的线程
    • 当当前线程==持有读锁线程,那么就是锁重入
    • 当持有读锁线程!=当前读线程,这里涉及一个不同线程重入,用HoldCounter类,这个类只有线程id和重入次数两个字段,当线程重入的时候就会初始化这个类并保存在ThreadLocalHoldCounter类中,这个类就是继承ThreadLocl的,用来初始化HoldCounter对象并保存
    1. 读锁如果成功获取,那么返回1
    2. 当readerShouldBlock()返回true时,直接调用fullTryAcquireShared()
    final int fullTryAcquireShared(Thread current) {
                /*
                 * This code is in part redundant with that in
                 * tryAcquireShared but is simpler overall by not
                 * complicating tryAcquireShared with interactions between
                 * retries and lazily reading hold counts.
                 */
                HoldCounter rh = null;
                for (;;) {
                    int c = getState();
                    if (exclusiveCount(c) != 0) {
                        if (getExclusiveOwnerThread() != current)
                            return -1;
                        // else we hold the exclusive lock; blocking here
                        // would cause deadlock.
                    } else if (readerShouldBlock()) {
                        // Make sure we're not acquiring read lock reentrantly
                        if (firstReader == current) {
                            // assert firstReaderHoldCount > 0;
                        } else {
                            if (rh == null) {
                                rh = cachedHoldCounter;
                                if (rh == null || rh.tid != getThreadId(current)) {
                                    rh = readHolds.get();
                                    if (rh.count == 0)
                                        readHolds.remove();
                                }
                            }
                            if (rh.count == 0)
                                return -1;
                        }
                    }
                    if (sharedCount(c) == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        if (sharedCount(c) == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null)
                                rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                            cachedHoldCounter = rh; // cache for release
                        }
                        return 1;
                    }
                }
            }
    

    这个方法中代码和tryAcquireShared基本上一致,只是采用了自旋的方式,处理初次加锁中的漏网之鱼

    如果tryAcquireShared返回-1,即加锁失败,那么走AQS中的doAcquireShared()

    private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    1. 将当前线程封装成share Node放入CLH队列
    2. for无条件循环,判断当前节点的前继节点是否是头节点,如果是再一次尝试获取锁,获取锁成功,那么继续唤醒后面的所有share Node,到最后一个share Node节点(后面并没其他节点),将waitState=-3
    3. 前继节点不是头节点或者获取锁失败,那么就将前继节点waitState=-1,并挂起当前线程

    读锁.unLock()

    public static class ReadLock implements Lock, java.io.Serializable {
            public void unlock() {
                sync.releaseShared(1);
            }
    }
    

    调到AQS

    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    int count = rh.count;
                    if (count <= 1) {
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                for (;;) {
                    int c = getState();
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                        // Releasing the read lock has no effect on readers,
                        // but it may allow waiting writers to proceed if
                        // both read and write locks are now free.
                        return nextc == 0;
                }
            }
    

    没什么好说的,就一直将读锁释放,全部释放后,唤醒写锁线程

    写锁.unLock()

    public static class WriteLock implements Lock, java.io.Serializable {
      public void unlock() {
                sync.release(1);
            }
    }
    

    调到AQS

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
    protected final boolean tryRelease(int releases) {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                int nextc = getState() - releases;
                boolean free = exclusiveCount(nextc) == 0;
                if (free)
                    setExclusiveOwnerThread(null);
                setState(nextc);
                return free;
            }
    }
    

    也没什么好说的,解锁将state变为0,重新唤醒CLH队列中的Node

    相关文章

      网友评论

          本文标题:多线程-AQS-ReentrantReadWriteLock

          本文链接:https://www.haomeiwen.com/subject/nybttltx.html