美文网首页程序员
JUC源码分析-JUC锁(六):StampedLock

JUC源码分析-JUC锁(六):StampedLock

作者: 泰迪的bagwell | 来源:发表于2017-12-30 13:00 被阅读0次

    1. 概述

    StampedLock是JDK 8新增的读写锁,与我们上篇所讲的Phaser一样,它也不是由AQS实现的。StampedLock是一个基于能力(capability-based)的同步锁,提供了三种模式来控制 read/write 的获取,并且内部实现了自己的同步等待队列。本篇我们来详细分析在摒弃了AQS之后的StampedLock的实现。

    1. StampedLock的状态由一个版本和模式构成。锁获取方法返回一个 long型的值-stamp,用来表示并控制对锁状态的访问;0表示锁授权失败。锁释放和转换方法需要用这个stamp作为参数,如果它与锁状态不匹配操作就会失败。StampedLock提供了三种锁模式来控制读写锁的获取:

      • 写锁:使用writeLock方法获取,当锁不可用时会阻塞,获取成功后返回一个与这个写锁对应的stamp,在unlockWrite方法中,需要通过这个stamp来释放与之对应的锁。在tryWriteLock同样也会提供这个stamp。当在write模式中获取到写锁时,读锁不能被获取,并且所有的乐观读锁验证(validate方法)都会失败。
      • 读锁:使用readLock方法获取,当超出可用资源时(类似AQS的state设计)会阻塞。同样的,在获取锁成功后也会返回stamp,作用与上述相同。tryReadLock同样如此。
      • 乐观读锁:使用tryOptimisticRead方法获取,只有在写锁可用时才能成功获取乐观读锁,获取成功后也会返回一个stamp。validate方法可以根据这个stamp来判断写锁是否被获取。这种模式可以理解为一个弱化的读锁(weak version of a read-lock),它在任何时候都能被破坏。乐观读模式常被用在短的只读的代码段,用来减少争用并提高吞吐量。乐观读区域应该只读取字段,并将它们保存在本地变量中,以便在验证(validate方法)后使用。在乐观读模式中字段的读取可能会不一致,所以可能需要反复调用validate()来检查一致性。例如,当首次读取一个对象或数组引用,然后访问其中一个的字段、元素或方法时,这些步骤通常是必需的。
    2. StampedLock还支持在三种模式中提供有条件地转换。例如,tryConvertToWriteLock方法尝试升级一个锁模式,下面三种情况下可以升级模式并返回一个有效的write stamp:
      (1) 已经在writing模式中
      (2) 在reading模式中并且已经没有其他读线程
      (3) 在乐观读模式中锁可用
      这些方法的表现形式旨在帮助减少由于基于重试(retry-based)设计造成的代码膨胀。

    3. StampedLock 被设计作为线程安全模型的内部工具类。它的使用依赖于对数据、对象和方法的内部属性有一定的了解。StampedLock 是不可重入的,所以在锁的内部不能调用其他尝试重复获取锁的方法。一个stamp如果在很长时间都没有使用或验证,在很长一段时间之后可能就会验证失败。StampedLocks是可序列化的,但是反序列化后变为初始的非锁定状态,所以在远程锁定中是不安全的

    4. StampedLock 的调度策略不会始终偏向读线程或写线程,所有的"try"方法都是尽最大努力获取,并不一定遵循任何调度或公平策略。从"try"方法获取或转换锁失败返回0时,不会携带任何锁的状态信息。由于StampedLock支持跨多个锁模式的协调使用,它不会直接实现LockReadWriteLock接口。但是,如果应用程序需要Lock的相关功能,它可以通过asReadLock()、asWriteLock()asReadWriteLock()方法返回一个Lock视图。

    在介绍了 StampedLock 的特性之后,我们来看一下内部的等待队列的实现:

    StampedLock等待队列

    相较于AQS,可以看到 StampedLock 的等待队列多了一个cowait节点链,这个节点用来存放等待读的线程列表。也就是说,等待写的线程存放在链表的正常节点中,如果有读线程等待获取锁,就会把这个读线程放到cowait节点链上。

    1.1 核心参数

    //获取锁失败入队之前的最大自旋次数(实际运行时并不一定是这个数)
    private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
    //头节点获取锁的最大自旋次数
    private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
    //头节点再次阻塞前的最大自旋次数
    private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
    //等待自旋锁溢出的周期数
    private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
    //在溢出之前读线程计数用到的bit数
    private static final int LG_READERS = 7;
    // Values for lock state and stamp operations
    private static final long RUNIT = 1L;//读锁单位
    private static final long WBIT  = 1L << LG_READERS;//写状态标识 1000 0000
    private static final long RBITS = WBIT - 1L;//读状态标识 111 1111
    private static final long RFULL = RBITS - 1L; //读锁最大资源数 111 1110
    private static final long ABITS = RBITS | WBIT; //用于获取锁状态 1111 1111
    private static final long SBITS = ~RBITS; //note overlap with ABITS
    //锁状态初始值
    private static final long ORIGIN = WBIT << 1;
    //中断标识
    private static final long INTERRUPTED = 1L;
    //节点状态 等待/取消
    private static final int WAITING   = -1;
    private static final int CANCELLED =  1;
    //节点模型 读/写
    private static final int RMODE = 0;
    private static final int WMODE = 1;
    

    状态判断:
    state & ABITS == 0L 写锁可用
    state & ABITS < RFULL 读锁可用
    state & ABITS == WBIT 写锁已经被其他线程获取
    state & ABITS == RFULL 读锁饱和,可尝试增加额外资源数
    (stamp & SBITS) == (state & SBITS) 验证stamp是否为当前已经获取的锁stamp
    (state & WBIT) != 0L 当前线程已经持有写锁
    (state & RBITS) != 0L 当前线程已经持有读锁
    s & RBITS 读锁已经被获取的数量

    1.2 函数列表

    //构造函数
    public StampedLock() {
        state = ORIGIN;
    }
    //获取写锁,等待锁可用
    public long writeLock()
    //获取写锁,直接返回
    public long tryWriteLock()
    //获取写锁,等待指定的时间
    public long tryWriteLock(long time, TimeUnit unit)
    //获取写锁,响应中断
    public long writeLockInterruptibly()
    //获取读锁,等待锁可用
    public long readLock()
    //尝试获取读锁,直接返回
    public long tryReadLock()
    //获取读锁,限制等待时间
    public long tryReadLock(long time, TimeUnit unit)
    //获取读锁,响应中断
    public long readLockInterruptibly()
    //获取乐观读锁,如果写锁可用获取成功,不修改任何状态值
    public long tryOptimisticRead()
    //验证stamp,如果在锁发出给定的stamp之后写锁没有被获取,或者给定stamp是当前已经获取的锁stamp,则返回true。一般用在乐观读锁中,用于判断是否可继续获取读锁。
    public boolean validate(long stamp)
    //释放写锁
    public void unlockWrite(long stamp)
    //释放读锁
    public void unlockRead(long stamp) 
    //释放给定stamp对应的锁
    public void unlock(long stamp)
    //尝试升级给定stamp对应的锁为写锁
    public long tryConvertToWriteLock(long stamp)
    //尝试降级给定stamp对应的锁为读锁
    public long tryConvertToReadLock(long stamp)
    //尝试降级给定stamp对应的锁为乐观读锁
    public long tryConvertToOptimisticRead(long stamp) 
    //尝试释放写锁,一般用在异常复原
    public boolean tryUnlockWrite()
    //尝试释放读锁,一般用在异常复原
    public boolean tryUnlockRead()
    //写锁是否被持有
    public boolean isWriteLocked()
    //读锁是否被持有
    public boolean isReadLocked()
    //获取读锁数
    public int getReadLockCount()
    //返回一个ReadLock
    public Lock asReadLock()
    //返回一个WriteLock
    public Lock asWriteLock()
    //返回一个ReadWriteLock
    public ReadWriteLock asReadWriteLock()
    

    2. 源码解析

    2.1 writeLock()

    //获取写锁,等待可用
    public long writeLock() {
        long s, next;  // bypass acquireWrite in fully unlocked case only
        return ((((s = state) & ABITS) == 0L &&
                 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
                next : acquireWrite(false, 0L));
    }
    private long acquireWrite(boolean interruptible, long deadline) {
        WNode node = null, p;
        //第一个自旋,准备入队
        for (int spins = -1;;) { // spin while enqueuing
            long m, s, ns;
            if ((m = (s = state) & ABITS) == 0L) {//锁可用
                if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))//获取锁 CAS修改锁状态
                    return ns;
            }
            else if (spins < 0)
                spins = (m == WBIT && wtail == whead) ? SPINS : 0;//自旋次数
            else if (spins > 0) {
                if (LockSupport.nextSecondarySeed() >= 0)
                    --spins;    //随机递减
            }
            else if ((p = wtail) == null) { // initialize queue
                WNode hd = new WNode(WMODE, null);//初始化写锁等待队列
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            }
            else if (node == null)
                node = new WNode(WMODE, p);//创建新的等待节点
            else if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {//更新tail节点
                p.next = node;
                break;
            }
        }
    
        //第二个自旋,节点依次获取锁
        for (int spins = -1;;) {
            WNode h, np, pp; int ps;
            if ((h = whead) == p) {//当前节点是最后一个等待节点
                if (spins < 0)
                    spins = HEAD_SPINS; //头结点自旋次数
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1; // spins=spins/2
                for (int k = spins;;) { // spin at head
                    long s, ns;
                    if (((s = state) & ABITS) == 0L) {//锁可用
                        if (U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + WBIT)) {//更新锁状态
                            //更新头结点,返回stamp
                            whead = node;
                            node.prev = null;
                            return ns;
                        }
                    }
                    else if (LockSupport.nextSecondarySeed() >= 0 &&
                             --k <= 0)//随机递减
                        break;
                }
            }
            else if (h != null) { // help release stale waiters
                WNode c; Thread w;
                //依次唤醒头节点的cowait节点线程
                while ((c = h.cowait) != null) {//有等待读的线程
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && //CAS更新头结点的cowait
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            if (whead == h) {
                //检查队列稳定性
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {//尾节点取消,更新尾节点的前继节点为p.prev,继续自旋
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    long time; // 0 argument to park means no timeout
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);//超时,取消等待
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                        whead == h && node.prev == p)
                        U.park(false, time);  // emulate LockSupport.park
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);//中断,取消等待
                }
            }
        }
    }
    

    说明:获取写锁,如果锁可用((state & ABITS) == 0L)则直接获取写锁并返回stamp,否则调用acquireWrite等待锁可用,acquireWrite主要由两个自旋组成,代码虽然比较多,但是逻辑很简单,函数大概执行流程如下:

    1. 第一个自旋,使当前线程进入等待队列的尾节点。
    2. 第二个自旋,节点依次获取写锁,直到当前线程所在节点的前继节点(prev)为头结点时,如果锁可用,则说明可以获取锁,获取成功返回stamp
    3. 如果在自旋中未能成功获取到锁,并且线程被中断或者等待超时,则调用cancelWaiter方法取消节点的等待,cancelWaiter后面会分析。

    2.2 readLock()

    //获取写锁,等待锁可用
    public long readLock() {
        long s = state, next;  // bypass acquireRead on common uncontended case
        return ((whead == wtail && (s & ABITS) < RFULL && //还有可用资源
                 U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
                next : acquireRead(false, 0L));
    }
    private long acquireRead(boolean interruptible, long deadline) {
        WNode node = null, p;
        //第一个自旋,入队
        for (int spins = -1;;) {
            WNode h;
            if ((h = whead) == (p = wtail)) {//等待队列为空
                for (long m, s, ns;;) {
                    if ((m = (s = state) & ABITS) < RFULL ? //有可用资源
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                        return ns;
                    else if (m >= WBIT) {
                        if (spins > 0) {
                            if (LockSupport.nextSecondarySeed() >= 0)
                                --spins;//随机递减自旋数
                        }
                        else {
                            if (spins == 0) { //自旋结束,准备进入等待队列
                                WNode nh = whead, np = wtail;
                                if ((nh == h && np == p) || (h = nh) != (p = np))
                                    break;
                            }
                            spins = SPINS;
                        }
                    }
                }
            }
            if (p == null) { // initialize queue
                //初始化等待队列
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            }
            else if (node == null)
                node = new WNode(RMODE, p);//创建新的节点
            else if (h == p || p.mode != RMODE) {
                //到这里说明尾节点是写线程
                if (node.prev != p)
                    node.prev = p;
                else if (U.compareAndSwapObject(this, WTAIL, p, node)) {//更新tail节点为当前节点
                    p.next = node;
                    break;
                }
            }
            else if (!U.compareAndSwapObject(p, WCOWAIT,
                                             node.cowait = p.cowait, node))//到这里说明尾节点是等待读的节点,CAS把当前节点(node节点)转移到p节点的cowait上
                node.cowait = null;
            else {
                //当前节点进入等待队列成功后的逻辑(当前节点已被转移到尾节点的cowait上)
                for (;;) {
                    WNode pp, c; Thread w;
                    if ((h = whead) != null && (c = h.cowait) != null &&
                        U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null) // help release
                        U.unpark(w);    //唤醒头节点等待读线程
                    if (h == (pp = p.prev) || h == p || pp == null) {//没有前继节点,可以尝试唤醒当前节点等待的线程
                        long m, s, ns;
                        do {
                            if ((m = (s = state) & ABITS) < RFULL ?
                                U.compareAndSwapLong(this, STATE, s,
                                                     ns = s + RUNIT) : //获取锁
                                (m < WBIT &&
                                 (ns = tryIncReaderOverflow(s)) != 0L))//读锁饱和,尝试增加额外的读锁数量,只有在读锁数=RFULL时才可以增加
                                return ns;  //返回stamp
                        } while (m < WBIT);
                    }
                    //超时及中断判断逻辑
                    if (whead == h && p.prev == pp) {//检查队列是否稳定
                        long time;
                        if (pp == null || h == p || p.status > 0) {
                            node = null; // throw away
                            break;
                        }
                        if (deadline == 0L)
                            time = 0L;
                        else if ((time = deadline - System.nanoTime()) <= 0L)
                            return cancelWaiter(node, p, false);//超时,取消等待
                        Thread wt = Thread.currentThread();
                        U.putObject(wt, PARKBLOCKER, this);
                        node.thread = wt;
                        if ((h != pp || (state & ABITS) == WBIT) &&
                            whead == h && p.prev == pp)
                            U.park(false, time);//阻塞等待
                        node.thread = null;
                        U.putObject(wt, PARKBLOCKER, null);
                        if (interruptible && Thread.interrupted())
                            return cancelWaiter(node, p, true);//被中断,取消等待
                    }
                }
            }
        }
    
        //第二个自旋,节点依次获取锁
        for (int spins = -1;;) {
            WNode h, np, pp; int ps;
            if ((h = whead) == p) {//当前节点是最后一个等待节点
                if (spins < 0)
                    spins = HEAD_SPINS;//初始化自旋数
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                for (int k = spins;;) { // spin at head
                    long m, s, ns;
                    if ((m = (s = state) & ABITS) < RFULL ? //有可用资源
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                        //获取读锁成功,更新头节点为当前节点
                        WNode c; Thread w;
                        whead = node;
                        node.prev = null;
                        //依次唤醒当前节点的cowait节点线程
                        while ((c = node.cowait) != null) {
                            if (U.compareAndSwapObject(node, WCOWAIT,
                                                       c, c.cowait) &&
                                (w = c.thread) != null)
                                U.unpark(w);
                        }
                        return ns;
                    }
                    else if (m >= WBIT &&
                             LockSupport.nextSecondarySeed() >= 0 && --k <= 0)//随机递减自旋次数
                        break;
                }
            }
            else if (h != null) {
                WNode c; Thread w;
                while ((c = h.cowait) != null) {
                    //依次唤醒head节点的cowait节点线程
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            if (whead == h) {
                //检查队列稳定性
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {//尾节点取消,更新尾节点的前继节点为p.prev,继续自旋
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    //超时及中断判断逻辑
                    long time;
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);//超时,取消等待
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 &&
                        (p != h || (state & ABITS) == WBIT) &&
                        whead == h && node.prev == p)
                        U.park(false, time);
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);//被中断,取消等待
                }
            }
        }
    }
    

    说明:获取读锁,代码比较多,但逻辑很简单,跟wirteLock()差不多。详细的流程这里就不细说了,有兴趣的同学可以参考笔者添加的注释一步一步阅读。
    如果有可用资源((state & ABITS) < RFULL)则直接获取读锁并返回stamp,否则调用acquireRead等待锁可用,acquireRead函数执行流程如下:

    1. 第一个自旋,使当前线程进入等待队列的尾节点。注意这里跟获取写锁时的区别,在获取写锁时,把当前线程所在的节点直接放入队尾;但是在获取读锁时,是把当前线程所在的节点放入尾节点的cowait节点里
    2. 第二个自旋,节点依次获取读锁。直到当前线程所在节点的前继节点(prev)为头结点时,如果有可用资源,则说明可以获取锁,获取成功返回stamp
    3. 如果在自旋中未能成功获取到锁,并且线程被中断或者等待超时,则调用cancelWaiter方法取消节点的等待。

    2.3 tryOptimisticRead()

    //获取乐观读锁 
    public long tryOptimisticRead() {
        long s;
        return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
    }
    

    说明:获取乐观读锁,取决于写锁的状态,如果写锁空闲则获取成功,并且不修改任何状态值。函数比较简单,不多赘述。

    2.4 cancelWaiter(WNode node, WNode group, boolean interrupted)

    //取消给定节点
    private long cancelWaiter(WNode node, WNode group, boolean interrupted) {
        if (node != null && group != null) {
            Thread w;
            node.status = CANCELLED;//修改节点状态
            // unsplice cancelled nodes from group
            //依次解除已经取消的cowait节点的链接
            for (WNode p = group, q; (q = p.cowait) != null;) {
                if (q.status == CANCELLED) {
                    U.compareAndSwapObject(p, WCOWAIT, q, q.cowait);
                    p = group; // restart
                }
                else
                    p = q;
            }
            if (group == node) {
                //依次唤醒节点上的未取消的cowait节点线程
                for (WNode r = group.cowait; r != null; r = r.cowait) {
                    if ((w = r.thread) != null)
                        U.unpark(w);       // wake up uncancelled co-waiters
                }
                //
                for (WNode pred = node.prev; pred != null; ) { // unsplice
                    WNode succ, pp;        // find valid successor
                    while ((succ = node.next) == null ||
                           succ.status == CANCELLED) { //后继节点为空或者已经取消,则去查找一个有效的后继节点
                        WNode q = null;    // find successor the slow way
                        //从尾节点开始往前查找距离node节点最近的一个有效节点q
                        for (WNode t = wtail; t != null && t != node; t = t.prev)
                            if (t.status != CANCELLED)
                                q = t;     // don't link if succ cancelled
                        if (succ == q ||   // ensure accurate successor
                                //运行到这里说明从node到“距离node最近的一个有效节点q”之间可能存在已经取消的节点
                                // CAS替换node的后继节点为“距离node最近的一个有效节点”,也就是说解除了“所有已经取消但是还存在在链表上的无效节点”的链接
                            U.compareAndSwapObject(node, WNEXT,
                                                   succ, succ = q)) {
                            if (succ == null && node == wtail) {
                                //运行到这里说明node为尾节点,
                                //利用CAS先修改尾节点为node的前继有效节点,后面再解除node的链接
                                U.compareAndSwapObject(this, WTAIL, node, pred);
                            }
                            break;
                        }
                    }
                    //解除node节点的链接
                    if (pred.next == node) // unsplice pred link
                        U.compareAndSwapObject(pred, WNEXT, node, succ);
                    //唤醒后继节点的线程
                    if (succ != null && (w = succ.thread) != null) {
                        succ.thread = null;
                        U.unpark(w);       // wake up succ to observe new pred
                    }
                    //如果前继节点已经取消,向前查找一个有效节点继续循环,如果这个节点为空则直接跳出循环
                    if (pred.status != CANCELLED || (pp = pred.prev) == null)
                        break;
                    node.prev = pp;        // repeat if new pred wrong/cancelled
                    U.compareAndSwapObject(pp, WNEXT, pred, succ);
                    pred = pp;
                }
            }
        }
        //检查是否可唤醒head节点的后继节点线程
        WNode h; // Possibly release first waiter
        while ((h = whead) != null) {
            long s; WNode q; // similar to release() but check eligibility
            if ((q = h.next) == null || q.status == CANCELLED) {
                //从尾节点向前查找一个未取消的节点,作为头节点的next节点
                for (WNode t = wtail; t != null && t != h; t = t.prev)
                    if (t.status <= 0)
                        q = t;
            }
            if (h == whead) {
                if (q != null && h.status == 0 &&
                    ((s = state) & ABITS) != WBIT && // waiter is eligible
                    (s == 0L || q.mode == RMODE))//锁可用,或者后继节点是读线程
                    release(h);//可以唤醒头节点的后继节点线程
                break;
            }
        }
        return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L;
    }
    

    说明:如果节点线程被中断或者等待超时,需要取消节点的链接。大概的操作就是首先修改节点为取消状态,然后解除它在等待队列中的链接,并且唤醒节点上所有等待读的线程(也就是cowait节点);最后如果锁可用,帮助唤醒头节点的后继节点的线程。其实也是AQS中取消获取锁方法的一种变体(详见AQS篇)。

    重点介绍一下cancelWaiter的前两个参数nodegroup

    1. 如果node!=group,说明node节点是group节点上的一个cowait节点(如果不明白请见上面代码中对acquireRead方法中的U.compareAndSwapObject(p, WCOWAIT,node.cowait = p.cowait, node)这一行代码的注释),这种情况下首先修改node节点的状态(node.status = CANCELLED),然后直接操作group节点,依次解除group节点上已经取消的cowait节点的链接。最后如果锁可用,帮助唤醒头节点的后继节点的线程。
    2. 如果node==group,说明在node节点之前的节点为写线程节点,这时需要进行以下操作:
      a) 依次唤醒node节点上的未取消的cowait节点线程
      b) 解除node节点和一段节点(node节点到“距离node最近的一个有效节点”)的链接
      c) 最后如果锁可用,帮助唤醒头节点的后继节点的线程。

    小结

    关于StampedLock的其他方法,由于实现方式与我们已经讲述的方法大同小异,而且也相对比较简单,这里就不在详细介绍了,有兴趣的同学可以自行下载源码参照本篇的源码解析进行查阅。

    相关文章

      网友评论

        本文标题:JUC源码分析-JUC锁(六):StampedLock

        本文链接:https://www.haomeiwen.com/subject/rzxjgxtx.html