Java AQS

作者: 折剑游侠 | 来源:发表于2021-09-23 18:44 被阅读0次

    AQS直译为抽象同步队列,其内部封装了锁的状态,维护了请求锁的线程相关的数据结构。锁的状态只有两种,被持有或者释放。AQS如何维护这两种状态呢?通过其内部volatile变量state,state本身被volatile修饰,相关的三个方法getState()setState()compareAndSetState()又是原子操作,保证了其多线程操作下的安全问题。state = 0表示锁被释放或者说无任何线程持有;state > 0表示锁已经被某个线程持有。

        private volatile int state;
    
        protected final int getState() {
            return state;
        }
    
        protected final void setState(int newState) {
            state = newState;
        }
    
        protected final boolean compareAndSetState(int expect, int update) {
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    

    那么问题来了,某个线程持有了锁,某个是哪个?也就是说state从0变为1的时候需要设置类似CurrentThread这样的变量来标识当前持有锁的线程。

    AQS父类AbstractOwnableSynchronizer

        private transient Thread exclusiveOwnerThread;
    
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
    
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    

    当state已经大于1也就是说某个线程持有了锁,这时有其它线程进来请求获取锁,当然是排队啦。AQS毕竟就叫队列,其内部维护的数据结构是双向链表。

    AQS.Node

    static final class Node {
            volatile int waitStatus;
    
            volatile Node prev;
    
            volatile Node next;
    
            volatile Thread thread;
    
            Node nextWaiter;
    
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {
            }
    
            Node(Thread thread, Node mode) { 
                this.nextWaiter = mode;
                this.thread = thread;
            }
        }
    

    回过头来粗略总结一下,Lock.lock()请求锁就是把state值从0改为1并记录当前Thread为持有锁的线程exclusiveOwnerThread,如果state已经为1,将当前线程添加到双向链表表尾。

    这么一说可能还是有点懵逼,结合Lock接口的实现类ReentrantLock请求锁lock()释放锁unlock()的API调用链看一遍流程就知道了。

    ReentrantLock.lock()

        private final Sync sync;
    
        public void lock() {
            sync.lock();
        }
    

    Sync继承AQS实现了锁相关的功能,下面看Sync.lock()

        abstract void lock();
    

    抽象方法,子类实现。很多同学都知道ReentrantLock有公平锁和非公平锁的实现,这里先看非公平锁的实现NonfairSync.lock()

        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    

    调用AQS.compareAndSetState(0, 1)方法将state从0改为1。设置成功标识持有锁的线程setExclusiveOwnerThread;设置失败调用AQS.acquire()

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    

    tryAcquire是空实现,默认抛出异常。非公平锁NonfairSync重写了该方法,继续看NonfairSync.nonfairTryAcquire()

            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    判断state是否为0:

    • 0->cas修改state为1,修改成功标识CurrentThread。
    • 大于0->判断当前线程是否已经获取了锁,已经获取继续+1以此实现可重入锁。

    其它情况下tryAcquire()方法都返回false,回看AQS.acquire();tryAcquire()返回false时调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    AQS.addWaiter()

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

    创建一个线程节点Node放到双向链表表尾,enq()自旋直到节点成功添加到表尾并处理了空表的情况。

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) {
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    AQS.acquireQueued()

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    不必深究细节,须知此处线程入队后自旋获取锁,获取成功将当前线程置为头结点,返回线程是否被中断过interrupted

    看一下ReentrantLock公平锁实现有何不同FairSync.tryAcquire()

            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    cas多了一个!hasQueuedPredecessors()条件

        public final boolean hasQueuedPredecessors() {
            Node t = tail; 
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    

    判断获取锁的线程是否是头结点以此实现公平策略,所以公平非公平,其实AQS内的双向链表已经维护好了,取头结点判断即可。

    接下来看释放锁ReentrantLock.unlock()

        public void unlock() {
            sync.release(1);
        }
    

    sync.release(1)调用到父类AQS.release()

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    

    AQS.tryRelease()调用到子类实现Sync.tryRelease()

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    获取当前state值并-1,Check释放锁的线程是否是持有锁的线程。state为0表示锁已被释放,currentThread同步置为null;state不为0重新设置state。锁释放成功后唤醒队列中等待的线程获取资源。

    上述为独享锁的流程,实际上AQS还有对应实现共享锁的方法tryAcquireShared()tryReleaseShared()。以共享锁CountDownLatch为例,CountDownLatch将任务拆分为N个子线程执行,state对应初始化为N。N个子线程并行执行,子线程任务执行完后都会调用countDown(),此时state通过cas减1。当所有子线程执行完毕也就是state为0时,unpark()主线程,主线程从await()函数返回。

    说实话这玩意我没用过,看一下官方示例:

     class Driver2 { // ...
       void main() throws InterruptedException {
         CountDownLatch doneSignal = new CountDownLatch(N);
         Executor e = ...
    
         for (int i = 0; i < N; ++i) // create and start threads
           e.execute(new WorkerRunnable(doneSignal, i));
    
         doneSignal.await();           // wait for all to finish
       }
     }
    
     class WorkerRunnable implements Runnable {
       private final CountDownLatch doneSignal;
       private final int i;
       WorkerRunnable(CountDownLatch doneSignal, int i) {
         this.doneSignal = doneSignal;
         this.i = i;
       }
       public void run() {
         try {
           doWork(i);
           doneSignal.countDown();
         } catch (InterruptedException ex) {} // return;
       }
    
       void doWork() { ... }
     }
    

    state初始化为线程数量,CountDownLatch构造方法

        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    

    和ReentrantLock一样,锁的实现交给了内部类Sync,调用AQS.setState()初始化state值

    private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    

    CountDownLatch.countDown()

        public void countDown() {
            sync.releaseShared(1);
        }
    

    调用父类AQS.releaseShared()

        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    回到子类实现Sync.tryReleaseShared()

            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    

    自旋,cas操作state值减1,tryReleaseShared()返回值->操作后的state是否为0。如果为0,表示所有子线程任务都执行完毕,回到AQS.doReleaseShared()

        private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    唤醒队列中等待的节点,继续看CountDownLatch.await()

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    

    调用父类AQS.acquireSharedInterruptibly()

        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    

    回到子类实现Sync.tryAcquireShared()

            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    

    当所有子线程执行完毕state为0时会走到AQS.doAcquireSharedInterruptibly()

        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    将当前线程也就是执行await()的线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,也就是state为0时调用的AQS.doReleaseShared()

    独享锁或共享锁只需对应实现AQS的两组(tryAcquire()、tryRelease(),tryAcquireShared()、tryReleaseShared())方法实现state控制。

    相关文章

      网友评论

          本文标题:Java AQS

          本文链接:https://www.haomeiwen.com/subject/mvwngltx.html