AQS

Author: imyaa | Published 2018-07-14 21:33

    Link to the paper

    /**
     * Provides a framework for implementing blocking locks and related
     * synchronizers (semaphores, events, etc) that rely on
     * first-in-first-out (FIFO) wait queues.  This class is designed to
     * be a useful basis for most kinds of synchronizers that rely on a
     * single atomic int value to represent state. Subclasses
     * must define the protected methods that change this state, and which
     * define what that state means in terms of this object being acquired
     * or released.  Given these, the other methods in this class carry
     * out all queuing and blocking mechanics. Subclasses can maintain
     * other state fields, but only the atomically updated int
     * value manipulated using methods {@link #getState}, {@link
     * #setState} and {@link #compareAndSetState} is tracked with respect
     * to synchronization.
     */
    
    *      +------+  prev +-----+       +-----+
    * head |      | <---- |     | <---- |     |  tail
    *      +------+       +-----+       +-----+
    

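    AQS maintains a FIFO synchronization queue. When a thread fails to acquire the synchronization state, it is appended to the tail of this CLH queue and waits there in a retry loop (in the JDK implementation the thread is parked between attempts rather than busy-spinning). On each pass through the loop, a queued thread checks whether its predecessor is the head node; if so, it tries to acquire the synchronization state, and on success it leaves the queue. When a thread has finished its work it releases the synchronization state, and the release wakes up its successor node.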
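The queueing behaviour described above can be observed from the outside through ReentrantLock, which is built on AQS. The following is a minimal sketch, not part of the original article: the class name SyncQueueDemo is made up for illustration, and the only JDK calls used are lock(), unlock() and getQueueLength().

```java
import java.util.concurrent.locks.ReentrantLock;

// SyncQueueDemo is a hypothetical name; ReentrantLock and getQueueLength() are JDK API.
class SyncQueueDemo {

    /** Returns {threads queued while the lock is held, threads queued after all workers finished}. */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Thread[] workers = new Thread[2];
        int queuedWhileHeld;

        lock.lock(); // the calling thread owns the synchronization state
        try {
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new Thread(() -> {
                    lock.lock();   // acquisition fails, so the thread is enqueued and parked
                    lock.unlock(); // releasing wakes the successor node
                });
                workers[i].start();
            }
            // getQueueLength() is only an estimate, so poll until both workers are queued.
            while (lock.getQueueLength() < workers.length) {
                Thread.yield();
            }
            queuedWhileHeld = lock.getQueueLength();
        } finally {
            lock.unlock(); // wakes the first queued worker
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { queuedWhileHeld, lock.getQueueLength() };
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("queued while held: " + result[0]);
        System.out.println("queued afterwards: " + result[1]);
    }
}
```

While the main thread holds the lock, both workers sit in the queue; once it unlocks, each worker is woken in turn and the queue drains.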

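    AQS maintains a volatile int state (representing the shared resource) and a FIFO queue of waiting threads (threads that block while contending for the resource are placed in this queue). volatile is the key word here; its semantics are not covered in this article. There are three ways to access state: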

    getState()
    setState()
    compareAndSetState()
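To show the three accessors working together, here is a small sketch of a non-reentrant mutex built on AQS, loosely modelled on the Mutex example in the AbstractQueuedSynchronizer Javadoc. The class name SimpleMutex and the convention "0 = unlocked, 1 = locked" are assumptions made for this illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex: state 0 means unlocked, state 1 means locked.
class SimpleMutex {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // compareAndSetState: atomic 0 -> 1 transition, so only one thread can win.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            // setState: a plain volatile write suffices because only the owner gets here.
            setState(0);
            return true;
        }

        boolean isLocked() {
            // getState: volatile read of the current state.
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }
    boolean tryLock()  { return sync.tryAcquire(1); }
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println("locked: " + mutex.isLocked());
        System.out.println("second tryLock succeeds: " + mutex.tryLock());
        mutex.unlock();
        System.out.println("locked after unlock: " + mutex.isLocked());
    }
}
```

The subclass only decides what the state means; the queueing and blocking in lock() come entirely from acquire(1) in AQS.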
    

    AQS defines two resource-sharing modes:

    • Exclusive (only one thread can proceed at a time, e.g. ReentrantLock)
    • Shared (several threads can proceed at the same time, e.g. Semaphore/CountDownLatch)
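Exclusive mode is what the ReentrantLock code later in this article uses. For shared mode, the sketch below follows the BooleanLatch example from the AbstractQueuedSynchronizer Javadoc: a single releaseShared lets every waiting thread through. The waiting() helper and the SharedModeDemo class are additions made for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// One-shot latch in shared mode: state 0 = closed, state 1 = open.
class BooleanLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result means success; every waiter succeeds once the latch is open.
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // true tells AQS to wake up the waiting threads
        }
    }

    private final Sync sync = new Sync();

    void signal() { sync.releaseShared(1); }
    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    int waiting() { return sync.getQueueLength(); } // helper added for the demo
}

class SharedModeDemo {

    /** Returns {threads past the latch before signal(), threads past the latch after signal()}. */
    static int[] run(int waiters) {
        BooleanLatch latch = new BooleanLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        while (latch.waiting() < waiters) {
            Thread.yield(); // wait until every thread is parked in the queue
        }
        int passedBeforeSignal = passed.get();
        latch.signal(); // one release lets all shared waiters through
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { passedBeforeSignal, passed.get() };
    }

    public static void main(String[] args) {
        int[] result = run(3);
        System.out.println("passed before signal: " + result[0]);
        System.out.println("passed after signal: " + result[1]);
    }
}
```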

    How does ReentrantLock implement its fair and non-fair variants? Internally it has the following classes:

    abstract static class Sync extends AbstractQueuedSynchronizer
    static final class NonfairSync extends Sync
    static final class FairSync extends Sync
    

    Both variants manipulate the state in their tryAcquire method.
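Which of the two Sync subclasses is used is decided when the lock is constructed: new ReentrantLock() gives a non-fair lock and new ReentrantLock(true) a fair one. A minimal sketch follows; the class name FairnessFlagDemo is made up, while the constructors and isFair() are JDK API.

```java
import java.util.concurrent.locks.ReentrantLock;

// FairnessFlagDemo is a hypothetical name used only for this illustration.
class FairnessFlagDemo {

    /** Returns {isFair() of the default lock, isFair() of a lock created with fair = true}. */
    static boolean[] flags() {
        ReentrantLock nonfair = new ReentrantLock();  // default constructor: non-fair policy
        ReentrantLock fair = new ReentrantLock(true); // fair policy requested explicitly
        return new boolean[] { nonfair.isFair(), fair.isFair() };
    }

    public static void main(String[] args) {
        boolean[] flags = flags();
        System.out.println("default lock is fair: " + flags[0]);
        System.out.println("new ReentrantLock(true) is fair: " + flags[1]);
    }
}
```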

    Fair lock

        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);
            }
    
            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    Non-fair lock

        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
            // Note: in the JDK 8 source, nonfairTryAcquire is declared in the abstract Sync class.
            /**
             * Performs non-fair tryLock.  tryAcquire is implemented in
             * subclasses, but both need nonfair try for trylock method.
             */
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

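    Comparing the two tryAcquire implementations, the only difference is that the fair lock adds a hasQueuedPredecessors() check before the CAS. (The non-fair lock() also attempts one immediate CAS before falling back to acquire(1).)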

        /**
         * Queries whether any threads have been waiting to acquire longer
         * than the current thread.
         *
         * <p>An invocation of this method is equivalent to (but may be
         * more efficient than):
         *  <pre> {@code
         * getFirstQueuedThread() != Thread.currentThread() &&
         * hasQueuedThreads()}</pre>
         *
         * <p>Note that because cancellations due to interrupts and
         * timeouts may occur at any time, a {@code true} return does not
         * guarantee that some other thread will acquire before the current
         * thread.  Likewise, it is possible for another thread to win a
         * race to enqueue after this method has returned {@code false},
         * due to the queue being empty.
         *
         * <p>This method is designed to be used by a fair synchronizer to
         * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
         * Such a synchronizer's {@link #tryAcquire} method should return
         * {@code false}, and its {@link #tryAcquireShared} method should
         * return a negative value, if this method returns {@code true}
         * (unless this is a reentrant acquire).  For example, the {@code
         * tryAcquire} method for a fair, reentrant, exclusive mode
         * synchronizer might look like this:
         *
         *  <pre> {@code
         * protected boolean tryAcquire(int arg) {
         *   if (isHeldExclusively()) {
         *     // A reentrant acquire; increment hold count
         *     return true;
         *   } else if (hasQueuedPredecessors()) {
         *     return false;
         *   } else {
         *     // try to acquire normally
         *   }
         * }}</pre>
         *
         * @return {@code true} if there is a queued thread preceding the
         *         current thread, and {@code false} if the current thread
         *         is at the head of the queue or the queue is empty
         * @since 1.7
         */
        public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    

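    Both the Javadoc and the code are clear: this method reports whether some other thread is queued ahead of the current thread (the head node itself does not count).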
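The return value can be checked with a small experiment. The sketch below is an illustration only: FairMutexSync is a hypothetical fair, non-reentrant synchronizer written in the style of the Javadoc example above, and PredecessorDemo is a made-up driver. hasQueuedPredecessors(), isQueued(), acquire() and release() are public AQS methods.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical fair, non-reentrant mutex used only to observe hasQueuedPredecessors().
class FairMutexSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int ignored) {
        // Fair policy: do not even attempt the CAS while someone else is queued ahead of us.
        if (getState() == 0 && !hasQueuedPredecessors() && compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int ignored) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
}

class PredecessorDemo {

    /** Returns {answer on an empty queue, answer while another thread is queued}. */
    static boolean[] run() {
        FairMutexSync sync = new FairMutexSync();
        boolean onEmptyQueue = sync.hasQueuedPredecessors(); // nobody is queued yet

        sync.acquire(1); // the calling thread holds the mutex
        Thread worker = new Thread(() -> {
            sync.acquire(1); // cannot acquire, so the worker is enqueued and parked
            sync.release(1);
        });
        worker.start();
        while (!sync.isQueued(worker)) {
            Thread.yield(); // wait until the worker is in the queue
        }
        // Seen from the calling thread, the worker is now queued ahead of it.
        boolean withWorkerQueued = sync.hasQueuedPredecessors();

        sync.release(1); // wakes the worker, which is first in the queue and may acquire
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] { onEmptyQueue, withWorkerQueued };
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("empty queue: " + result[0]);
        System.out.println("worker queued: " + result[1]);
    }
}
```

With a fair policy, a thread that sees true here must refuse to acquire even though the state may be 0, which is exactly what prevents barging.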

    Article link: https://www.haomeiwen.com/subject/lwfupftx.html