美文网首页
多线程-StampLock

多线程-StampLock

作者: 麦大大吃不胖 | 来源:发表于2021-09-14 15:47 被阅读0次

    by shihang.mai

    0. 前言

    大神Doug Lea在类上注释已经有使用例子,这里贴一下

    class Point {
       private double x, y;
       private final StampedLock sl = new StampedLock();
    
       void move(double deltaX, double deltaY) { // an exclusively locked method
         long stamp = sl.writeLock();
         try {
           x += deltaX;
           y += deltaY;
         } finally {
           sl.unlockWrite(stamp);
         }
       }
    
       double distanceFromOrigin() { // A read-only method
         long stamp = sl.tryOptimisticRead();
         double currentX = x, currentY = y;
         if (!sl.validate(stamp)) {
            stamp = sl.readLock();
            try {
              currentX = x;
              currentY = y;
            } finally {
               sl.unlockRead(stamp);
            }
         }
         return Math.sqrt(currentX * currentX + currentY * currentY);
       }
    
       void moveIfAtOrigin(double newX, double newY) { // upgrade
         // Could instead start with optimistic, not read mode
         long stamp = sl.readLock();
         try {
           while (x == 0.0 && y == 0.0) {
             long ws = sl.tryConvertToWriteLock(stamp);
             if (ws != 0L) {
               stamp = ws;
               x = newX;
               y = newY;
               break;
             }
             else {
               sl.unlockRead(stamp);
               stamp = sl.writeLock();
             }
           }
         } finally {
           sl.unlock(stamp);
         }
       }
     }
    

    1. 数据结构

    • StampLock内部会维护一个CLH队列。

    每一个节点是一个WNode(Wait Node)

    static final class WNode {
            volatile WNode prev;
            volatile WNode next;
            volatile WNode cowait;    // list of linked readers
            volatile Thread thread;   // non-null while possibly parked
            volatile int status;      // 0, WAITING, or CANCELLED
            final int mode;           // RMODE or WMODE
    }
    

    当队列中有若干个线程等待,就会像下面的图一样

    StampLock队列举例.png
    • 锁状态位state
    private transient volatile long state;
    private static final long ORIGIN = WBIT << 1;
    private static final long WBIT  = 1L << LG_READERS;
    private static final int LG_READERS = 7;
    public StampedLock() {
            state = ORIGIN;
    }
    

    上面是long state的初始化的值,即1<<8( ...0001 0000 0000)

    public long writeLock() {
            long s, next;  // bypass acquireWrite in fully unlocked case only
            return ((((s = state) & ABITS) == 0L &&
                     U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
                    next : acquireWrite(false, 0L));
        }
    

    上面是加写锁,将state += WBIT(1<<7),即 ...0001 1000 0000

    public void unlockWrite(long stamp) {
            WNode h;
            if (state != stamp || (stamp & WBIT) == 0L)
                throw new IllegalMonitorStateException();
            state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
            if ((h = whead) != null && h.status != 0)
                release(h);
        }
    

    上面是释放写锁,将state = (stamp += WBIT),即 ...0010 0000 0000
    这里相当于把释放锁的次数也记录了,记录它的目的是因为整个state 的状态判断都是基于CAS操作的。而普通的CAS操作可能会遇到ABA的问题,如果不记录次数,那么当写锁释放掉,申请到,再释放掉时,我们将无法判断数据是否被写过


    这里继续说一下前7位,它是用来记录读线程的数量的,所以它最大记录126个,超过的部分存放在readerOverflow

    private static final long RFULL = RBITS - 1L;
    private static final long RBITS = WBIT - 1L;
    private static final long WBIT  = 1L << LG_READERS;
    private static final int LG_READERS = 7;
    private transient int readerOverflow;
    

    总结一下,如图


    StampLock状态位.png

    2. 写锁的加锁和释放

    • 加锁
    public long writeLock() {
            long s, next;  // bypass acquireWrite in fully unlocked case only
            return ((((s = state) & ABITS) == 0L &&
                     U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
                    next : acquireWrite(false, 0L));
    }
    

    如果CAS设置state失败,表示写锁申请失败,这时,会调用acquireWrite(),而对于这个方法,比较复杂,就简单总结一下就是


    StampLock获取写锁主要步骤.png
    • 释放锁
    1. 恢复state中写锁标志位为0
    2. 同时增加释放锁次数
    3. 唤醒后续线程
    public void unlockWrite(long stamp) {
            WNode h;
            if (state != stamp || (stamp & WBIT) == 0L)
                throw new IllegalMonitorStateException();
            state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
            if ((h = whead) != null && h.status != 0)
                release(h);
        }
    

    3. 读锁的加锁和释放

    • 加锁
    public long readLock() {
            long s = state, next;  // bypass acquireRead on common uncontended case
            return ((whead == wtail && (s & ABITS) < RFULL &&
                     U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
                    next : acquireRead(false, 0L));
        }
    

    队列中没有写锁&&读线程个数没有超过126,直接获得锁,并且读线程数量加1。这里也有一个acquireRead(),简单总结一下


    StampLock获取读锁主要流程.png
    • 释放锁
    1. state读线程数-1
    2. 唤醒下一个线程
    public void unlockRead(long stamp) {
            long s, m; WNode h;
            for (;;) {
                if (((s = state) & SBITS) != (stamp & SBITS) ||
                    (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
                    throw new IllegalMonitorStateException();
                if (m < RFULL) {
                    if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                        if (m == RUNIT && (h = whead) != null && h.status != 0)
                            release(h);
                        break;
                    }
                }
                else if (tryDecReaderOverflow(s) != 0L)
                    break;
            }
        }
    

    4. 使用不当会打满CPU

    public class StampedLockTest {
        public static void main(String[] args) throws InterruptedException {
            final StampedLock lock = new StampedLock();
            Thread t1 = new Thread(() -> {
                // 获取写锁
                lock.writeLock();
                // 模拟程序阻塞等待其他资源
                LockSupport.park();
            });
            t1.start();
            // 保证t1获取写锁
            Thread.sleep(100);
            Thread t2 = new Thread(() -> {
                // 阻塞在悲观读锁
                lock.readLock();
            });
            t2.start();
            // 保证t2阻塞在读锁
            Thread.sleep(100);
            // 中断线程t2,会导致线程t2所在CPU飙升
            t2.interrupt();
            t2.join();
        }
    }
    
    image.png

    原因如下:


    StampLock获取读锁主要流程.png
    1. 如果没有中断,那么阻塞在readLock()上的线程在经过几次自旋后,会进入park()等待,一旦进入park()等待,就不会占用CPU了。
    2. 但是park()这个函数有一个特点,就是一旦线程被中断,park()就会立即返回,也不抛异常。
    3. 而线程中断标记一直打开着,不停的自选,所以CPU就爆满了。

    解决
    在StampedLock内部,在park()返回时,需要判断中断标记位,并作出正确的处理,比如,退出,抛异常,或者把中断位给清理一下

    相关文章

      网友评论

          本文标题:多线程-StampLock

          本文链接:https://www.haomeiwen.com/subject/dlscgltx.html