美文网首页
AQS源码分析

AQS源码分析

作者: userheng | 来源:发表于2020-07-08 17:59 被阅读0次

    一、通过ReentrantLock来分析AbstractQueuedSynchronizer源码

    //初始化一个公平锁
    ReentrantLock lock = new ReentrantLock(true);
    

    加锁lock

    //java.util.concurrent.locks.ReentrantLock.FairSync
        final void lock() {
                acquire(1);
        }
    
    //java.util.concurrent.locks.AbstractQueuedSynchronizer
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
      
    
    1. 线程尝试获取锁 tryAcquire
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    列出一些AQS中的对象属性域

    //If head exists, its waitStatus is guaranteed not to be CANCELLED
     private transient volatile Node head;
     private transient volatile Node tail;
     private volatile int state;
     private Thread exclusiveOwnerThread;
    
    若state == 0;
          若等待队列中没有线程排队,compareAndSetState(0, acquires)成功;
                setExclusiveOwnerThread(current),返回true。表示加锁成功;
    若current == getExclusiveOwnerThread();
          state += acquires,更新state,返回true。表示锁重入成功
    

    判断等待队列中是否有线程排队。若 head == tail 说明等待队列没有线线程入队;若 head != tail,head.next != null,当前的线程与head.next中的线程不同,说明等待队列中有线程在等待;若 head != tail,head.next == null ,说明等待队列中有线程在等待。(这种情况怎么产生的?)

    1. tryAcquired失败,创建node并加入等待队列 addWaiter
    //return the new node
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    
    //return the node's predecessor
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    列出一些Node中的对象属性域

    //The field is initialized to 0 for normal sync nodes, and CONDITION for condition nodes.
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    
    最初等待队列未初始化,head和tail都为null,通过CAS竞争某个线程会将head和tail初始化为一个空Node;
    不断尝试让当前Node成为tail,并返回此Node;
    
    queue.png
    1. 线程再次尝试获取锁或者Park
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    若当前节点是等待队列中的第一个节点,若尝试获取锁成功,设置head为当前Node,返回中断状态;
    判断是否需要Park,若需要则Park当前线程;
    

    判断当前线程是否需要Park,总是检查前一个Node的waitStatus是否为-1,一般Node初始化时设置为0。所以Park当前线程总会经历这样一个流程,将前一个Node的waitStatus设置为-1,然后Park当前线程。

    解锁release

    //java.util.concurrent.locks.ReentrantLock.FairSync
      public void unlock() {
            sync.release(1);
        }
    
    //java.util.concurrent.locks.AbstractQueuedSynchronizer
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    1. 解锁 tryRelease
    //java.util.concurrent.locks.ReentrantLock.Sync
          protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
    若当前线程没有持有锁则抛出异常;
    state -= releases,若 state == 0,设置setExclusiveOwnerThread(null),设置解锁状态  free = true;
    更新state;
    返回解锁状态 free;
    
    1. tryRelease成功,UnPark等待队列中的第一个线程
    显示锁的API 特性
    public boolean tryLock() 可轮询,非阻塞。非公平
    public final boolean tryAcquireNanos(int arg, long nanosTimeout) 可配置超时,可定时中断。
    public void lockInterruptibly() throws InterruptedException 可中断

    条件队列Condition

    //java.util.concurrent.locks.ReentrantLock.Sync
      final ConditionObject newCondition() {
                return new ConditionObject();
      }
    

    列出一些ConditionObject中的对象属性域

    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
    
    1. 等待 await
            public final void await() throws InterruptedException {
    //首先检查中断
                if (Thread.interrupted())
                    throw new InterruptedException();
    //将当前线程包装成Node加入“条件队列”
                Node node = addConditionWaiter();
    //解锁(包括重入锁),保留state(下次获取锁时会用到)
                int savedState = fullyRelease(node);
                int interruptMode = 0;
    //Node没有转移到“阻塞队列”,则Park。signal唤醒会使得Node转移到阻塞队列(相当于拥有了竞争锁的权力)
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
    //尝试获取锁或者被Park
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
    p1.png
    1. 唤醒 signal
         public final void signal() {
    //检查当前线程是否持有锁(上面await方法在fullyRelease时执行了隐式检查)
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
    //唤醒“条件队列”中第一个Node
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    
        private void doSignal(Node first) {
                do {
    //设置firstWaiter 为下一个节点,若下一个节点为null则表明“条件队列”中已经没有Node了,设置lastWaiter为null
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
    //first节点将被转移到“阻塞队列”,没有必要保留与下一个Node的关系了
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&//转移Node失败则尝试转移下一个Node
                         (first = firstWaiter) != null);
            }
    
        final boolean transferForSignal(Node node) {
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    //将Node加入“阻塞队列”
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    //unpark这个Node
                LockSupport.unpark(node.thread);
            return true;
        }
    

    二、常用工具类CountDownLatch、CyclicBarrier 、Semaphore、ReadWriteLock

    CountDownLatch

    //java.util.concurrent.CountDownLatch
       public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
    //java.util.concurrent.CountDownLatch.Sync
        Sync(int count) {
                setState(count);
           }
    
    

    初始化CountDownLatch,实质上就是设置了state值。

    1. 等待 await
    //java.util.concurrent.CountDownLatch
     public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
    //java.util.concurrent.locks.AbstractQueuedSynchronizer
      public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
    //响应中断
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    
    //java.util.concurrent.CountDownLatch.Sync
      protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    

    state没有countdown到0,则会执行下面的方法。

    唤醒队列中所有线程(state为0时)或Park当前线程

    //java.util.concurrent.locks.AbstractQueuedSynchronizer  
      private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
    //包装当前线程为Node并加到“阻塞队列”
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
    //若为“阻塞队列”中的头节点,检查state是否为0,若为0
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
    //会唤醒下一个Node
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
    //Park当前线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    //java.util.concurrent.locks.AbstractQueuedSynchronizer
    //将当前Node设置为“阻塞队列”的头节点,执行doReleaseShared(唤醒“阻塞队列”中排头Node)
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
      
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    
    //java.util.concurrent.locks.AbstractQueuedSynchronizer
    //ShareMode模式下的释放操作(唤醒后继节点,保证传播。对于ExclusiveMode,释放通过多次调用unparkSuccessor)
    private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    //java.util.concurrent.locks.AbstractQueuedSynchronizer
    //如果当前Node的下一个节点存在,Unpark它。
        private void unparkSuccessor(Node node) {
    
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    
    1. 递减state countDown
    //java.util.concurrent.CountDownLatch
      public void countDown() {
            sync.releaseShared(1);
        }
    
    //java.util.concurrent.locks.AbstractQueuedSynchronizer
      public final boolean releaseShared(int arg) {
    //state为0时,执行doReleaseShared(唤醒“阻塞队列”中其他Node)
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    //java.util.concurrent.CountDownLatch.Sync
    //递减state
     protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    

    CyclicBarrier

      public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    

    初始化CyclicBarrier

    1. 等待 await
     public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
     private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
    //所有线程到达“栅栏”后,由最后一个到达的线程执行barrierCommand。并且唤醒其他await的线程。
                int index = --count;
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        if (!timed)
    //线程await,这里会释放锁
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    

    列出一些CyclicBarrier中的对象属性域

    //The number of parties
    private final int parties;
    //Number of parties still waiting
    private int count;
    private final ReentrantLock lock = new ReentrantLock();
    //Condition to wait on until tripped 
    private final Condition trip = lock.newCondition();
    //The command to run when tripped
    private final Runnable barrierCommand;
    private Generation generation = new Generation();
    
    //用于重置“栅栏”
    private static class Generation {
            boolean broken = false;
    }
    
        private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
            generation = new Generation();
        }
    

    执行await时,count等于0时,表示所有线程已到达trip。唤醒所有await的线程,重置“栅栏”。

    Semaphore

        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
    
    //java.util.concurrent.Semaphore.Sync
        Sync(int permits) {
                setState(permits);
            }
    

    初始化Semaphore,设置state值。默认使用非公平模式。

    1. 获取资源 acquire
    //java.util.concurrent.Semaphore
     public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
    //java.util.concurrent.locks.AbstractQueuedSynchronizer
    //支持中断的,共享获取模式
       public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
    //当tryAcquireShared返回负值,那么doAcquireSharedInterruptibly可能会Park当前线程
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    
    //java.util.concurrent.Semaphore.FairSync
           protected int tryAcquireShared(int acquires) {
    //当remain不小于0时,需要自旋重试
                for (;;) {
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
    //可用资源available 为0,remaining<0;可用资源available>=请求的资源acquires,则remaining >=0;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    
    1. 释放资源 release
    //java.util.concurrent.Semaphore
      public void release() {
            sync.releaseShared(1);
        }
    
    //java.util.concurrent.locks.AbstractQueuedSynchronizer
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
    //这里会unpark“阻塞队列”中的排头的Node(非Head)
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    //java.util.concurrent.Semaphore.Sync
            protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    

    ReentrantReadWriteLock

    //java.util.concurrent.locks.ReentrantReadWriteLock
    //默认初始化一个非公平的Sync
       public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }
    

    ReadLock、WriteLock是ReentrantReadWriteLock的 静态内部类 ,可以发现它们用的 sync 是同一个。

            static final int SHARED_SHIFT   = 16;
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    //ReadLock使用state的高16位
            /** Returns the number of shared holds represented in count  */
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    //WriteLock使用state的低16位
            /** Returns the number of exclusive holds represented in count  */
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    

    ReadLock、WriteLock共用state。ReadLock使用state的高16位,WriteLock使用state的低16位。(这种技巧在ExecutorThreadPool也用到了)

    1. 读锁的加锁
    //java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock
       public void lock() {
                sync.acquireShared(1);
            }
    //
       public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
    

    可以明显的看到,ReadLock使用的是共享式的加锁方式。

    //java.util.concurrent.locks.ReentrantReadWriteLock.Sync
    protected final int tryAcquireShared(int unused) {
       
                Thread current = Thread.currentThread();
                int c = getState();
    //exclusiveCount(c)取state的低16位,不为0,则说明存在写锁。这时若当前线程不是持有写锁的线程,则获取锁失败
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
    //获取state的高16位(这是留给读锁用的)
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
    //尝试CAS修改state(这里没有区分当前读锁是否是重入)
                    compareAndSetState(c, c + SHARED_UNIT)) {
    //下面都是一些统计信息,暂时忽略
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
    //Full version of acquire for reads, that handles CAS misses and reentrant reads not dealt with in tryAcquireShared.
                return fullTryAcquireShared(current);
            }
    
    1. 写锁的加锁
    //java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock
      public void lock() {
                sync.acquire(1);
            }
    
    //java.util.concurrent.locks.AbstractQueuedSynchronizer
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    可以明显的看到,WriteLock使用的是独占式的加锁方式。

    //java.util.concurrent.locks.ReentrantReadWriteLock.Sync
    protected final boolean tryAcquire(int acquires) {
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
    //c !=0 说明此时有读锁或写锁存在
                if (c != 0) {
    //c !=0,w==0 代表此时有读锁
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
    //此时有写锁且为当前线程持有
                    setState(c + acquires);
                    return true;
                }
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    

    ReentrantReadWriteLock的核心规律就是,写锁和写锁互斥,写锁和读锁互斥,读锁和读锁不互斥。其目的是在读并发比较高的情况下,会有更好的效率。

    三、总结

    常用工具类
    CyclicBarrier 基于ReentrantLock实现,主要应用了Condition
    ReentrantLock 基于AQS独占模式实现
    CountDownLatch 基于AQS共享模式实现
    Semaphore 基于AQS共享模式实现
    ReentrantReadWriteLock 基于AQS独占模式(写锁)和共享模式(读锁)的混合型

    相关文章

      网友评论

          本文标题:AQS源码分析

          本文链接:https://www.haomeiwen.com/subject/cxaofktx.html