美文网首页
StampedLock图解之一

StampedLock图解之一

作者: 程序员札记 | 来源:发表于2022-04-05 09:27 被阅读0次

    StampedLock类简介

    StampedLock类,在JDK1.8时引入,是对读写锁ReentrantReadWriteLock的增强,该类提供了一些功能,优化了读锁、写锁的访问,同时使读写锁之间可以互相转换,更细粒度控制并发。

    首先明确下,该类的设计初衷是作为一个内部工具类,用于辅助开发其它线程安全组件,用得好,该类可以提升系统性能,用不好,容易产生死锁和其它莫名其妙的问题。

    1.1 StampedLock的引入

    先来看下,为什么有了ReentrantReadWriteLock,还要引入StampedLock?

    ReentrantReadWriteLock使得多个读线程同时持有读锁(只要写锁未被占用),而写锁是独占的。

    但是,读写锁如果使用不当,很容易产生“饥饿”问题:

    比如在读线程非常多,写线程很少的情况下,很容易导致写线程“饥饿”,虽然使用“公平”策略可以一定程度上缓解这个问题,但是“公平”策略是以牺牲系统吞吐量为代价的。这里具体解释一下

    1. 如果所有的操作都是读,都是写的 操作,是不会有饥饿问题的, 尽管在写多于读的情况下,写锁获得比较慢,但是不会造成饥饿hung住的情况。
    2. 但是如果有读写嵌套的情况下,是会有线程饥饿的问题的。下面这个是个典型的并发cache 场景
    package com.concurreny.aqs;
    
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    public class ReentrantReadWriteLockHungryTest {
    
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
        ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    
        public void readAndWrite(){
            try {
                readLock.lock();
                System.out.println("线程"+Thread.currentThread().getName()+"进入。。。");
                Thread.sleep(3000);
                writeLock.lock();
                System.out.println("线程"+Thread.currentThread().getName()+"退出。。。");
                writeLock.unlock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                readLock.unlock();
            }
        }
        public static void main(String[] args) {
            final ReentrantReadWriteLockHungryTest wr = new ReentrantReadWriteLockHungryTest();
            Thread t1 = new Thread(new Runnable() {
                public void run() {
                    wr.readAndWrite();
                }
            }, "t1");
            Thread t2 = new Thread(new Runnable() {
                public void run() {
                    wr.readAndWrite();
                }
            }, "t2");
            Thread t3 = new Thread(new Runnable() {
                public void run() {
                    wr.readAndWrite();
                }
            }, "t3");
            Thread t4 = new Thread(new Runnable() {
                public void run() {
                    wr.readAndWrite();
                }
            }, "t4");
    
            t1.start();
            t2.start();
            //t3.start();
            //t4.start();
        }
    }
    

    1.2 StampedLock的特点

    StampedLock的主要特点概括一下,有以下几点:

    • 所有获取锁的方法,都返回一个邮戳(Stamp),Stamp为0表示获取失败,其余都表示成功;
    • 所有释放锁的方法,都需要一个邮戳(Stamp),这个Stamp必须是和成功获取锁时得到的Stamp一致;
    • StampedLock是不可重入的;(如果一个线程已经持有了写锁,同线程再去获取写锁的话就会造成死锁)

    StampedLock有三种访问模式:

    • ①Reading(读模式):功能和ReentrantReadWriteLock的读锁类似

    • ②Writing(写模式):功能和ReentrantReadWriteLock的写锁类似

    • ③Optimistic reading(乐观读模式):这是一种优化的读模式。
      StampedLock提供了三种模式的控制:

    • 独占写模式。writeLock方法可能会在获取共享状态时阻塞,如果成功获取锁,返回一个stamp,它可以作为参数被用在unlockWrite方法中以释放写锁。tryWriteLock的超时与非超时版本都被提供使用。当写锁被获取,那么没有读锁能够被获取并且所有的乐观读锁验证都会失败。

    • 悲观读模式。readLock方法可能会在获取共享状态时阻塞,如果成功获取锁,返回一个stamp,它可以作为参数被用在unlockRead方法中以释放读锁。tryReadLock的超时与非超时版本都被提供使用。

    • 乐观读模式。tryOptimisticRead方法只有当写锁没有被获取时会返回一个非0的stamp。在获取这个stamp后直到调用validate方法这段时间,如果写锁没有被获取,那么validate方法将会返回true。这个模式可以被认为是读锁的一个弱化版本,因为它的状态可能随时被写锁破坏。这个乐观模式的主要是为一些很短的只读代码块的使用设计,它可以降低竞争并且提高吞吐量。但是,它的使用本质上是很脆弱的。乐观读的代码区域应当只读取共享数据并将它们储存在局部变量中以待后来使用,当然在使用前要先验证这些数据是否过期,这可以使用前面提到的validate方法。在乐观读模式下的数据读取可能是非常不一致的过程,因此只有当你对数据的表示很熟悉并且重复调用validate方法来检查数据的一致性时使用此模式。例如,当先读取一个对象或者数组引用,然后访问它的字段、元素或者方法之一时上面的步骤都是需要的。

    这个类还提供了在三种模式之间转换的辅助方法。例如,tryConvertToWriteLock方法尝试"提升"一个模式,如果已经获取了读锁并且此时没有其他线程获取读锁,那么这个方法返回一个合法的写stamp。这些方法被设计来帮助减少以“重试为主”设计时发生的代码代码膨胀。
    StampedLock支持读锁和写锁的相互转换

    • 我们知道RRW中,当线程获取到写锁后,可以降级为读锁,但是读锁是不能直接升级为写锁的。
    • StampedLock提供了读锁和写锁相互转换的功能,使得该类支持更多的应用场景。
    • 无论写锁还是读锁,都不支持Conditon等待

    我们知道,在ReentrantReadWriteLock中,当读锁被使用时,如果有线程尝试获取写锁,该写线程会阻塞。
    但是,在Optimistic reading中,即使读线程获取到了读锁,写线程尝试获取写锁也不会阻塞,这相当于对读模式的优化,但是可能会导致数据不一致的问题。所以,当使用Optimistic reading获取到读锁时,必须对获取结果进行校验。

    package com.conrrentcy.juc;
    
    import java.util.concurrent.locks.StampedLock;
    
    public class StampedLockDemo {
    
    }
    
    class Point {
        private double x, y;
        private final StampedLock sl = new StampedLock();
    
        void move(double deltaX, double deltaY) {
            long stamp = sl.writeLock();    //涉及对共享资源的修改,使用写锁-独占操作
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                sl.unlockWrite(stamp);
            }
        }
    
        /**
         * 使用乐观读锁访问共享资源
         * 注意:乐观读锁在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其他写线程已经修改了数据,
         * 而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。
         *
         * @return
         */
        double distanceFromOrigin() {
            long stamp = sl.tryOptimisticRead();    // 使用乐观读锁
            double currentX = x, currentY = y;      // 拷贝共享资源到本地方法栈中
            if (!sl.validate(stamp)) {              // 如果有写锁被占用,可能造成数据不一致,所以要切换到普通读锁模式
                stamp = sl.readLock();             
                try {
                    currentX = x;
                    currentY = y;
                } finally {
                    sl.unlockRead(stamp);
                }
            }
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }
    
        void moveIfAtOrigin(double newX, double newY) { // upgrade
            // Could instead start with optimistic, not read mode
            long stamp = sl.readLock();
            try {
                while (x == 0.0 && y == 0.0) {
                    long ws = sl.tryConvertToWriteLock(stamp);  //读锁转换为写锁
                    if (ws != 0L) {
                        stamp = ws;
                        x = newX;
                        y = newY;
                        break;
                    } else {
                        sl.unlockRead(stamp);
                        stamp = sl.writeLock();
                    }
                }
            } finally {
                sl.unlock(stamp);
            }
        }
    }
    

    同步节点

    StampedLock使用 long 作为同步状态的类型,它使用一个小的有限数作为读锁被获取的2进制位数(目前为7),所以当reader的数量到达上限时,使用一个额外的溢出字来表示溢出。我们通过将最大的reader数量(RBITS)视作一个自旋锁来保护同步状态的溢出更新。

    在StampedLock中使用的同步节点与AQS的同步节点有一点不同,下面先看它的常量代表的意义。

    line: 352

        /** 处理器的数量,控制自旋次数 */
        private static final int NCPU = Runtime.getRuntime().availableProcessors();
    
        /** 被增加到同步队列前最大的重试次数,至少为1 */
        private static final int SPINS = (NCPU > 1) ? 1 << 6 : 1;
    
        /** 在头节点处被阻塞前的最大重试次数 */
        private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 1;
    
        /** 在再次被阻塞前的最大重试次数 */
        private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 1;
    
        /** The period for yielding when waiting for overflow spinlock */
        private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
    
        /** 读锁被获取的次数的二进制位数 */
        private static final int LG_READERS = 7;
    
        // Values for lock state and stamp operations
        private static final long RUNIT = 1L;                  // 类似读写锁的RUNIT,意思是每次获取读锁时
                                                               // 同步状态应当增加1
        private static final long WBIT  = 1L << LG_READERS;    // 写状态        10000000
        private static final long RBITS = WBIT - 1L;           // 溢出保护      01111111
        private static final long RFULL = RBITS - 1L;          // 最大reader    01111110
        private static final long ABITS = RBITS | WBIT;        // 掩码          11111111
        private static final long SBITS = ~RBITS;              // 掩码     24(1)10000000
    
        /*
         * 3种模式可以通过检查区分 (m = stamp & ABITS):
         * 写模式: m == WBIT
         * 乐观读模式: m == 0L (即使读锁已经被持有)
         * 悲观读模式: m > 0L && m <= RFULL (同步状态的拷贝,,但是stamp中的
         * read hold count除了用来决定是哪个模式以外不会被使用)
         *
         * This differs slightly from the encoding of state:
         * (state & ABITS) == 0L 表示锁没有被获取
         * (state & ABITS) == RBITS 这是一个特殊值,表示操作读者bit位的自旋锁溢出
         */
    
        /** 锁状态的初始值 */
        private static final long ORIGIN = WBIT << 1;          // 1 00000000
    
        // Special value from cancelled acquire methods so caller can throw IE
        private static final long INTERRUPTED = 1L;
    
        // 节点状态值; order matters
        private static final int WAITING   = -1;
        private static final int CANCELLED =  1;
    
        // 节点模式 (使用int而不是boolean以允许运算)
        private static final int RMODE = 0;
        private static final int WMODE = 1;
    

    其中ABITS和SBITS是作为掩码使用的,来快速检查当前锁的状态,在后面读写锁的获取中可以看到它们的使用。使用ORIGIN作为初始值也是与此相关,我们在后面讨论。而读状态正常最多只可以被获取126(RFULL)次,如果超出这个上限,那么其他读线程获取锁时需要在readreaderOverflow记录。因为readreaderOverflow不是个原子变量,所以为了保证它的同步性,需要进行同步处理。


    image.png

    StampedLock中state是long类型,占64位,它被划为了三个部分来使用。低7位作为读锁标志位,可以由多个线程共享,每有一个线程申请读锁成功,低7位就加1。第8位是写锁位,由线程独占。
    其余位是stamp位,用来记录写锁状态的变化(版本号),每使用一次写锁,stamp位就会加1。
    同时如果读锁数量超过了126之后,超出的次数使用readerOverflow来进行计数。

    了解了常量值的含义后,开始对同步节点的分析:

    line: 406

    static final class WNode {
        volatile WNode prev;      // 前驱节点
        volatile WNode next;      // 后继节点
        volatile WNode cowait;    // 读线程链表
        volatile Thread thread;   // non-null while possibly parked
        volatile int status;      // 0, WAITING, or CANCELLED
        final int mode;           // RMODE or WMODE
        WNode(int m, WNode p) { mode = m; prev = p; }
    }
    
    /** 队列头节点 */
    private transient volatile WNode whead;
    /** 队列尾节点 */
    private transient volatile WNode wtail;
    

    StampedLock中的同步节点和AQS的几乎一样,只多加了一个cowait字段,同时状态略有不同,还多了个判定是读还是写的mode字段。关于状态只有WAITING和CANCELLED两种,阅读过AQS相信对此不会有疑惑,而cowait的的出现是对AQS的优化。在StampedLock中,读节点不像AQS那样每个读线程都会构造一个自己的节点并加入到同步队列中,而是将许多连续的读节点挂载在一个读节点上,此时同步队列中就不会出现多个连续的读节点,当此读节点获取到锁时,会唤醒在其上挂载的所有读线程,此时其他需要增加到同步队列中的线程无论读写都会帮助头节点唤醒,如此就大大加快了读线程的唤醒速度。

    线程w1获取了写锁,一直未释放。此时有4个线程分别获取读锁(获取顺序是R0-->R1-->R2-->R3),又有线程W2获取写锁,最后还有R4,R5,R6三个线程获取读锁,那么此时队列的排队情况如下

    image.png

    示例分析

    假设现在有三个线程:ThreadA、ThreadB、ThreadC、ThreadD。操作如下:

    • ThreadA调用writeLock, 获取写锁
    • ThreadB调用readLock, 获取读锁
    • ThreadC调用readLock, 获取读锁
    • ThreadD调用writeLock, 获取写锁
    • ThreadE调用readLock, 获取读锁

    1. StampedLock对象的创建

    StampedLock的构造器很简单,构造时设置下同步状态值:

        public StampedLock() {
            state = ORIGIN;
        }
    

    另外,StamedLock提供了三类视图:

        transient ReadLockView readLockView;
        transient WriteLockView writeLockView;
        transient ReadWriteLockView readWriteLockView;
    

    这些视图其实是对StamedLock方法的封装,便于习惯了ReentrantReadWriteLock的用户使用:
    例如,ReadLockView其实相当于ReentrantReadWriteLock.readLock()返回的读锁;

        final class ReadLockView implements Lock {
            public void lock() { readLock(); }
            public void lockInterruptibly() throws InterruptedException {
                readLockInterruptibly();
            }
            public boolean tryLock() { return tryReadLock() != 0L; }
            public boolean tryLock(long time, TimeUnit unit)
                throws InterruptedException {
                return tryReadLock(time, unit) != 0L;
            }
            public void unlock() { unstampedUnlockRead(); }
            public Condition newCondition() {
                throw new UnsupportedOperationException();
            }
    

    2. ThreadA调用writeLock获取写锁

    来看下writeLock方法:

    StampedLock中大量运用了位运算,这里(s = state) & ABITS == 0L 表示读锁和写锁都未被使用,这里写锁可以立即获取成功,然后CAS操作更新同步状态值State。

    操作完成后,等待队列在初始化的结构如下:


    image.png

    此时为一个空的队列。

    注意:StampedLock中,等待队列的结点要比AQS中简单些,仅仅三种状态。

    • 0:初始状态
    • -1:等待中
    • 1:取消

    另外,结点的定义中有个cowait字段,该字段指向一个栈,用于保存读线程,这个前面已经讲到。

        static final class WNode {
            volatile WNode prev;
            volatile WNode next;
            volatile WNode cowait;    // list of linked readers
            volatile Thread thread;   // non-null while possibly parked
            volatile int status;      // 0, WAITING, or CANCELLED
            final int mode;           // RMODE or WMODE
            WNode(int m, WNode p) { mode = m; prev = p; }
        }
    
        /** Head of CLH queue */
        private transient volatile WNode whead;
        /** Tail (last) of CLH queue */
        private transient volatile WNode wtail;
    

    3. ThreadB调用readLock获取读锁

    来看下readLock方法:
    由于ThreadA此时持有写锁,所以ThreadB获取读锁失败,将调用acquireRead方法,加入等待队列

    /**
     * 尝试自旋的获取读锁, 获取不到则加入等待队列, 并阻塞线程
     *
     * @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED
     * @param deadline      如果非0, 则表示限时获取
     * @return 非0表示获取成功, INTERRUPTED表示中途被中断过
     */
    private long acquireRead(boolean interruptible, long deadline) {
        WNode node = null, p;   // node指向入队结点, p指向入队前的队尾结点
     
        /**
         * 自旋入队操作
         * 如果写锁未被占用, 则立即尝试获取读锁, 获取成功则返回.
         * 如果写锁被占用, 则将当前读线程包装成结点, 并插入等待队列(如果队尾是写结点,直接链接到队尾;否则,链接到队尾读结点的栈中)
         */
        for (int spins = -1; ; ) {
            WNode h;
            if ((h = whead) == (p = wtail)) {   // 如果队列为空或只有头结点, 则会立即尝试获取读锁
                for (long m, s, ns; ; ) {
                    if ((m = (s = state) & ABITS) < RFULL ?     // 判断写锁是否被占用
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :  //写锁未占用,且读锁数量未超限, 则更新同步状态
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))        //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中
                        return ns;          // 获取成功后, 直接返回
                    else if (m >= WBIT) {   // 写锁被占用,以随机方式探测是否要退出自旋
                        if (spins > 0) {
                            if (LockSupport.nextSecondarySeed() >= 0)
                                --spins;
                        } else {
                            if (spins == 0) {
                                WNode nh = whead, np = wtail;
                                if ((nh == h && np == p) || (h = nh) != (p = np))
                                    break;
                            }
                            spins = SPINS;
                        }
                    }
                }
            }
            if (p == null) {                            // p == null表示队列为空, 则初始化队列(构造头结点)
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            } else if (node == null) {                  // 将当前线程包装成读结点
                node = new WNode(RMODE, p);
            } else if (h == p || p.mode != RMODE) {     // 如果队列只有一个头结点, 或队尾结点不是读结点, 则直接将结点链接到队尾, 链接完成后退出自旋
                if (node.prev != p)
                    node.prev = p;
                else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                    p.next = node;
                    break;
                }
            }
            // 队列不为空, 且队尾是读结点, 则将添加当前结点链接到队尾结点的cowait链中(实际上构成一个栈, p是栈顶指针 )
            else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) {    // CAS操作队尾结点p的cowait字段,实际上就是头插法插入结点
                node.cowait = null;
            } else {
                for (; ; ) {
                    WNode pp, c;
                    Thread w;
                    // 尝试唤醒头结点的cowait中的第一个元素, 假如是读锁会通过循环释放cowait链
                    if ((h = whead) != null && (c = h.cowait) != null &&
                        U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null) // help release
                        U.unpark(w);
                    if (h == (pp = p.prev) || h == p || pp == null) {
                        long m, s, ns;
                        do {
                            if ((m = (s = state) & ABITS) < RFULL ?
                                U.compareAndSwapLong(this, STATE, s,
                                    ns = s + RUNIT) :
                                (m < WBIT &&
                                    (ns = tryIncReaderOverflow(s)) != 0L))
                                return ns;
                        } while (m < WBIT);
                    }
                    if (whead == h && p.prev == pp) {
                        long time;
                        if (pp == null || h == p || p.status > 0) {
                            node = null; // throw away
                            break;
                        }
                        if (deadline == 0L)
                            time = 0L;
                        else if ((time = deadline - System.nanoTime()) <= 0L)
                            return cancelWaiter(node, p, false);
                        Thread wt = Thread.currentThread();
                        U.putObject(wt, PARKBLOCKER, this);
                        node.thread = wt;
                        if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) {
                            // 写锁被占用, 且当前结点不是队首结点, 则阻塞当前线程
                            U.park(false, time);
                        }
                        node.thread = null;
                        U.putObject(wt, PARKBLOCKER, null);
                        if (interruptible && Thread.interrupted())
                            return cancelWaiter(node, p, true);
                    }
                }
            }
        }
     
        for (int spins = -1; ; ) {
            WNode h, np, pp;
            int ps;
            if ((h = whead) == p) {     // 如果当前线程是队首结点, 则尝试获取读锁
                if (spins < 0)
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                for (int k = spins; ; ) { // spin at head
                    long m, s, ns;
                    if ((m = (s = state) & ABITS) < RFULL ?     // 判断写锁是否被占用
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :  //写锁未占用,且读锁数量未超限, 则更新同步状态
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {      //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中
                        // 获取读锁成功, 释放cowait链中的所有读结点
                        WNode c;
                        Thread w;
     
                        // 释放头结点, 当前队首结点成为新的头结点
                        whead = node;
                        node.prev = null;
     
                        // 从栈顶开始(node.cowait指向的结点), 依次唤醒所有读结点, 最终node.cowait==null, node成为新的头结点
                        while ((c = node.cowait) != null) {
                            if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
                                U.unpark(w);
                        }
                        return ns;
                    } else if (m >= WBIT &&
                        LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                        break;
                }
            } else if (h != null) {     // 如果头结点存在cowait链, 则唤醒链中所有读线程
                WNode c;
                Thread w;
                while ((c = h.cowait) != null) {
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            if (whead == h) {
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                } else if ((ps = p.status) == 0)        // 将前驱结点的等待状态置为WAITING, 表示之后将唤醒当前结点
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                } else {        // 阻塞当前读线程
                    long time;
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)   //限时等待超时, 取消等待
                        return cancelWaiter(node, node, false);
     
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) {
                        // 如果前驱的等待状态为WAITING, 且写锁被占用, 则阻塞当前调用线程
                        U.park(false, time);
                    }
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }
    
    

    我们来分析下这个方法。
    该方法会首先自旋的尝试获取读锁,获取成功后,就直接返回;否则,会将当前线程包装成一个读结点,插入到等待队列。
    由于,目前等待队列还是空,所以ThreadB会初始化队列,然后将自身包装成一个读结点,插入队尾,然后在下面 break 这个地方跳出自旋:

            if (p == null) {                            // p == null表示队列为空, 则初始化队列(构造头结点)
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            } else if (node == null) {                  // 将当前线程包装成读结点
                node = new WNode(RMODE, p);
            } else if (h == p || p.mode != RMODE) {     // 如果队列只有一个头结点, 或队尾结点不是读结点, 则直接将结点链接到队尾, 链接完成后退出自旋
                if (node.prev != p)
                    node.prev = p;
                else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                    p.next = node;
                    break;
                }
            }
    
    

    此时,等待队列的结构如下:


    image.png

    跳出自旋后,ThreadB会继续向下执行,进入下一个自旋,在下一个自旋中,依然会再次尝试获取读锁,如果这次再获取不到,就会将前驱的等待状态置为WAITING, 表示我(当前线程)要去睡了(阻塞),到时记得叫醒我:


    image.png
    此时,等待队列的结构如下:
    image.png

    最终, ThreadB进入阻塞状态:


    image.png

    4. ThreadC调用readLock获取读锁

    这个过程和ThreadB获取读锁一样,区别在于ThreadC被包装成结点加入等待队列后,是链接到ThreadB结点的栈指针中的。调用完下面这段代码后,ThreadC会链接到以Thread B为栈顶指针的栈中:


    image.png

    注意:读结点的cowait字段其实构成了一个栈,入栈的过程其实是个“头插法”插入单链表的过程。比如,再来个ThreadX读结点,则cowait链表结构为:ThreadB - > ThreadX -> ThreadC。最终唤醒读结点时,将从栈顶开始。
    然后会在下一次自旋中,阻塞当前读线程:

    image.png

    最终,等待队列的结构如下:


    image.png

    可以看到,此时ThreadC结点并没有把它的前驱的等待状态置为-1,因为ThreadC是链接到栈中的,当写锁释放的时候,会从栈底元素开始,唤醒栈中所有读结点。

    5. ThreadD调用writeLock获取写锁

    ThreadD调用writeLock方法获取写锁失败后(ThreadA依然占用着写锁),会调用acquireWrite方法,该方法整体逻辑和acquireRead差不多,首先自旋的尝试获取写锁,获取成功后,就直接返回;否则,会将当前线程包装成一个写结点,插入到等待队列。


    image.png
    acquireWrite源码:
    /**
     * 尝试自旋的获取写锁, 获取不到则阻塞线程
     *
     * @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED
     * @param deadline      如果非0, 则表示限时获取
     * @return 非0表示获取成功, INTERRUPTED表示中途被中断过
     */
    private long acquireWrite(boolean interruptible, long deadline) {
        WNode node = null, p;
    
        /**
         * 自旋入队操作
         * 如果没有任何锁被占用, 则立即尝试获取写锁, 获取成功则返回.
         * 如果存在锁被使用, 则将当前线程包装成独占结点, 并插入等待队列尾部
         */
        for (int spins = -1; ; ) {
            long m, s, ns;
            if ((m = (s = state) & ABITS) == 0L) {      // 没有任何锁被占用
                if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))    // 尝试立即获取写锁
                    return ns;                                                 // 获取成功直接返回
            } else if (spins < 0)
                spins = (m == WBIT && wtail == whead) ? SPINS : 0;
            else if (spins > 0) {
                if (LockSupport.nextSecondarySeed() >= 0)
                    --spins;
            } else if ((p = wtail) == null) {       // 队列为空, 则初始化队列, 构造队列的头结点
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            } else if (node == null)               // 将当前线程包装成写结点
                node = new WNode(WMODE, p);
            else if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {    // 链接结点至队尾
                p.next = node;
                break;
            }
        }
    
        for (int spins = -1; ; ) {
            WNode h, np, pp;
            int ps;
            if ((h = whead) == p) {     // 如果当前结点是队首结点, 则立即尝试获取写锁
                if (spins < 0)
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                for (int k = spins; ; ) { // spin at head
                    long s, ns;
                    if (((s = state) & ABITS) == 0L) {      // 写锁未被占用
                        if (U.compareAndSwapLong(this, STATE, s,
                            ns = s + WBIT)) {               // CAS修改State: 占用写锁
                            // 将队首结点从队列移除
                            whead = node;
                            node.prev = null;
                            return ns;
                        }
                    } else if (LockSupport.nextSecondarySeed() >= 0 &&
                        --k <= 0)
                        break;
                }
            } else if (h != null) {  // 唤醒头结点的栈中的所有读线程
                WNode c;
                Thread w;
                while ((c = h.cowait) != null) {
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            if (whead == h) {
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                } else if ((ps = p.status) == 0)        // 将当前结点的前驱置为WAITING, 表示当前结点会进入阻塞, 前驱将来需要唤醒我
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                } else {        // 阻塞当前调用线程
                    long time;  // 0 argument to park means no timeout
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p)
                        U.park(false, time);    // emulate LockSupport.park
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }
    
    
    

    acquireWrite中的下面这个自旋操作,用于将线程包装成写结点,插入队尾:


    image.png

    插入完成后,队列结构如下:


    image.png
    然后,进入下一个自旋,并在下一个自旋中阻塞ThreadD,最终队列结构如下:
    image.png

    同样,由于写锁被ThreadA占用着,所以最终会调用acquireRead方法,在该方法的第一个自旋中,会将ThreadE加入等待队列:


    image.png
    注意,由于队尾结点是写结点,所以当前读结点会直接链接到队尾;如果队尾是读结点,则会链接到队尾读结点的cowait链中。
    然后进入第二个自旋,阻塞ThreadE,最终队列结构如下:
    image.png

    7. ThreadA调用unlockWrite释放写锁

    通过CAS操作,修改State成功后,会调用release方法唤醒等待队列的队首结点:

    
        public void unlockWrite(long stamp) {
            WNode h;
            if (state != stamp || (stamp & WBIT) == 0L)
                throw new IllegalMonitorStateException();
            state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
            if ((h = whead) != null && h.status != 0)
                release(h);
        }
    

    release方法非常简单,先将头结点的等待状态置为0,表示即将唤醒后继结点,然后立即唤醒队首结点:

        private void release(WNode h) {
            if (h != null) {
                WNode q; Thread w;
                U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
                if ((q = h.next) == null || q.status == CANCELLED) {
                    for (WNode t = wtail; t != null && t != h; t = t.prev)
                        if (t.status <= 0)
                            q = t;
                }
                if (q != null && (w = q.thread) != null)
                    U.unpark(w);
            }
        }
    
    

    此时,等待队列的结构如下:


    image.png

    8. ThreadB被唤醒后继续向下执行

    ThreadB被唤醒后,会从原阻塞处继续向下执行,然后开始下一次自旋:


    image.png

    第二次自旋时,ThreadB发现写锁未被占用,则成功获取到读锁,然后从栈顶(ThreadB的cowait指针指向的结点)开始唤醒栈中所有线程,
    最后返回:

              for (int k = spins; ; ) { // spin at head
                    long m, s, ns;
                    if ((m = (s = state) & ABITS) < RFULL ?     // 判断写锁是否被占用
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :  //写锁未占用,且读锁数量未超限, 则更新同步状态
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {      //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中
                        // 获取读锁成功, 释放cowait链中的所有读结点
                        WNode c;
                        Thread w;
     
                        // 释放头结点, 当前队首结点成为新的头结点
                        whead = node;
                        node.prev = null;
     
                        // 从栈顶开始(node.cowait指向的结点), 依次唤醒所有读结点, 最终node.cowait==null, node成为新的头结点
                        while ((c = node.cowait) != null) {
                            if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
                                U.unpark(w);
                        }
                        return ns;
                    } else if (m >= WBIT &&
                        LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                        break;
                }
    
    

    最终,等待队列的结构如下:


    image.png

    9. ThreadC被唤醒后继续向下执行

    ThreadC被唤醒后,继续执行,并进入下一次自旋,下一次自旋时,会成功获取到读锁。


    image.png
    1. ThreadB和ThreadC释放读锁
      ThreadB和ThreadC调用unlockRead方法释放读锁,CAS操作State将读锁数量减1:
      注意,当读锁的数量变为0时才会调用release方法,唤醒队首结点:
        public void unlockRead(long stamp) {
            long s, m; WNode h;
            for (;;) {
                if (((s = state) & SBITS) != (stamp & SBITS) ||
                    (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
                    throw new IllegalMonitorStateException();
                if (m < RFULL) {
                    if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                        if (m == RUNIT && (h = whead) != null && h.status != 0)
                            release(h);
                        break;
                    }
                }
                else if (tryDecReaderOverflow(s) != 0L)
                    break;
            }
        }
    

    注意,当读锁的数量变为0时才会调用release方法,唤醒队首结点

        private void release(WNode h) {
            if (h != null) {
                WNode q; Thread w;
                U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
                if ((q = h.next) == null || q.status == CANCELLED) {
                    for (WNode t = wtail; t != null && t != h; t = t.prev)
                        if (t.status <= 0)
                            q = t;
                }
                if (q != null && (w = q.thread) != null)
                    U.unpark(w);
            }
        }
    

    队首结点(ThreadD写结点被唤醒),最终等待队列的结构如下:


    image.png

    11. ThreadD被唤醒后继续向下执行

    ThreadD会从原阻塞处继续向下执行,并在下一次自旋中获取到写锁,然后返回:


    image.png

    最终,等待队列的结构如下:


    image.png

    12. ThreadD调用unlockWrite释放写锁

    ThreadD释放写锁的过程和步骤7完全相同,会调用unlockWrite唤醒队首结点(ThreadE)。


    image.png

    ThreadE被唤醒后会从原阻塞处继续向下执行,但由于ThreadE是个读结点,所以同时会唤醒cowait栈中的所有读结点,过程和步骤8完全一样。最终,等待队列的结构如下:


    image.png

    总结

    StampedLock的等待队列与RRW的CLH队列相比,有以下特点:

    当入队一个线程时,如果队尾是读结点,不会直接链接到队尾,而是链接到该读结点的cowait链中,cowait链本质是一个栈;

    当入队一个线程时,如果队尾是写结点,则直接链接到队尾;

    唤醒线程的规则和AQS类似,都是首先唤醒队首结点。区别是StampedLock中,当唤醒的结点是读结点时,会唤醒该读结点的cowait链中的所有读结点(顺序和入栈顺序相反,也就是后进先出)。

    另外,StampedLock使用时要特别小心,避免锁重入的操作,在使用乐观读锁时也需要遵循相应的调用模板,防止出现数据不一致的问题。

    相关文章

      网友评论

          本文标题:StampedLock图解之一

          本文链接:https://www.haomeiwen.com/subject/pycfsrtx.html