美文网首页
StampedLock

StampedLock

作者: 王侦 | 来源:发表于2019-07-25 16:21 被阅读0次

    1.官方文档

    A capability-based lock with three modes for controlling 
    read/write access. The state of a StampedLock consists of a 
    version and mode. Lock acquisition methods return a stamp 
    that represents and controls access with respect to a lock state; 
    "try" versions of these methods may instead return the special 
    value zero to represent failure to acquire access. Lock release 
    and conversion methods require stamps as arguments, and fail 
    if they do not match the state of the lock. The three modes are:
    
    * Writing. Method writeLock() possibly blocks waiting for 
      exclusive access, returning a stamp that can be used in method 
      unlockWrite(long) to release the lock. Untimed and timed 
      versions of tryWriteLock are also provided. When the lock is 
      held in write mode, no read locks may be obtained, and all 
      optimistic read validations will fail.
    
    * Reading. Method readLock() possibly blocks waiting for non-
      exclusive access, returning a stamp that can be used in method 
      unlockRead(long) to release the lock. Untimed and timed 
      versions of tryReadLock are also provided.
    
    * Optimistic Reading. Method tryOptimisticRead() returns a 
      non-zero stamp only if the lock is not currently held in write 
      mode. Method validate(long) returns true if the lock has not 
      been acquired in write mode since obtaining a given stamp. 
      This mode can be thought of as an extremely weak version of a 
      read-lock, that can be broken by a writer at any time. The use 
      of optimistic mode for short read-only code segments often 
      reduces contention and improves throughput. However, its use 
      is inherently fragile. Optimistic read sections should only read 
      fields and hold them in local variables for later use after 
      validation. Fields read while in optimistic mode may be wildly 
      inconsistent, so usage applies only when you are familiar 
      enough with data representations to check consistency and/or 
      repeatedly invoke method validate(). For example, such steps 
      are typically required when first reading an object or array 
      reference, and then accessing one of its fields, elements or 
      methods.
    
    This class also supports methods that conditionally provide 
    conversions across the three modes. For example, method 
    tryConvertToWriteLock(long) attempts to "upgrade" a mode, 
    returning a valid write stamp if (1) already in writing mode (2) in 
    reading mode and there are no other readers or (3) in optimistic 
    mode and the lock is available. The forms of these methods are 
    designed to help reduce some of the code bloat that otherwise 
    occurs in retry-based designs.
    
    StampedLocks are designed for use as internal utilities in the 
    development of thread-safe components. Their use relies on 
    knowledge of the internal properties of the data, objects, and 
    methods they are protecting. They are not reentrant, so locked 
    bodies should not call other unknown methods that may try to 
    re-acquire locks (although you may pass a stamp to other 
    methods that can use or convert it). The use of read lock 
    modes relies on the associated code sections being side-effect-
    free. Unvalidated optimistic read sections cannot call methods 
    that are not known to tolerate potential inconsistencies. Stamps 
    use finite representations, and are not cryptographically secure 
    (i.e., a valid stamp may be guessable). Stamp values may 
    recycle after (no sooner than) one year of continuous operation. 
    A stamp held without use or validation for longer than this 
    period may fail to validate correctly. StampedLocks are 
    serializable, but always deserialize into initial unlocked state, so 
    they are not useful for remote locking.
    
    The scheduling policy of StampedLock does not consistently 
    prefer readers over writers or vice versa. All "try" methods are 
    best-effort and do not necessarily conform to any scheduling or 
    fairness policy. A zero return from any "try" method for 
    acquiring or converting locks does not carry any information 
    about the state of the lock; a subsequent invocation may 
    succeed.
    
    Because it supports coordinated usage across multiple lock 
    modes, this class does not directly implement the Lock or 
    ReadWriteLock interfaces. However, a StampedLock may be 
    viewed asReadLock(), asWriteLock(), or asReadWriteLock() in 
    applications requiring only the associated set of functionality.
    
    Sample Usage. The following illustrates some usage idioms in 
    a class that maintains simple two-dimensional points. The 
    sample code illustrates some try/catch conventions even 
    though they are not strictly needed here because no exceptions 
    can occur in their bodies.
    

    基于功能的锁,具有三种控制读/写访问的模式。 StampedLock的状态包括版本和模式。锁获取方法返回一个表示和控制锁状态访问的stamp;这些方法的“try”版本可能会返回特殊值零以表示获取访问失败。锁释放和转换方法需要stamp作为参数,如果它们与锁状态不匹配则会失败。这三种模式是:

    • 写。方法writeLock()可能阻塞等待独占访问,返回可在方法unlockWrite(long)中使用的stamp以释放锁。还提供了不定时和定时版本的tryWriteLock。当锁保持在写模式时,不能获得读锁,并且所有乐观读验证都将失败。
    • 读。方法readLock()可能阻塞等待非独占访问,返回可以在方法unlockRead(long)中使用的stamp以释放锁。还提供了不定时和定时版本的tryReadLock。
    • 乐观读。方法tryOptimisticRead()仅在当前锁未处于写模式时才返回非零stamp。方法validate(long)如果在获取给定stamp后没有被写模式获取锁,则返回true。这种模式可以被认为是读锁的极弱版本,可以随时由写线程打破。对短的只读代码段使用乐观模式通常可以减少争用并提高吞吐量。但是,它的使用本质上是脆弱的。乐观读取部分应该只读取字段并在验证后将它们保存在局部变量中,以便以后使用。在乐观模式下读取的字段可能非常不一致,因此仅当您熟悉数据表示以检查一致性和/或重复调用方法validate()时才能使用。例如,在首先读取对象或数组引用,然后访问其字段、元素或方法之一时,通常需要这些步骤。

    此类还支持提供有条件地跨三种模式的转换的方法。例如,方法tryConvertToWriteLock(long)尝试“升级”一个模式并返回一个有效的写stamp,如果

    • 1)已经处于写模式
    • 2)处于读模式并且没有其他读线程
    • 3)处于乐观模式并且锁是可用的。

    这些方法的形式旨在帮助减少基于重试设计中出现的一些代码膨胀。

    StampedLocks设计用作开发线程安全组件的内部实用程序。它们的使用依赖于对它们所保护的数据、对象和方法的内部属性的了解。它们不是可重入的,因此锁定的主体不应该调用可能尝试重新获取锁的其他未知方法(尽管您可以将stamp传递给可以使用或转换它的其他方法)。读模式的使用依赖于相关的代码部分是无副作用的。未经验证的乐观读取部分无法调用未知容忍潜在不一致的方法。stamps使用有限表示,并且不是加密安全的(即可猜测有效stamp)。stamp值可在连续操作一年后(不快于一年)后重循环。未经使用或验证超过此期限而持有的stamp可能无法正确验证。 StampedLocks是可序列化的,但总是反序列化为初始无锁状态,因此它们对远程锁定没有用。

    StampedLock的调度策略并不总是倾向于读线程,反之亦然。所有try方法都是尽力而为,并不一定符合任何调度或公平政策。从任何“try”方法获取或转换锁时返回零 不会携带有关锁状态的任何信息;后续调用可能会成功。

    因为它支持跨多种锁模式的协同使用,所以此类不直接实现Lock或ReadWriteLock接口。但是,StampedLock可以在仅需要相关功能集的应用程序中被当做asRe​​adLock(),asWriteLock()或asReadWriteLock()。

    示例用法。以下说明了维护简单二维点的类中的一些用法习惯。示例代码说明了一些try / catch约定,尽管这里没有严格要求,因为它们的主体中不会发生异常。

     class Point {
       private double x, y;
       private final StampedLock sl = new StampedLock();
    
       void move(double deltaX, double deltaY) { // an exclusively locked method
         long stamp = sl.writeLock();
         try {
           x += deltaX;
           y += deltaY;
         } finally {
           sl.unlockWrite(stamp);
         }
       }
    
       double distanceFromOrigin() { // A read-only method
         long stamp = sl.tryOptimisticRead();
         double currentX = x, currentY = y;
         if (!sl.validate(stamp)) {
            stamp = sl.readLock();
            try {
              currentX = x;
              currentY = y;
            } finally {
               sl.unlockRead(stamp);
            }
         }
         return Math.sqrt(currentX * currentX + currentY * currentY);
       }
    
       void moveIfAtOrigin(double newX, double newY) { // upgrade
         // Could instead start with optimistic, not read mode
         long stamp = sl.readLock();
         try {
           while (x == 0.0 && y == 0.0) {
             long ws = sl.tryConvertToWriteLock(stamp);
             if (ws != 0L) {
               stamp = ws;
               x = newX;
               y = newY;
               break;
             }
             else {
               sl.unlockRead(stamp);
               stamp = sl.writeLock();
             }
           }
         } finally {
           sl.unlock(stamp);
         }
       }
     }
    

    2.实现说明

    The design employs elements of Sequence locks (as used in 
    linux kernels; see Lameter's 
    http://www.lameter.com/gelato2005.pdf and elsewhere; see 
    Boehm's http://www.hpl.hp.com/techreports/2012/HPL-2012-
    68.html) and Ordered RW locks (see Shirako et al 
    http://dl.acm.org/citation.cfm?id=2312015) 
    
     Conceptually, the primary state of the lock includes a 
    sequence number that is odd when write-locked and even 
    otherwise. However, this is offset by a reader count that is non-
    zero when read-locked.  The read count is ignored when 
    validating "optimistic" seqlock-reader-style stamps.  Because 
    we must use a small finite number of bits (currently 7) for 
    readers, a supplementary reader overflow word is used when 
    the number of readers exceeds the count field. We do this by 
    treating the max reader count value (RBITS) as a spinlock 
    protecting overflow updates. 
    
     Waiters use a modified form of CLH lock used in 
    AbstractQueuedSynchronizer (see its internal documentation 
    for a fuller account), where each node is tagged (field mode) as 
    either a reader or writer. Sets of waiting readers are grouped 
    (linked) under a common node (field cowait) so act as a single 
    node with respect to most CLH mechanics.  By virtue of the 
    queue structure, wait nodes need not actually carry sequence 
    numbers; we know each is greater than its predecessor.  This 
    simplifies the scheduling policy to a mainly-FIFO scheme that 
    incorporates elements of Phase-Fair locks (see Brandenburg & 
    Anderson, especially http://www.cs.unc.edu/~bbb/diss/).  In 
    particular, we use the phase-fair anti-barging rule: If an 
    incoming reader arrives while read lock is held but there is a 
    queued writer, this incoming reader is queued.  (This rule is 
    responsible for some of the complexity of method acquireRead, 
    but without it, the lock becomes highly unfair.) Method release 
    does not (and sometimes cannot) itself wake up cowaiters. This 
    is done by the primary thread, but helped by any other threads 
    with nothing better to do in methods acquireRead and 
    acquireWrite. 
    
     These rules apply to threads actually queued. All tryLock forms 
    opportunistically try to acquire locks regardless of preference 
    rules, and so may "barge" their way in.  Randomized spinning 
    is used in the acquire methods to reduce (increasingly 
    expensive) context switching while also avoiding sustained 
    memory thrashing among many threads.  We limit spins to the 
    head of queue. A thread spin-waits up to SPINS times (where 
    each iteration decreases spin count with 50% probability) 
    before blocking. If, upon wakening it fails to obtain lock, and is 
    still (or becomes) the first waiting thread (which indicates that 
    some other thread barged and obtained lock), it escalates spins 
    (up to MAX_HEAD_SPINS) to reduce the likelihood of 
    continually losing to barging threads. 
    
     Nearly all of these mechanics are carried out in methods 
    acquireWrite and acquireRead, that, as typical of such code, 
    sprawl out because actions and retries rely on consistent sets 
    of locally cached reads. 
    
     As noted in Boehm's paper (above), sequence validation 
    (mainly method validate()) requires stricter ordering rules than 
    apply to normal volatile reads (of "state").  To force orderings of 
    reads before a validation and the validation itself in those cases 
    where this is not already forced, we use Unsafe.loadFence. 
    
     The memory layout keeps lock state and queue pointers 
    together (normally on the same cache line). This usually works 
    well for read-mostly loads. In most other cases, the natural 
    tendency of adaptive-spin CLH locks to reduce memory 
    contention lessens motivation to further spread out contended 
    locations, but might be subject to future improvements.
    

    该设计采用了序列锁Sequence locks的元素(在linux内核中使用;参见Lameter的http://www.lameter.com/gelato2005.pdf和其他地方;参见Boehm的http://www.hpl.hp.com/techreports/2012/ HPL-2012-68.html)和有序RW锁Ordered RW locks(参见Shirako等人http://dl.acm.org/citation.cfm?id=2312015

    从概念上讲,锁的主要状态包括在写锁定时甚至是其他情况下奇数的序列号。但是,读锁定时被非0的读计数所抵消。验证“乐观”seqlock-reader-style stamps时,将忽略读计数。因为我们必须为读线程使用小的有限数量的位(当前为7位),所以当读线程的数量超过count时,将使用补充的读线程溢出字。我们通过将最大读线程count值(RBITS)视为保护溢出更新的自旋锁来实现此目的。

    等待线程使用在AbstractQueuedSynchronizer中使用的CLH锁的修改形式,其中每个节点都标记为(字段模式)作为读线程或写线程。等待读线程的集合组队(链接)放在一个公共节点(字段cowait)下,因此就大多数CLH机制而言充当单个节点。凭借队列结构,等待节点实际上不需要携带序列号;我们知道每个都比它的前驱更大。这将调度策略简化为FIFO方案,该方案包含Phase-Fair锁的元素(参见Brandenburg&Anderson,尤其是http://www.cs.unc.edu/~bbb/diss/)。特别是,我们使用phase-fair不允许插队规则:如果在保持读取锁定时传入的读线程但是有一个排队的写线程,则此传入的读线程将排队。 (这个规则导致了一些方法acquireRead的复杂性,但没有它,锁将变得非常不公平。)方法release本身不会(有时不能)唤醒cowaiters。这是由主线程完成的,其他在方法acquireRead和acquireWrite中没有更好的事情可以做的任何线程可以进行协助。

    这些规则适用于实际排队的线程。无论偏好规则如何,所有tryLock形式都会机会性地尝试获取锁定,因此可能会插队。在acquire方法中使用随机自旋来减少(越来越昂贵)上下文切换,同时还避免多线程间持续的内存抖动。我们将自旋限制在队列的头部。在阻塞之前,线程自旋等待SPINS次数(其中每次迭代以50%的概率减少自旋计数)。如果在唤醒后无法获得锁,并仍然(或成为)第一个等待线程(表示某个其他线程插队并获得锁),则会提升自旋次数(最多为MAX_HEAD_SPINS)以减少因为插队线程不断丢失的获取锁的可能。

    几乎所有这些机制都是在acquireWrite和acquireRead方法中执行的,这些代码通常会扩散,因为操作和重试依赖于一致的本地缓存读取集。

    正如Boehm的论文(上文)所述,序列验证(主要是方法validate())需要更严格的排序规则,而不是适用于普通的volatile读取(“state”)。要在验证之前强制读取顺序,并在尚未强制执行验证的情况下强制验证,我们使用Unsafe.loadFence。

    内存布局将锁定状态和队列指针保持在一起(通常在同一缓存行上)。这通常适用于大多数读取加载。在大多数其他情况下,自适应自旋CLH锁定以减少内存争用的自然趋势减少了进一步扩展竞争位置的动力,但可能受到未来改进的影响。

    3.源码分析

    3.1 构造器

        /**
         * Creates a new lock, initially in unlocked state.
         */
        public StampedLock() {
            state = ORIGIN;
        }
    
        /** Head of CLH queue */
        private transient volatile WNode whead;
        /** Tail (last) of CLH queue */
        private transient volatile WNode wtail;
    
        // views
        transient ReadLockView readLockView;
        transient WriteLockView writeLockView;
        transient ReadWriteLockView readWriteLockView;
    
        /** Lock sequence/state */
        private transient volatile long state;
        /** extra reader count when state read count saturated */
        private transient int readerOverflow;
    
        // Values for lock state and stamp operations
        private static final long RUNIT = 1L;
        private static final long WBIT  = 1L << LG_READERS;
        private static final long RBITS = WBIT - 1L;
        private static final long RFULL = RBITS - 1L;
        private static final long ABITS = RBITS | WBIT;
        private static final long SBITS = ~RBITS; // note overlap with ABITS
    
        // Initial value for lock state; avoid failure value zero
        private static final long ORIGIN = WBIT << 1;
    

    3.2 写锁

        /**
         * Exclusively acquires the lock, blocking if necessary
         * until available.
         *
         * @return a stamp that can be used to unlock or convert mode
         */
        public long writeLock() {
            long s, next;  // bypass acquireWrite in fully unlocked case only
            return ((((s = state) & ABITS) == 0L &&
                     U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
                    next : acquireWrite(false, 0L));
        }
    

    当完全没有加锁时,绕过acquireWrite。

        private long acquireWrite(boolean interruptible, long deadline) {
            WNode node = null, p;
            for (int spins = -1;;) { // spin while enqueuing
                long m, s, ns;
                if ((m = (s = state) & ABITS) == 0L) {
                    if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                        return ns;
                }
                else if (spins < 0)
                    spins = (m == WBIT && wtail == whead) ? SPINS : 0;
                else if (spins > 0) {
                    if (LockSupport.nextSecondarySeed() >= 0)
                        --spins;
                }
                else if ((p = wtail) == null) { // initialize queue
                    WNode hd = new WNode(WMODE, null);
                    if (U.compareAndSwapObject(this, WHEAD, null, hd))
                        wtail = hd;
                }
                else if (node == null)
                    node = new WNode(WMODE, p);
                else if (node.prev != p)
                    node.prev = p;
                else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                    p.next = node;
                    break;
                }
            }
    
            for (int spins = -1;;) {
                WNode h, np, pp; int ps;
                if ((h = whead) == p) {
                    if (spins < 0)
                        spins = HEAD_SPINS;
                    else if (spins < MAX_HEAD_SPINS)
                        spins <<= 1;
                    for (int k = spins;;) { // spin at head
                        long s, ns;
                        if (((s = state) & ABITS) == 0L) {
                            if (U.compareAndSwapLong(this, STATE, s,
                                                     ns = s + WBIT)) {
                                whead = node;
                                node.prev = null;
                                return ns;
                            }
                        }
                        else if (LockSupport.nextSecondarySeed() >= 0 &&
                                 --k <= 0)
                            break;
                    }
                }
                else if (h != null) { // help release stale waiters
                    WNode c; Thread w;
                    while ((c = h.cowait) != null) {
                        if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                            (w = c.thread) != null)
                            U.unpark(w);
                    }
                }
                if (whead == h) {
                    if ((np = node.prev) != p) {
                        if (np != null)
                            (p = np).next = node;   // stale
                    }
                    else if ((ps = p.status) == 0)
                        U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                    else if (ps == CANCELLED) {
                        if ((pp = p.prev) != null) {
                            node.prev = pp;
                            pp.next = node;
                        }
                    }
                    else {
                        long time; // 0 argument to park means no timeout
                        if (deadline == 0L)
                            time = 0L;
                        else if ((time = deadline - System.nanoTime()) <= 0L)
                            return cancelWaiter(node, node, false);
                        Thread wt = Thread.currentThread();
                        U.putObject(wt, PARKBLOCKER, this);
                        node.thread = wt;
                        if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                            whead == h && node.prev == p)
                            U.park(false, time);  // emulate LockSupport.park
                        node.thread = null;
                        U.putObject(wt, PARKBLOCKER, null);
                        if (interruptible && Thread.interrupted())
                            return cancelWaiter(node, node, true);
                    }
                }
            }
        }
    

    在该方法中复合了很多特性:

    • 1)首先是入队自旋,并放到队列尾部
    • 2)如果队列中只剩下一个结点,则在队头进一步自旋
    • 3)会帮助release队头的cowait链表
    • 4)最后会进入阻塞
        /**
         * If the lock state matches the given stamp, releases the
         * exclusive lock.
         *
         * @param stamp a stamp returned by a write-lock operation
         * @throws IllegalMonitorStateException if the stamp does
         * not match the current state of this lock
         */
        public void unlockWrite(long stamp) {
            WNode h;
            if (state != stamp || (stamp & WBIT) == 0L)
                throw new IllegalMonitorStateException();
            state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
            if ((h = whead) != null && h.status != 0)
                release(h);
        }
    
        /**
         * Wakes up the successor of h (normally whead). This is normally
         * just h.next, but may require traversal from wtail if next
         * pointers are lagging. This may fail to wake up an acquiring
         * thread when one or more have been cancelled, but the cancel
         * methods themselves provide extra safeguards to ensure liveness.
         */
        private void release(WNode h) {
            if (h != null) {
                WNode q; Thread w;
                U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
                if ((q = h.next) == null || q.status == CANCELLED) {
                    for (WNode t = wtail; t != null && t != h; t = t.prev)
                        if (t.status <= 0)
                            q = t;
                }
                if (q != null && (w = q.thread) != null)
                    U.unpark(w);
            }
        }
    

    唤醒队头的后继。

    3.3 读锁

        /**
         * Non-exclusively acquires the lock, blocking if necessary
         * until available.
         *
         * @return a stamp that can be used to unlock or convert mode
         */
        public long readLock() {
            long s = state, next;  // bypass acquireRead on common uncontended case
            return ((whead == wtail && (s & ABITS) < RFULL &&
                     U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
                    next : acquireRead(false, 0L));
        }
    
    

    在无竞争的情况下直接跳过acquireRead。

        private long acquireRead(boolean interruptible, long deadline) {
            WNode node = null, p;
            for (int spins = -1;;) {
                WNode h;
                if ((h = whead) == (p = wtail)) {
                    for (long m, s, ns;;) {
                        if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                            (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                            return ns;
                        else if (m >= WBIT) {
                            if (spins > 0) {
                                if (LockSupport.nextSecondarySeed() >= 0)
                                    --spins;
                            }
                            else {
                                if (spins == 0) {
                                    WNode nh = whead, np = wtail;
                                    if ((nh == h && np == p) || (h = nh) != (p = np))
                                        break;
                                }
                                spins = SPINS;
                            }
                        }
                    }
                }
                if (p == null) { // initialize queue
                    WNode hd = new WNode(WMODE, null);
                    if (U.compareAndSwapObject(this, WHEAD, null, hd))
                        wtail = hd;
                }
                else if (node == null)
                    node = new WNode(RMODE, p);
                else if (h == p || p.mode != RMODE) {
                    if (node.prev != p)
                        node.prev = p;
                    else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                        p.next = node;
                        break;
                    }
                }
                else if (!U.compareAndSwapObject(p, WCOWAIT,
                                                 node.cowait = p.cowait, node))
                    node.cowait = null;
                else {
                    for (;;) {
                        WNode pp, c; Thread w;
                        if ((h = whead) != null && (c = h.cowait) != null &&
                            U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                            (w = c.thread) != null) // help release
                            U.unpark(w);
                        if (h == (pp = p.prev) || h == p || pp == null) {
                            long m, s, ns;
                            do {
                                if ((m = (s = state) & ABITS) < RFULL ?
                                    U.compareAndSwapLong(this, STATE, s,
                                                         ns = s + RUNIT) :
                                    (m < WBIT &&
                                     (ns = tryIncReaderOverflow(s)) != 0L))
                                    return ns;
                            } while (m < WBIT);
                        }
                        if (whead == h && p.prev == pp) {
                            long time;
                            if (pp == null || h == p || p.status > 0) {
                                node = null; // throw away
                                break;
                            }
                            if (deadline == 0L)
                                time = 0L;
                            else if ((time = deadline - System.nanoTime()) <= 0L)
                                return cancelWaiter(node, p, false);
                            Thread wt = Thread.currentThread();
                            U.putObject(wt, PARKBLOCKER, this);
                            node.thread = wt;
                            if ((h != pp || (state & ABITS) == WBIT) &&
                                whead == h && p.prev == pp)
                                U.park(false, time);
                            node.thread = null;
                            U.putObject(wt, PARKBLOCKER, null);
                            if (interruptible && Thread.interrupted())
                                return cancelWaiter(node, p, true);
                        }
                    }
                }
            }
    
            for (int spins = -1;;) {
                WNode h, np, pp; int ps;
                if ((h = whead) == p) {
                    if (spins < 0)
                        spins = HEAD_SPINS;
                    else if (spins < MAX_HEAD_SPINS)
                        spins <<= 1;
                    for (int k = spins;;) { // spin at head
                        long m, s, ns;
                        if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                            (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                            WNode c; Thread w;
                            whead = node;
                            node.prev = null;
                            while ((c = node.cowait) != null) {
                                if (U.compareAndSwapObject(node, WCOWAIT,
                                                           c, c.cowait) &&
                                    (w = c.thread) != null)
                                    U.unpark(w);
                            }
                            return ns;
                        }
                        else if (m >= WBIT &&
                                 LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                            break;
                    }
                }
                else if (h != null) {
                    WNode c; Thread w;
                    while ((c = h.cowait) != null) {
                        if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                            (w = c.thread) != null)
                            U.unpark(w);
                    }
                }
                if (whead == h) {
                    if ((np = node.prev) != p) {
                        if (np != null)
                            (p = np).next = node;   // stale
                    }
                    else if ((ps = p.status) == 0)
                        U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                    else if (ps == CANCELLED) {
                        if ((pp = p.prev) != null) {
                            node.prev = pp;
                            pp.next = node;
                        }
                    }
                    else {
                        long time;
                        if (deadline == 0L)
                            time = 0L;
                        else if ((time = deadline - System.nanoTime()) <= 0L)
                            return cancelWaiter(node, node, false);
                        Thread wt = Thread.currentThread();
                        U.putObject(wt, PARKBLOCKER, this);
                        node.thread = wt;
                        if (p.status < 0 &&
                            (p != h || (state & ABITS) == WBIT) &&
                            whead == h && node.prev == p)
                            U.park(false, time);
                        node.thread = null;
                        U.putObject(wt, PARKBLOCKER, null);
                        if (interruptible && Thread.interrupted())
                            return cancelWaiter(node, node, true);
                    }
                }
            }
        }
    

    同理,acquireRead复合了很多机制:

    • 1)首先是入队自旋,如果队尾不是读模式则放到队列尾部,如果是读模式,则放到队尾的cowait中。
    • 2)如果队列中只剩下一个结点,则在队头进一步自旋
    • 3)会帮助release队头的cowait链表
    • 4)最后会进入阻塞
        /**
         * If the lock state matches the given stamp, releases the
         * non-exclusive lock.
         *
         * @param stamp a stamp returned by a read-lock operation
         * @throws IllegalMonitorStateException if the stamp does
         * not match the current state of this lock
         */
        public void unlockRead(long stamp) {
            long s, m; WNode h;
            for (;;) {
                if (((s = state) & SBITS) != (stamp & SBITS) ||
                    (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
                    throw new IllegalMonitorStateException();
                if (m < RFULL) {
                    if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                        if (m == RUNIT && (h = whead) != null && h.status != 0)
                            release(h);
                        break;
                    }
                }
                else if (tryDecReaderOverflow(s) != 0L)
                    break;
            }
        }
    

    3.4 乐观读

        /**
         * Returns a stamp that can later be validated, or zero
         * if exclusively locked.
         *
         * @return a stamp, or zero if exclusively locked
         */
        public long tryOptimisticRead() {
            long s;
            return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
        }
    

    如果是被写锁独占,则返回0。

        /**
         * Returns true if the lock has not been exclusively acquired
         * since issuance of the given stamp. Always returns false if the
         * stamp is zero. Always returns true if the stamp represents a
         * currently held lock. Invoking this method with a value not
         * obtained from {@link #tryOptimisticRead} or a locking method
         * for this lock has no defined effect or result.
         *
         * @param stamp a stamp
         * @return {@code true} if the lock has not been exclusively acquired
         * since issuance of the given stamp; else false
         */
        public boolean validate(long stamp) {
            U.loadFence();
            return (stamp & SBITS) == (state & SBITS);
        }
    
    

    SBITS只关心state有没有被写锁独占获取,如果没有,则返回true。(SBITS低位全为0,只获取高位值。低位为读锁标记,高位为写锁标记。)

    3.5 锁转换

        /**
         * If the lock state matches the given stamp, performs one of
         * the following actions. If the stamp represents holding a write
         * lock, returns it.  Or, if a read lock, if the write lock is
         * available, releases the read lock and returns a write stamp.
         * Or, if an optimistic read, returns a write stamp only if
         * immediately available. This method returns zero in all other
         * cases.
         *
         * @param stamp a stamp
         * @return a valid write stamp, or zero on failure
         */
        public long tryConvertToWriteLock(long stamp) {
            long a = stamp & ABITS, m, s, next;
            while (((s = state) & SBITS) == (stamp & SBITS)) {
                if ((m = s & ABITS) == 0L) {
                    if (a != 0L)
                        break;
                    if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
                        return next;
                }
                else if (m == WBIT) {
                    if (a != m)
                        break;
                    return stamp;
                }
                else if (m == RUNIT && a != 0L) {
                    if (U.compareAndSwapLong(this, STATE, s,
                                             next = s - RUNIT + WBIT))
                        return next;
                }
                else
                    break;
            }
            return 0L;
        }
    

    方法tryConvertToWriteLock(long)尝试“升级”模式并返回一个有效的写stamp,如果

    • 1)已经处于写模式
    • 2)处于读模式并且没有其他读线程,则释放读锁
    • 3)处于乐观模式并且锁是可用的

    其他情况返回0。

    tryConvertToReadLock、tryConvertToOptimisticRead类似。

       /**
         * If the lock state matches the given stamp, releases the
         * corresponding mode of the lock.
         *
         * @param stamp a stamp returned by a lock operation
         * @throws IllegalMonitorStateException if the stamp does
         * not match the current state of this lock
         */
        public void unlock(long stamp) {
            long a = stamp & ABITS, m, s; WNode h;
            while (((s = state) & SBITS) == (stamp & SBITS)) {
                if ((m = s & ABITS) == 0L)
                    break;
                else if (m == WBIT) {
                    if (a != m)
                        break;
                    state = (s += WBIT) == 0L ? ORIGIN : s;
                    if ((h = whead) != null && h.status != 0)
                        release(h);
                    return;
                }
                else if (a == 0L || a >= WBIT)
                    break;
                else if (m < RFULL) {
                    if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                        if (m == RUNIT && (h = whead) != null && h.status != 0)
                            release(h);
                        return;
                    }
                }
                else if (tryDecReaderOverflow(s) != 0L)
                    return;
            }
            throw new IllegalMonitorStateException();
        }
    

    4.总结

    StampedLock的等待队列与RRW的CLH队列相比,有以下特点:

    • 当入队一个线程时,如果队尾是读结点,不会直接链接到队尾,而是链接到该读结点的cowait链中,cowait链本质是一个栈;
    • 当入队一个线程时,如果队尾是写结点,则直接链接到队尾;
    • QS类似唤醒线程的规则和A,都是首先唤醒队首结点。区别是StampedLock中,当唤醒的结点是读结点时,会唤醒该读结点的cowait链中的所有读结点(顺序和入栈顺序相反,也就是后进先出)。
    • 另外,StampedLock使用时要特别小心,避免锁重入的操作,在使用乐观读锁时也需要遵循相应的调用模板,防止出现数据不一致的问题。


    参考

    相关文章

      网友评论

          本文标题:StampedLock

          本文链接:https://www.haomeiwen.com/subject/frzcrctx.html