美文网首页
JDK源码(一)ReentrantLock和AQS

JDK源码(一)ReentrantLock和AQS

作者: 李亚林1990 | 来源:发表于2019-02-21 09:09 被阅读82次

    一直没有系统整理jdk源码阅读笔记,本篇开始将对jdk类库中优秀的源码实现做一个系列回顾和分析。
    ReentrantLock可重入锁是我们工作中使用非常多的一个类,本篇我们将结合AQS(AbstractQueuedSynchronizer)来分析其内部的实现原理。
    ReentrantLock特性:
    1、获取锁后必须手动释放,我们一般在finally中释放锁;
    2、可以在构造函数ReentrantLock(boolean fair)指定公平策略,默认为非公平锁;
    3、lockInterruptibly()支持可中断;
    4、newCondition()支持条件等待和唤醒;
    5、支持tryLock()尝试获取和tryLock(long timeout, TimeUnit unit)超时获取。

    代码示例(摘自jdk源码):

    class X {
       private final ReentrantLock lock = new ReentrantLock();
       // ...
    
       public void m() {
         lock.lock();  // block until condition holds
         try {
           // ... method body
         } finally {
           lock.unlock()
         }
       }
     }
    

    我们从构造函数开始,一步步分析内部实现。

        private final Sync sync;
    
        public ReentrantLock() {
            sync = new NonfairSync();
        }
    
        /**
         * Creates an instance of {@code ReentrantLock} with the
         * given fairness policy.
         *
         * @param fair {@code true} if this lock should use a fair ordering policy
         */
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    

    构造函数可以指定公平策略,默认为非公平策略。
    私有成员Sync为同步器,继承自AQS,ReentrantLock锁正是基于AbstractQueuedSynchronizer实现的。
    根据不同的公平策略,提供了内部类FairSync和NonfairSync两种具体实现。如下:


    image.png
    image.png

    接下来我们看看lock()函数的实现逻辑:

        public void lock() {
            sync.lock();
        }
    

    sync.lock();根据根据不同的公平策略具有两种实现,我们放在一起分析和比较。
    NonfairSync.lock()如下:

            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    

    FairSync.lock()如下:

            final void lock() {
                acquire(1);
            }
    

    可以看到,非公平策略会首先尝试AbstractQueuedSynchronizer.compareAndSetState(int expect, int update)以插队的方式直接获取锁,如果成功则直接返回;否则走到AbstractQueuedSynchronizer.acquire(int arg)逻辑。
    而公平策略在此则不会尝试获取锁,直接走AbstractQueuedSynchronizer.acquire(int arg)逻辑,将严格按照目前的线程等待队列的顺序获取锁资源,先排队的线程先得到所资源。

    我们继续看unlock()函数的实现逻辑:

        public void unlock() {
            sync.release(1);
        }
    

    直接调用AbstractQueuedSynchronizer.release(int arg)释放资源。

    可以看到锁的获取和释放最终调用的是AQS的acquire方法和release方法,接下来我们看看AQS的内部实现。
    为了便于理解,我们先抽取关键代码宏观概览。

    AbstractQueuedSynchronizer.Node内部类用来构成等待线程队列的节点

        static final class Node {
            //阻塞模式常量
            static final Node SHARED = new Node();
            static final Node EXCLUSIVE = null;
            //节点状态常量
            static final int CANCELLED =  1;
            
            static final int SIGNAL    = -1;
            
            static final int CONDITION = -2;
            
            static final int PROPAGATE = -3;
            //节点状态,值为以上常量之一
            volatile int waitStatus;
            //前序节点
            volatile Node prev;
            volatile Node next;
            //等待的线程
            volatile Thread thread;
            //阻塞模式
            Node nextWaiter;
        }
    

    AbstractQueuedSynchronizer的主要成员变量以及未实现的方法:

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer {
        //等待队列头节点
        private transient volatile Node head;
        //等待队列尾节点
        private transient volatile Node tail;
        //锁计数:0未占用,大于0已占用
        private volatile int state;
        
        //通过unsafe获取成员变量的偏移量,用于CAS原子更新
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private static final long stateOffset;
        private static final long headOffset;
        private static final long tailOffset;
        private static final long waitStatusOffset;
        private static final long nextOffset;
    
        static {
            try {
                stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
                headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
                tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
                waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
                nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
    
            } catch (Exception ex) { throw new Error(ex); }
        }
        //待实现的独占锁的获取和释放策略
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
        //待实现的共享锁的获取和释放策略
        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
        protected boolean tryReleaseShared(int arg) {
            throw new UnsupportedOperationException();
        }
    }
    

    注意以上代码中volatile关键字的使用,保证在多线程的场景下变量的可见性。
    父类AbstractOwnableSynchronizer一个exclusiveOwnerThread实例成员变量,记录当前获取锁的线程。

    对AQS有了一个概览后我们开始分别分析acquire方法和release方法的具体实现。
    acquire方法:

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    上述代码片段首先通过tryAcquire尝试获取锁,获取失败的话则调用acquireQueued将当前线程加入等待队列,并最终调用LockSupport.park阻塞当前线程。

    tryAcquire在ReentrantLock中的实现和解读如下:

        //非公平策略
        static final class NonfairSync extends Sync {
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                //AQS锁计数
                int c = getState();
                //state为0,锁未被占用,尝试获取锁
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        //记录当前获取锁的线程
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //state不为0,锁被占用,如果当前线程就是占用线程,则重入
                //多次获取,state依次增加,释放的时候依次减少,直到为0则当前线程释放锁
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
        //公平策略实现
        static final class FairSync extends Sync {
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    //公平策略需要首先判断是否等待队列是否为空
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }    
        }
    

    可以看到两种实现的区别就在于公平策略需要判断当前等待队列是否为空。

    我们继续看往下看acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的实现逻辑:

        /**
         * Creates and enqueues node for current thread and given mode.
         *
         * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
         * @return the new node
         */
        private Node addWaiter(Node mode) {
            //为当前线程创建独占模式的Node节点
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            //尝试插入队列尾部
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
           //无限循环尝试将node节点插入队列尾部
            enq(node);
            return node;
        }
    
    

    线程节点插入队列尾部成功后当前线程还在继续运行呢!还需要进行一些后续处理逻辑

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    //如果前序节点为head节点,表明当前队列就一个等待线程,则再次重试获取锁  
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    //判断是否需要阻塞当前线程,在parkAndCheckInterrupt()中调用LockSupport.park(this);阻塞当前线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    上面我们分析完了锁的获取,以及获取失败后加入线程等待队列,并阻塞线程。

    接下来我们分析release方法关于锁的释放。

        public final boolean release(int arg) {
            //释放锁计数,如果释放后锁计数为0,则唤醒线程
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    //从队列head开始,找到waitStatus符合条件的节点,调用LockSupport.unpark(s.thread)唤醒线程
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    tryRelease在ReentrantLock中的实现和解读如下:

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                //释放后为0,则当前锁的占用线程置为null
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    其他的newCondition()条件等待只是基于ConditionObject多了一个条件等待队列,线程在加入ConditionObject的等待队列后阻塞前需要暂时释放AQS的state计数,解除阻塞后重新占有锁计数,限于篇幅,这里不再赘述。

    至此我们结合AQS理清了ReentrantLock的实现逻辑。

    相关文章

      网友评论

          本文标题:JDK源码(一)ReentrantLock和AQS

          本文链接:https://www.haomeiwen.com/subject/lypcyqtx.html