美文网首页
AQS源码分析

AQS源码分析

作者: Corey1874 | 来源:发表于2022-04-04 20:52 被阅读0次

    一、前言

    问题引入

    @Controller
    public class TradeController {
        @Autowired
        private TradeService tradeService;
    
        @RequestMapping("/order")
        public String order() {
            tradeService.decStock();
            return "success";
        }
    }
    
    @Service
    public class TradeService {
        Logger logger = Logger.getLogger(TradeService.class);
        @Autowired
        JdbcTemplate jdbcTemplate;
        /**
         * 扣减库存
         * @return
         */
        public String decStock() {
            Integer stock = jdbcTemplate.queryForObject("select stock from goods_stock where id = 1", Integer.class);
            if (stock <= 0) {
                logger.info("库存不足,下单失败!");
                return "库存不足,下单失败!";
            }
            stock--;
            jdbcTemplate.update("update goods_stock set stock = ? where id = 1", stock);
            logger.info("下单成功,当前剩余库存:" + stock);
            return "下单成功,当前剩余库存:" + stock;
        }
    }
    

    如上代码,用Jmeter模拟30个请求同时下单,结果30个请求都下单成功,产生了超卖问题。

    自定义AQS解决

    下面实现自定义一个同步器来实现自定义锁

    /**
     * 2021/7/1
     * 自定义AQS实现
     */
    public class MyLock {
    
        private volatile int state = 0;
    
        private Thread lockHolder;
    
        // 要用线程安全的队列作为等待队列,基于CAS实现
        // linkedBlockedQueue基于AQS实现,不能用
        private ConcurrentLinkedDeque<Thread> waiters = new ConcurrentLinkedDeque<>();
    
        public int getState() {
            return state;
        }
        public void setState(int state) {
            this.state = state;
        }
        public Thread getLockHolder() {
            return lockHolder;
        }
        public void setLockHolder(Thread lockHolder) {
            this.lockHolder = lockHolder;
        }
    
        public void lock() {
            Thread currentThread = Thread.currentThread();
            if (acquire()) {
                return;
            }
            waiters.add(currentThread);
            // 自旋
            for (; ; ) {
                // 队列里第一个线程才能抢锁
                if (currentThread == waiters.peek() && acquire()) {
                    // 队列头线程拿到锁 踢出等待队列
                    waiters.poll();
                    return;
                }
                // 阻塞当前线程 放弃CPU使用权
                LockSupport.park();
            }
        }
    
        public void unLock() {
            if (Thread.currentThread() != lockHolder) {
                throw new RuntimeException("lockHolder is not current thread");
            }
            if (compareAndSwapState(getState(), 0)) {
                setLockHolder(null);
                // 唤醒队列里第一个线程
                Thread first = waiters.peek();
                if (first != null) {
                    LockSupport.unpark(first);
                }
            }
        }
    
        // 是否能加锁成功
        private boolean acquire() {
            Thread currentThread = Thread.currentThread();
            if (getState() == 0) { // 同步器尚未被持有
                // 没人排队/自己是队列头,才能去尝试原子操作改变state
                if ((waiters.size() == 0 || currentThread == waiters.peek()) && compareAndSwapState(0, 1)) {
                    setLockHolder(currentThread);
                    return true;
                }
            }
            return false;
        }
    
    
        // 利用Unsafe类实现原子操作改变值
        public final boolean compareAndSwapState(int expect, int update) {
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    
        private static final Unsafe unsafe = reflectGetUnsafe();
        // 偏移量
        private static final long stateOffset;
    
        static {
            try {
                stateOffset = unsafe.objectFieldOffset(MyLock.class.getDeclaredField("state"));
            } catch (Exception e) {
                throw new Error();
            }
        }
    
        // 反射获取Unsafe类
        private static Unsafe reflectGetUnsafe() {
            try {
                Field field = Unsafe.class.getDeclaredField("theUnsafe");
                field.setAccessible(true);
                return (Unsafe) field.get(null);
            } catch (Exception e) {
                return null;
            }
        }
    }
    
    public class TradeService {
        Logger logger = Logger.getLogger(TradeService.class);
        @Autowired
        JdbcTemplate jdbcTemplate;
        // 单例 创建锁对象
        MyLock myLock = new MyLock();
        public String decStock() {
            myLock.lock();  // 加锁
            Integer stock = jdbcTemplate.queryForObject("select stock from goods_stock where id = 1", Integer.class);
            if (stock <= 0) {
                logger.info("库存不足,下单失败!");
                myLock.unLock();    // 业务失败 释放锁
                return "库存不足,下单失败!";
            }
            stock--;
            jdbcTemplate.update("update goods_stock set stock = ? where id = 1", stock);
            logger.info("下单成功,当前剩余库存:" + stock);
            myLock.unLock();    // 业务成功 释放锁
            return "下单成功,当前剩余库存:" + stock;
        }
    }
    

    二、概述

    抽象同步框架,可以用来实现一个依赖状态的同步器

    1.可以实现独占与共享两种模式:

    • 独占:互斥,资源只能同时被一个线程占有,如ReentrantLock、Mutex

    • 共享:资源不互斥,可以被多个线程占有,如Semaphre、CountDownLatch

      一般来说同时只会实现一种模式,但也有ReentrantReadWriteLock同时实现独占和共享两种方式

    2.核心属性:

    • state:同步器的状态,即共享资源。访问方式为:getState()、setState()、compareAndSetState()

    • exclusiveOwnerThread:当前持有共享资源的线程

    • head:同步队列头

    • tail:同步队列尾

    3.特性:

    • 可中断

    • 可重入

    4.自定义AQS

    一般自定义同步器的时候,只需要自定义共享资源的获取和释放方式。至于等待队列的维护,AQS已经定义好了,不需要重写。所以主要重写以下几种方法:

    • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

    • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

    • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

    • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

    • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

    [图片上传失败...(image-ed6b0e-1649076723271)]

    三、源码

    waitStatus表示Node节点的等待状态

    waitStatus 判断结果 说明
    0 初始化状态 该节点尚未被初始化完成
    1 取消状态(CANCELLED) 说明该线程中断或者等待超时,需要移除该线程,进入该状态后节点不会变化
    -1 有效状态(SIGNAL) 下一个节点等着自己唤醒。节点入队会把前继节点状态更新为SIGNAL
    -2 有效状态(CONDITION) 结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
    -3 有效状态(PROPAGATE) 共享模式下,自己不仅会唤醒后继节点,同时也可能唤醒后继节点的后继节点

    AQS中阻塞队列采用双向链表结构,使用prev、next连接;而条件队列采用单向链表,采用nextWaiter连接

    nextWaiter状态标志 说明
    SHARED(共享模式) = new Node() 立即唤醒下一个节点
    EXCLUSIVE(独占模式) = null 等待当前线程执行完再唤醒
    其他非空值 根据条件决定怎么唤醒下一个节点

    1.acquire()

    acquire()是获取共享资源的顶级入口,获取到资源则直接返回,否则入等待队列,直到获取到共享资源,这个过程忽略中断的影响(如果需要可中断,可以调用acquireInterruptibly())。获取到资源之后就可以执行临界区的代码了

       public final void acquire(int arg) {
            // 尝试获取共享资源
            if (!tryAcquire(arg) &&
                // 
                acquireQueued(
                    // 加入等待队列尾部
                    addWaiter(Node.EXCLUSIVE), arg)
               )
                // 如果在acquireQueued()被中断过 这里自己补一个中断 
                selfInterrupt();
        }
    

    1.1tryAcquire()

    AQS中,这个方法不能被直接调用,需要子类重写。这里不定义为抽象方法的好处在于,独占模式下只需重写tryAcquire(),共享模式写重写tryAcquireShared(),如果定义为抽象方法则都需要实现。

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    

    以ReentrantLock为例,非公平锁中中,会直接CAS尝试获取共享资源,公平锁中,会先检查队列中有没有正在等待的线程,才去获取共享资源

    1.2addWaiter()

    如果tryAcquire()加锁失败,会addWaiter(Node.EXCLUSIVE), arg),创建一个独占模式的节点加入到队列尾部

       private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
    
            // 尝试快速入队
            // 这里直接进行了一次CAS加到尾部的尝试,失败才去自旋 为什么要这样呢?直接调用enq(node)效果似乎也一样?
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
           // 自旋 空队列则CAS地初始化队列,队列有节点就把节点挂到队列尾部
            enq(node);
            return node;
        }
    

    1.3acquireQueued()

    把当前节点加入队列尾部之后,acquireQueued()

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    // 获取自己的前驱节点
                    final Node p = node.predecessor();
                    // 如果前驱节点是头结点(正持有资源的节点) 自己是第二个节点,那么可能有机会获得资源 
                    // 尝试获取资源(非公平锁直接获取,公平锁会先检查队列中有没有线程在等待再获取)
                    // 第一次进来就直接看能不能获取资源 也可能是等待后,被唤醒(前驱节点释放资源/中断)循环走到这里
                    if (p == head && tryAcquire(arg)) {
                        // 获取到资源 把当前节点设为头结点,前驱节点置空,保存的线程置空(头结点没必要保存线程了)
                        setHead(node);
                        // 去掉之前的头节点
                        p.next = null; // help GC
                        failed = false;
                        
                        // 返回等待过程中有没有被中断过
                        return interrupted;
                    }
                    // 如果自己不是第二个节点 或者自己虽然是第二个节点,但是由于非公平锁,新来的线程可以不入队列,直接获取锁,所以这里可能被其他线程抢先了
                    // 检查是否可以休息,找到可以安心休息的地方。将前驱节点waitStatus改为-1,即后续来唤醒自己
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        // park 阻塞等待
                        // 检查中断标志 如果是被中断了而不是被unPark()
                        // 后面tryAcquire方法()获取到资源之后会返回中断标志,acquire()里自行产生一个中断标志
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    尝试获取资源失败后,检查自己是否可以安心休息,如果不能就找到一个可以安心休息的地方

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
             // 前驱节点的状态已经是SIGNAL,到时候会来唤醒自己,所以可以安心休息
            if (ws == Node.SIGNAL)
                return true;
            // 前驱节点状态是被取消 
            if (ws > 0) {
                do {
                    // 一直往前找没有取消的节点
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                // 把自己挂到它后面
                pred.next = node;
            } else {
                // 走到这里意味着状态是0/PROPAGATE 则把前驱节点状态置为SIGNAL 
                // 这里就算成功了,也不会直接返回true开始休息,而是再一轮尝试获取资源,获取不到再park
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
        private final boolean parkAndCheckInterrupt() {
            // 等待
            LockSupport.park(this);
            // 被唤醒 返回自己是不是因为中断被唤醒  中断信号能打断一个等待中的线程,终止等待
            return Thread.interrupted();
        }
    

    总结:如果当前是第二个节点,就去尝试获取资源,如果成功了直接返回。否则的话,寻找一个合适的休息点,开始等待,直到被前驱节点unPark()或被中断,才继续判断自己是否是第二个节点,去尝试获取资源。这个方法会返回等待过程中是否被中断过,后续用来自行产生一个中断。这也是不响应中断的核心,等待过程不可中断,只有获取到资源之后,才可以被中断

    1.4selfInterrupt()

    自行产生一个中断

    1.5总结

    1.先尝试获取资源

    2.获取不到则把当前线程加入队列尾,标记独占模式

    3.检查自己是否为第二个节点,是则尝试获取锁,不是则进入等待,等轮到自己了,前驱节点会unPark自己,自己再去尝试获取资源。如果被中断过,则返回出去

    4.获取资源成功,如果被中断过,则产生一个中断

    2.release()

    释放资源的顶级入口,释放完资源,会唤醒队列中后一个正在等待的线程

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                // 判断头节点不为空,状态不为0
                if (h != null && h.waitStatus != 0)
                    // 唤醒后继节点 找到队列里离head最近的一个没取消的node,unpark恢复其运行
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    2.1tryRelease()

        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    

    2.2unparkSuccessor()

       private void unparkSuccessor(Node node) {
            
            int ws = node.waitStatus;
            // 当前结点状态为负 则置为0
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            // 一般要唤醒的就是后一个节点,但是可能该节点已取消 所以要从尾结点一直往前找,找到真正有效的节点
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    疑问:这里为什么要从后往前找?

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                // 要挂上去的节点 prev是一定能成功指向之前的尾结点的
                node.prev = pred;
                // CAS把尾结点设置为当前节点
                if (compareAndSetTail(pred, node)) {
                    // 但这里可能还没有执行到 此时之前的尾结点指向null
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    
        private void cancelAcquire(Node node) {
            // Ignore if node doesn't exist
            if (node == null)
                return;
    
            node.thread = null;
    
            // Skip cancelled predecessors
            Node pred = node.prev;
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
    
            // predNext is the apparent node to unsplice. CASes below will
            // fail if not, in which case, we lost race vs another cancel
            // or signal, so no further action is necessary.
            Node predNext = pred.next;
    
            // Can use unconditional write instead of CAS here.
            // After this atomic step, other Nodes can skip past us.
            // Before, we are free of interference from other threads.
            node.waitStatus = Node.CANCELLED;
    
            // If we are the tail, remove ourselves.
            if (node == tail && compareAndSetTail(node, pred)) {
                compareAndSetNext(pred, predNext, null);
            } else {
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {
                    unparkSuccessor(node);
                }
    
                node.next = node; // help GC
            }
        }
    

    3.acquireShared()

    共享模式下,尝试获取指定量的资源,获取失败则进入等待队列,直到获取到资源。该过程忽略中断

        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    

    3.1tryAcquireShared()

        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    

    负值:失败,需要执行doAcquireShared()进入队列

    0:成功,但没有剩余资源

    正值:成功,还有剩余资源,其他线程还可以获取

    3.2doAcquireShared()

        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            // 自己成为head
            setHead(node);
     
            // 如果还有剩余量 继续唤醒下一个线程
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    

    3.3总结

    先尝试获取资源,没有获取到则进入队列等待,直到获取到资源。与独占模式相比,自己拿到资源之后,还会继续唤醒下一个线程

    4.releaseShared()

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//尝试释放资源
            doReleaseShared();//唤醒后继结点
            return true;
        }
        return false;
    }
    

    4.1tryReleaseShared()

        protected boolean tryReleaseShared(int arg) {
            throw new UnsupportedOperationException();
        }
    

    4.2doReleaseShared()

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h);//唤醒后继
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            if (h == head)// head发生变化
                break;
        }
    }
    

    相关文章

      网友评论

          本文标题:AQS源码分析

          本文链接:https://www.haomeiwen.com/subject/dhqhsrtx.html