AQS

作者: sizuoyi00 | 来源:发表于2019-12-01 23:17 被阅读0次

    AbstractQueuedSynchronizer(抽象队列化同步器)简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。如ReentrantLock, CountdowLatch 都是基于 AQS 实现的。

    AQS特性
    阻塞等待队列、共享/独占、公平/非公平、可重入、可中断

    1.AQS内部结构

    state 同步资源状态-通过加锁的次数
    两种队列:CLH等待队列、Condition条件队列
    两种模式:独占、共享

       /**
         * 指向同步等待队列的头节点
         */
        private transient volatile Node head;
    
        /**
         * 指向同步等待队列的尾节点
         */
        private transient volatile Node tail;
    
        /**
         * 同步资源状态
         * 通过加锁的次数
         *
         * state三种访问方式  getState()、setState()、compareAndSetState()
         */
        private volatile int state;
    
    
        /**
         * Wait queue node class. 等待队列结点类
         *
         * AQS定义两种队列
         *  CLH等待队列  独占/共享模式 双向队列  next/prev节点正常指向前驱/后继
         *  原CLH队列的一个变种,线程由原自旋机制改为阻塞机制
         *
         *  Condition条件队列  独占模式 单向队列  next/prev节点都为空 通过nextWaiter获取下一节点
         *  条件队列一般使用场景:阻塞队列
         * AQS定义两种模式
         *  共享、独占
         * 
         */
        static final class Node {
            /**
             * 当前结点-共享模式
             * 多个线程可以同时执行,如Semaphore/CountDownLatch
             * */
            static final Node SHARED = new Node();
            /**
             *  当前结点-独占模式
             *  只有一个线程能执行,如ReentrantLock
             */
            static final Node EXCLUSIVE = null;
    
            /**
             * 在CLH等待队列中 等待的线程 超时/中断,不可被唤醒,不可获取锁,需要移除
             */
            static final int CANCELLED =  1;
            /**
             *  在CLH等待队列中 等待的线程 正常等待状态,可被唤醒,可获取锁
             */
            static final int SIGNAL    = -1;
            /**
             *  在Condition条件队列中,当其他线程对Condition调用了signal()方法后,
             *  该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
             */
            static final int CONDITION = -2;
            /**
             * 在CLH等待队列中 等待的线程都是共享模式,所以等待线程都可以被传播去获取锁
             */
            static final int PROPAGATE = -3;
    
            /**
             * 当前节点的信号量状态 ,记录上边的(1,0,-1,-2,-3)5种状态
             * 使用CAS更改状态,volatile保证线程可见性,高并发场景下,
             * 即被一个线程修改后,状态会立马让其他线程可见。
             */
            volatile int waitStatus;
    
            /**
             * 前驱节点,当前节点加入到同步队列中被设置
             */
            volatile Node prev;
    
            /**
             * 后继节点
             */
            volatile Node next;
    
            /**
             * 当前节点对应记录的线程
             */
            volatile Thread thread;
    
            /**
             * 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量,
             * 也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段。
             */
            Node nextWaiter;
    
            /**
             * Returns true if node is waiting in shared mode.
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            /**
             * 返回前驱节点
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
        
    

    2.获取锁acquire()方法

    Reentrant.lock调用AQS.acquire获取锁

        /**
         * 独占模式获取锁。
         * 通过至少一次调用tryAcquire实现成功获取锁。否则线程将加入CLH等待队列进行阻塞,
         * 直到当前占用锁的线程执行完毕释放锁,也就是调用unLock的时候当前线程才会被唤醒
         */
        public final void acquire(int arg) {
            // tryAcquire子类实现,详见ReentrantLock
            // 获取锁失败,加入CLH等待队列,阻塞
            if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    

    addWaiter方法解析

       /**
         * 为当前线程和给定模式,创建线程结点,加入CLH队列。
         */
        private Node addWaiter(Node mode) {
            // 将当前线程构建成Node结点
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            // 队列尾节点 作为当前节点的前置节点
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                // CAS将节点set到队列尾部
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            // 尾节点为null,加入CLH队列
            enq(node);
            return node;
        }
    
        /**
         * 节点加入CLH同步队列尾部,可能需要初始化
         */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    //尾结点为空需要初始化,set空的头节点
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    //尾结点不为空,获取当前尾结点作为前继节点
                    node.prev = t;
                    //当前节点set为尾部
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    acquireQueued方法解析

        /**
         * 已经在队列当中的Thread节点,准备阻塞等待获取锁
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    // 前驱节点
                    final Node p = node.predecessor();
                    // 前驱结点为头结点,再进行tryAcquire
                    if (p == head && tryAcquire(arg)) {
                        //获取锁成功,将当前结点设置为头结点。
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        // 只有这里有返回值
                        return interrupted;
                    }
                    /**
                     * 如果前驱节点不是Head,判断是否应该阻塞
                     * 当前驱节点信号量为SIGNAL=-1,当前线程可以被阻塞
                     * parkAndCheckInterrupt阻塞当前线程,直到其他线程释放锁
                     */
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        /**
         * 检查并更新获取失败的结点
         * SIGNAL结点可被阻塞
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                // 若前驱结点的状态是SIGNAL,意味着当前结点可以被阻塞
                //一般都是从waitStatus=0,先转换为-1,通过自旋,第二次才会走到这里
                return true;
            if (ws > 0) {
                // 前驱节点状态如果被取消状态,将被移除出队列
                do {
                    // 移除出队列
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                // 当前驱节点waitStatus为 0 or PROPAGATE状态时
                // 将其设置为SIGNAL状态,当前结点后续可以被安全地park
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
        /**
         * 阻塞当前节点,返回当前Thread的中断状态
         * LockSupport.park 底层通过Unsafe类实现
         * 调用系统内核功能 pthread_mutex_lock 阻塞线程
         */
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);//阻塞
            return Thread.interrupted();
        }
    
    

    3.

    ReentrantLock.tryLock(long timeout, TimeUnit unit)调用AQS.tryAcquireNanos获取锁

        /**
         * 限定时间内尝试获取锁
         */
        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 加锁 || 限定时间内加锁
            return tryAcquire(arg) ||
                    doAcquireNanos(arg, nanosTimeout);
        }
    
         /**
         * 独占模式定时获取
         */
        private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
            final long deadline = System.nanoTime() + nanosTimeout;
            //加入队列 独占模式
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                //限定时间内自旋尝试获取锁
                for (;;) {
                    final Node p = node.predecessor();
                    // 前驱结点为头结点,再进行tryAcquire
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L)
                        //超时直接返回获取失败
                        return false;
                    /**
                     * 如果前驱节点不是Head,判断是否应该阻塞
                     * 当前驱节点信号量为SIGNAL=-1,当前线程可以被阻塞
                     * 这里与上边不同,指定时间阻塞,超时自动唤醒
                     */
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            nanosTimeout > spinForTimeoutThreshold)
                        //阻塞指定时长,超时则线程自动被唤醒
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    

    4.唤醒节点

        /**
         * 释放独占模式持有的锁
         */
        public final boolean release(int arg) {
            //释放一次锁 详见ReentrantLock.tryRelease
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    唤醒阻塞线程,每次只唤醒一个
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
        
        /**
         * 找到后继节点阻塞状态的线程进行唤醒
         * 每次只唤醒“一个”
         */
        private void unparkSuccessor(Node node) {
      
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            //若后继结点为空,或状态为CANCEL(已失效),
            //则从后尾部往前遍历找到最前的一个处于正常阻塞状态的结点进行唤醒
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                //唤醒节点
                LockSupport.unpark(s.thread);
        }
    

    相关文章

      网友评论

          本文标题:AQS

          本文链接:https://www.haomeiwen.com/subject/vemzwctx.html