美文网首页
java多线程-7-ReentrantReadWriteLock

java多线程-7-ReentrantReadWriteLock

作者: 浪迹天涯的咸鱼 | 来源:发表于2019-09-30 09:07 被阅读0次

    概述

    • 成功获取读锁(含读锁重入),会有自旋锁(CLH,无饥饿)特性,传递唤醒队列线程直到写锁或队尾
    • 释放锁时比较严格,只有已无读锁和写锁被持有,才会开启自旋唤醒
    • 记得测试获取读锁,或者读锁重入时,是否发生自旋???
    public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable
    
    public interface ReadWriteLock {
        Lock readLock();
    
        Lock writeLock();
    }
    

    公平与非公平

    非公平

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // writers can always barge 闯入 // 写锁总是可以抢,避免饥饿吧
        }
        final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); }
    }
    
    // 第1个节点有效 && 等写锁
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null;
    }
    

    公平

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() { return hasQueuedPredecessors(); }
        final boolean readerShouldBlock() { return hasQueuedPredecessors(); }
    }
    
    // 首尾不等 && 第1节点非自己,需要block
    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    构造函数

    public ReentrantReadWriteLock() {
        this(false);
    }
    
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    
    public static class ReadLock implements Lock, java.io.Serializable
    public static class WriteLock implements Lock, java.io.Serializable
    
    • ReadLock和WriteLock内部也持有一个sync对象,等于ReentrantReadWriteLock的sync

    state属性

    static final int SHARED_SHIFT   = 16;                      // 读锁占用位数
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);     // 读锁线程+1时的递增单位
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1; // 读锁最大线程数
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 写锁重入掩码,16个1
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; } // 读锁线程
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 写锁重入数
    
    • 用1个int数表示锁占用情况,即AQS的state属性
    • 高16位表示占有读锁的线程数,低16位表示写锁重入数
    • 举例:0000 0000 0000 1101 0000 0000 0000 0001 -> 13个读锁线程,写锁重入1次;只可能是同1个线程,读重入13次,写1次

    线程持锁信息

    private transient ThreadLocalHoldCounter readHolds;
    private transient HoldCounter cachedHoldCounter;
    private transient Thread firstReader = null;
    private transient int firstReaderHoldCount;
    
    if (r == 0) { // 读锁线程数sharedCount(state)
        firstReader = current; // 第1读锁线程
        firstReaderHoldCount = 1; // 第1读锁线程重入数
    } else if (firstReader == current) {
        firstReaderHoldCount++;
    } else {
        HoldCounter rh = cachedHoldCounter; // 最新读锁线程持锁信息(count, tid)
        if (rh == null || rh.tid != getThreadId(current))
            cachedHoldCounter = rh = readHolds.get(); // cache不是当前线程,用当前线程覆盖cache
        else if (rh.count == 0)
            readHolds.set(rh); // cache是当前线程,上次释锁后还未有读线程占有过
        rh.count++;
    }
    
    • 上述4个变量,完成一件事:将线程读锁信息放入ThreadLocal,以便线程获取持锁信息
    • firstReader,firstReaderHoldCount,cachedHoldCounter都为readHolds服务,用于减少readHolds.get调用次数
    • firstReader与firstReadHoldCount保存第一个读锁线程信息,readHolds中不保存
    • cachedHoldCounter缓存最后一个读锁线程信息

    ReadLock

    lock()

    public void lock() {
        sync.acquireShared(1);
    }
    
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
            return -1; // 其它线程持有写锁,失败
        // 能到这,要么写锁未被持;要么当前线程持写锁
        int r = sharedCount(c);
        if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
            // 能到这,说明reader不需block && 读锁线程未超 && CAS更新读锁线程数成功
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }
    
    • 看下readerShouldBlock
      • 同步队列的第1个真实节点有效,并且等独占锁,那么should block
    // NonfairSync
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
    
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null;
    }
    
    • 进入fullTryAcquireShared情形
      • reader should block(可能写锁未被占,也可能当前线程持写锁)
      • r < MAX_COUNT不满足,小概率
      • compareAndSetState(c, c + SHARED_UNIT),小概率
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1; // 写锁被占且非当前线程,超小概率
                // else we hold the exclusive lock; blocking here would cause deadlock.
            } else if (readerShouldBlock()) {
                // 进入这,说明写锁未被持 && reader should block
                // Make sure we're not acquiring read lock reentrantly
                if (firstReader == current) { // 明显重入
                    // assert firstReaderHoldCount > 0;
                } else { // 寻找非明显重入
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove(); // remove后回归初始值
                        }
                    }
                    if (rh.count == 0)
                        return -1; // 非重入,失败
                }
            }
            // 能到这,要么当前线程持写锁;要么当前线程读锁重入
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded"); // 上面已判,所以超小概率
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh); // 为0为什么要刷入,默认值不是一样?
                    rh.count++;
                    cachedHoldCounter = rh;
                }
                return 1;
            }
        }
    }
    
    • try失败了,就进入doAcquireShared
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) { // 只有同步队列第1节点才可尝试持锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 进行自我阻塞
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        
        if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared(); // 进入这里,表明下一节点非写锁
        }
    }
    
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    
    // 从队尾开始,找到最靠前的有效节点,唤醒线程;被唤醒线程会调用setHeadAndPropagate传递下去
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    
    • 取读锁成功(propagate一定大于0),则在CLH队列(CLH锁即Craig, Landin, Hagersten locks,是自旋锁,确保无饥饿)中传播唤醒
    • head若为Node.SIGNAL,将waitStatus设为0,设置成功,唤醒线程
      • 为什么会设置不成功,可能并发期间该节点已被干掉
    • head状态为0,表明无需往下唤醒,设为PROPAGATE后退出
      • 正常情况下,被唤醒的节点刚变为head时,应该都是-1的;不是很清楚这个0是什么情况下产生

    unlock

    public void unlock() {
        sync.releaseShared(1);
    }
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) { // 当前线程为第1线程,改first信息
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else { // 非第1线程,若是cache改cache,不是cache改ThreadLocal
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove(); // unlock后不再持锁,去除,help GC
                if (count <= 0)
                    throw unmatchedUnlockException(); // 无锁可unlock,异常
            }
            --rh.count;
        }
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0; // 当读写锁均为空时,才会开启自旋
        }
    }
    
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    

    WriteLock

    lock()

    public void lock() {
        sync.acquire(1);
    }
    
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            setState(c + acquires); // 写锁重入,所以不需CAS
            return true;
        }
        if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }
    

    unlock()

    public void unlock() {
        sync.release(1);
    }
    
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    相关文章

      网友评论

          本文标题:java多线程-7-ReentrantReadWriteLock

          本文链接:https://www.haomeiwen.com/subject/iarcpctx.html