美文网首页
AQS源码和原理分析

AQS源码和原理分析

作者: 无聊之园 | 来源:发表于2019-04-30 11:42 被阅读0次

    AbstractQueuedSynchronizer的父类是AbstractOwnableSynchronizer。 AbstractOwnableSynchronizer类功能很单一:只是set和get独占资源线程。什么是独占独占资源线程?比如:ReentrantLock可重入锁的第一个调用lock方法的线程,这个线程锁住资源,其他线程调用lock方法只能等待,那么这个线程就是独占资源线程。

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    
    
    image.png

    AQS的大概原理和流程:

    维护了一个state变量和node双向fifo链表。
         已获取资源占有许可的数量。比如:如果线程调用acquire(1)请求资源许可,acquire会调用一次tryAcquire(1)尝试获取资源。
         如果获取成功:则state加1,并调用AQS的父类AbstractOwnableSynchronizer的设置独占线程,把当前独占线程设置当前线程。
         如果调用失败:则说明,前面已经有线程占用了这个资源,需要等待的线程释放。则把当前线程封装成node节点,放入node双向链表,之后Locksupport.pack()堵塞当前线程。加入这个线程堵塞后被唤醒,则继续循环调用tryAcquire方法获取资源许可,获取到了,则把自身node节点设置为node链表的头节点,把之前的头节点去掉。

         如果线程释放资源,调用release方法,release方法会调用tryRelease方法尝试释放资源,如果释放成功,则state减1,再调用AQS的父类AbstractOwnableSynchronizer的设置独占线程为null,再locksupport.unpack()双向node链表的头node节点的线程,恢复其执行。

    维护了一个condition单向链表。
    condition是一个接口,AQS的内部类conditionObject实现了这个接口。这个接口的作用类似于实现java Object的wait方法,notify方法。当然condition是内部的数据结构进行Locksupport.pack和unpack来实现的,而且提供了超时机制。
    condition并不是AQS维护,只有当需要await和signal机制,通过new ConditionObject()生成了这个对象,调用这个对象的方法进行使用,AQS本身不存在这个对象的引用,condition单链表也是conditionObject内部类本身在维护,而不是AQS在维护。
    调用condition.await()方法,意味着当前线程进入等待状态。将当前节点包装成node节点,放入condition链表的尾部。然后调用AQS的release方法,释放state,locksupport.unpack()双向node链表的头结点线程。之后,再将自身线程堵塞。
    调用condition.signal()方法,意味着唤醒其他线程调用condition.await()方法进入等待状态的线程。会将conditionObject维护的node单向链表的头节点,移动到AQS维护的双向node节点的尾部,等待其他线程使用完资源后调用release方法一个一个唤醒。

    看关键代码

    先看成员变量

    private transient volatile Node head;
    
        /**
         * Tail of the wait queue, lazily initialized.  Modified only via
         * method enq to add new wait node.
         */
        private transient volatile Node tail;
    
        /**
         * The synchronization state.
         */
    // 维护的state变量
        private volatile int state;
     // cas的unsafe工具类
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // state变量的内存偏移量,用于unsafe的cas操作
     private static final long stateOffset;
      // node双向链表的head的内存偏移量,用于unsafe的cas操作
        private static final long headOffset;
    // node双向链表的tail的内存偏移量,用于unsafe的cas操作
        private static final long tailOffset;
    // node节点的waitStatus的内存偏移量, 用于unsafe的cas操作
        private static final long waitStatusOffset;
    // node节点的next的内存偏移量, 用于unsafe的cas操作
        private static final long nextOffset;
    // 静态代码快,初始化这些变量。 
    static {
            try {
                stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
                headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
                tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
                waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
                nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
    
            } catch (Exception ex) { throw new Error(ex); }
        }
    

    可见,aqs的成员变量,只是维护了state变量,和node双向链表,并没有维护condition相关的链表。

    看AQS的内部类Node

    static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
    
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    
            /**
             * Status field, taking on only the values:
             *   SIGNAL:     The successor of this node is (or will soon be)
             *               blocked (via park), so the current node must
             *               unpark its successor when it releases or
             *               cancels. To avoid races, acquire methods must
             *               first indicate they need a signal,
             *               then retry the atomic acquire, and then,
             *               on failure, block.
             *   CANCELLED:  This node is cancelled due to timeout or interrupt.
             *               Nodes never leave this state. In particular,
             *               a thread with cancelled node never again blocks.
             *   CONDITION:  This node is currently on a condition queue.
             *               It will not be used as a sync queue node
             *               until transferred, at which time the status
             *               will be set to 0. (Use of this value here has
             *               nothing to do with the other uses of the
             *               field, but simplifies mechanics.)
             *   PROPAGATE:  A releaseShared should be propagated to other
             *               nodes. This is set (for head node only) in
             *               doReleaseShared to ensure propagation
             *               continues, even if other operations have
             *               since intervened.
             *   0:          None of the above
             *
             * The values are arranged numerically to simplify use.
             * Non-negative values mean that a node doesn't need to
             * signal. So, most code doesn't need to check for particular
             * values, just for sign.
             *
             * The field is initialized to 0 for normal sync nodes, and
             * CONDITION for condition nodes.  It is modified using CAS
             * (or when possible, unconditional volatile writes).
             */
            volatile int waitStatus;
    
            /**
             * Link to predecessor node that current node/thread relies on
             * for checking waitStatus. Assigned during enqueuing, and nulled
             * out (for sake of GC) only upon dequeuing.  Also, upon
             * cancellation of a predecessor, we short-circuit while
             * finding a non-cancelled one, which will always exist
             * because the head node is never cancelled: A node becomes
             * head only as a result of successful acquire. A
             * cancelled thread never succeeds in acquiring, and a thread only
             * cancels itself, not any other node.
             */
            volatile Node prev;
    
            /**
             * Link to the successor node that the current node/thread
             * unparks upon release. Assigned during enqueuing, adjusted
             * when bypassing cancelled predecessors, and nulled out (for
             * sake of GC) when dequeued.  The enq operation does not
             * assign next field of a predecessor until after attachment,
             * so seeing a null next field does not necessarily mean that
             * node is at end of queue. However, if a next field appears
             * to be null, we can scan prev's from the tail to
             * double-check.  The next field of cancelled nodes is set to
             * point to the node itself instead of null, to make life
             * easier for isOnSyncQueue.
             */
            volatile Node next;
    
            /**
             * The thread that enqueued this node.  Initialized on
             * construction and nulled out after use.
             */
            volatile Thread thread;
    
            /**
             * Link to next node waiting on condition, or the special
             * value SHARED.  Because condition queues are accessed only
             * when holding in exclusive mode, we just need a simple
             * linked queue to hold nodes while they are waiting on
             * conditions. They are then transferred to the queue to
             * re-acquire. And because conditions can only be exclusive,
             * we save a field by using special value to indicate shared
             * mode.
             */
            Node nextWaiter;
    
            /**
             * Returns true if node is waiting in shared mode.
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            /**
             * Returns previous node, or throws NullPointerException if null.
             * Use when predecessor cannot be null.  The null check could
             * be elided, but is present to help the VM.
             *
             * @return the predecessor of this node
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    

    node类,包装了线程,双向链表的前后引用,节点的几种状态和当前节点的状态。
    cancelled:代表取消,和线程有关,暂时不考虑线程中断。
    signal:代表此节点的next节点可以被唤醒,node的双链表,除了第一次构造node双链表的时候head节点为逻辑null节点,其他时候,head节点都是正在运行的线程所对应的节点,唤醒的也都是head节点的next节点。
    condition:代表是调用condition.signal方法调用之后,从condition单向node链表中移过来的节点。也是可以唤醒的节点。

    关键方法

    await方法

    // acquire方法获取资源占有权
     public final void acquire(int arg) {
         /** 尝试获取,tryAcquire方法是子类必须实现的方法,
    * 比如公平锁和非公平锁的不同就在于tryAcquire方法的实现的不同。
    * 获取失败,则addWaiter方法,包装node节点,放入node双向链表。再acquireQueued堵塞线程,循环获取资源占有权。
    */   
    if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    
    private Node addWaiter(Node mode) {
          // 新构建的node节点,waitStatus初始值为0
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
             // 如果尾部不为空,则说明node双向链表之前已经被初始化了,那么直接把新node节点加入尾部
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
          // 如果尾部为null,则说明node双向链表之前没有被初始化,则,调用enq方法,初始化node双向链表,并且把新节点加入尾部
            enq(node);
            return node;
        }
    
    private Node enq(final Node node) {
        // 循环自旋cas,防止其他线程已经初始化了这个链表  
          for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    // 初始化的链表,头节点是一个逻辑意义为null的节点,waitStatus为初始值0。
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    /** 双向node链表是fifo特性的,所以,
                    * 能够获取资源权限的节点,
                    * 其前一个节点必须是头部节点(头部节点就是正在占有资源的线程,              
                   * 是其调用release方法让出资源)
                    */
                    if (p == head && tryAcquire(arg)) {
                      // 把之前的头节点去掉,新获取到资源权限的节点设置为新头节点
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    /* 是否应该堵塞线程,当前节点的前面一个节点p的waitStatus为      
                    * signal,则堵塞,如果p的waitStatus没有初始化,为0,或者为  
                    * PROPAGATE,* 则更改为signal,继续循环。
                    * parkAndCheckInterrupt方法堵塞当前线程
                     */
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    acquire方法总结:如果获取成功:则state加1,并调用AQS的父类AbstractOwnableSynchronizer的设置独占线程,把当前独占线程设置当前线程。
    如果调用失败:则说明,前面已经有线程占用了这个资源,需要等待的线程释放。则把当前线程封装成node节点,放入node双向链表,之后Locksupport.pack()堵塞当前线程。加入这个线程堵塞后被唤醒,则继续循环调用tryAcquire方法获取资源许可,获取到了,则把自身node节点设置为node链表的头节点,把之前的头节点去掉。
    node节点的waitStatus为signal,则意味这其next节点可以被唤醒。

    release方法

     public final boolean release(int arg) {
         // 调用tryRelease方法尝试释放资源,释放成功,则state减arg。然后locksupport.unpack唤醒双向node链表的头部第二个节点。
    //  tryRelease也是必须要子类实现的方法
         if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    
    private void unparkSuccessor(Node node) {
           /*
            * If status is negative (i.e., possibly needing signal) try
            * to clear in anticipation of signalling.  It is OK if this
            * fails or if status is changed by waiting thread.
            */
           int ws = node.waitStatus;
         // node的waitStatus为signal或condition,则可以唤醒,先重置node的waitStatus为0
         if (ws < 0)
               compareAndSetWaitStatus(node, ws, 0);
    
           /*
            * Thread to unpark is held in successor, which is normally
            * just the next node.  But if cancelled or apparently null,
            * traverse backwards from tail to find the actual
            * non-cancelled successor.
            */
           Node s = node.next;
           if (s == null || s.waitStatus > 0) {
               s = null;
               for (Node t = tail; t != null && t != node; t = t.prev)
                   if (t.waitStatus <= 0)
                       s = t;
           }
           // 头部第二个节点非null,则唤醒这个节点
           if (s != null)
               LockSupport.unpark(s.thread);
       }
    

    release方法总结:如果线程释放资源,调用release方法,release方法会调用tryRelease方法尝试释放资源,如果释放成功,tryRelease方法会将state减1,再调用AQS的父类AbstractOwnableSynchronizer的设置独占线程为null,再locksupport.unpack()双向node链表的头node节点的线程,恢复其执行。

    再研究AQS的内部类ConditionObject

    看成员变量

    public class ConditionObject implements Condition, java.io.Serializable {
            private static final long serialVersionUID = 1173984872572414699L;
            /** First node of condition queue. */
            private transient Node firstWaiter;
            /** Last node of condition queue. */
            private transient Node lastWaiter;
    

    conditionObject本身维护condition的单向链表。

    看关键方法

    await方法

    public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                // 把线程封装成node节点,放入condition单向列表
                Node node = addConditionWaiter();
                // fullRelease方法会调用AQS的release方法,释放资源占有权,恢复node双向链表的头部第二个节点运行。
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                 // 当前节点不再sync队列中(就是AQS维护的双向node链表),意味着,之前唤醒的线程已经开始运行,并且把自己作为了新头节点,把这个线程对应的头节点剔除了。
                // 不再sync队列,则堵塞线程
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
    // 把线程封装成node节点,放入condition单链表
    private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                // 这个节点的waitStatus为condition
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    
    final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                // 释放所有许可,唤醒node双链表头部第二个线程
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    

    await方法总结:调用condition.await()方法,意味着当前线程进入等待状态。将当前节点包装成node节点,放入condition链表的尾部。然后调用AQS的release方法,释放state,locksupport.unpack()双向node链表的头结点线程。之后,再将自身线程堵塞。
    再看signal方法

    public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
            // 如果condition链表的第一个节点不为null,则唤醒    
            if (first != null)
                    doSignal(first);
            }
    
    // transferForSignal
     private void doSignal(Node first) {
                do {
                    // 移除头节点
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                  // transferForSignal,将头节点移入sync链表,也就是aqs维护的双向node列表,等待aqs的release方法的依次唤醒。
              // 如果transferForSignal成功,则跳出循环,否则,一直循环头部的下一个节点
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
          // 如果这个节点的waitStatus不是condition,(比如,被concelled了),则跳出来 
           if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            // 加入aqs维护的双向node链表
            Node p = enq(node);
            int ws = p.waitStatus;
            // 线程被打断
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    

    signal方法总结:调用condition.signal()方法,意味着唤醒其他线程调用condition.await()方法进入等待状态的线程。会将conditionObject维护的node单向链表的头节点,移动到AQS维护的双向node节点的尾部,等待其他线程使用完资源后调用release方法一个一个唤醒。

    相关文章

      网友评论

          本文标题:AQS源码和原理分析

          本文链接:https://www.haomeiwen.com/subject/oigenqtx.html