美文网首页
juc3-locks-ReentrantReadWriteLoc

juc3-locks-ReentrantReadWriteLoc

作者: modou1618 | 来源:发表于2019-02-03 07:39 被阅读0次
    • 内部实现读锁,即可重入共享锁
      private final ReentrantReadWriteLock.ReadLock readerLock;
    • 内部实现写锁,即可重入独占锁
      private final ReentrantReadWriteLock.WriteLock writerLock;
    • final Sync sync;继承AbstractQueuedSynchronizer,支持锁功能的类。包含公平锁子类FairSync和非公平锁子类NonfairSync

    一 读锁

    • 加锁,释放锁等接口都依赖于Sync的对应接口实现
    public static class ReadLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -5992448646407690164L;
        private final Sync sync;
    
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    
        public void lock() {
            sync.acquireShared(1);
        }
    
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        public boolean tryLock() {
            return sync.tryReadLock();
        }
    
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        public void unlock() {
            sync.releaseShared(1);
        }
    
        public Condition newCondition() {//共享锁不支持条件对象
            throw new UnsupportedOperationException();
        }
    
        public String toString() {
            int r = sync.getReadLockCount();
            return super.toString() +
                    "[Read locks = " + r + "]";
        }
    }
    

    二 写锁

    • 加锁,释放锁等接口都依赖于sync类的接口实现
    public static class WriteLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -4992448646407690164L;
        private final Sync sync;
    
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    
        public void lock() {
            sync.acquire(1);
        }
    
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        public boolean tryLock( ) {
            return sync.tryWriteLock();
        }
    
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
    
        public void unlock() {
            sync.release(1);
        }
    
        public Condition newCondition() {
            return sync.newCondition();
        }
    
        public String toString() {
            Thread o = sync.getOwner();
            return super.toString() + ((o == null) ?
                    "[Unlocked]" :
                    "[Locked by thread " + o.getName() + "]");
        }
    
        public boolean isHeldByCurrentThread() {
            return sync.isHeldExclusively();
        }
    
        public int getHoldCount() {
            return sync.getWriteHoldCount();
        }
    }
    

    三 Sync

    • 锁计数,低16位表示独占写锁计数,高16位表示共享读锁计数
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
    //读锁占用数量
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    //写锁占用数量
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    

    3.1 读锁

    • 尝试获取读锁,获取失败返回false
    final boolean tryReadLock() {
        Thread current = Thread.currentThread();
        for (;;) {
            int c = getState();
            //非本线程已经获取了写独占锁,则不可再获取读锁
            if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                return false;
    //线程获取写锁后还可以继续获取读锁,释放写锁后则降级为读锁。
    //反之先获取读锁,后获取写锁则不行。
            int r = sharedCount(c);
            if (r == MAX_COUNT)//读共享锁已达上限,不可再获取
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
               //更新读共享锁计数+1
                if (r == 0) {//更新首个读锁获取线程的信息,及获取读锁计数
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                   //缓存读锁获取统计信息
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
    //从threadlocal变量中获取当前线程的读锁统计数据
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;//计数+1
                }
                return true;
            }
        }
    }
    
    • 父类tryAcquireShared接口实现,获取锁的接口会调用
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
    //非公平锁,或公平锁当前线程优先级最高,则先尝试获取锁
        if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
    //获取成功,更新统计数据
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
    //获取失败,等待队列节点处理
        return fullTryAcquireShared(current);
    }
    
    /**
     * 当前线程获取了写锁或读锁,则可以继续获取读锁
     * 其他线程获取了写锁,则当前线程读锁放入等待锁队列中
     * 全都是读锁,若有等待写锁的线程节点,则只有已获取过读锁的线程可继续获取读锁,没有获取过读锁的线程则放入等待锁队列中
     * 无等待写锁的线程节点,则所有线程都可获取读锁
     */
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    //非当前线程获取的写锁,返回失败,放入等待锁队列中
                    return -1;
               //当前线程获取的写锁,则可以获取读锁。
            } else if (readerShouldBlock()) {
                //有优先级更高的等待获取锁的节点,如等待写锁的节点
                // Make sure we're not acquiring read lock reentrantly
                if (firstReader == current) {
                    //首个获取读锁线程,则可以继续获取读锁
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        //当前线程未获取过读锁,则放入等待锁队列。
                        return -1;
                    //当前线程有读锁,则增加读锁统计继续获取读锁
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
    //获取读锁,增加统计计数
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    
    • 释放读锁
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
    //修改锁使用计数
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))//更改锁状态
                // 若锁已完全释放,返回true,唤醒等待锁队列后续节点尝试获取锁
                return nextc == 0;
        }
    }
    

    3.2 写锁

    • 尝试获取写锁
    final boolean tryWriteLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c != 0) {//有获取读锁或写锁
            int w = exclusiveCount(c);
            //读锁不可升级为写锁或其他线程获取了写锁
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
        }
    //更新写锁状态
        if (!compareAndSetState(c, c + 1))
            return false;
    //设置锁独占线程
        setExclusiveOwnerThread(current);
        return true;
    }
    
    • 父类的tryAcquire接口实现
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            setState(c + acquires);
            return true;
        }
    //若写锁获取需要排队,则放入等待锁队列
    //否则尝试获取锁
        if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }
    
    • 释放锁
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())//只能释放本线程占用的锁
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)//锁占用全释放,则清空锁占用线程信息
            setExclusiveOwnerThread(null);
        setState(nextc);//更新锁状态
    //若此时线程还有读锁,则写锁降级为读锁
        return free;
    }
    

    四 总结

    • 线程a获取写锁后,线程a还可以继续获取读锁和写锁,其他线程不可获取锁
    • 线程a获取写锁后再获取读锁,释放写锁后则降级为读锁
    • 线程a获取读锁后,所有线程还可以获取读锁,不可获取写锁。

    相关文章

      网友评论

          本文标题:juc3-locks-ReentrantReadWriteLoc

          本文链接:https://www.haomeiwen.com/subject/jgussqtx.html