美文网首页
java ReentrantLock 与 AQS 加解锁过程

java ReentrantLock 与 AQS 加解锁过程

作者: 测试一下能打几个字 | 来源:发表于2021-03-17 22:32 被阅读0次

    注:关键的代码的部分会有注释。

    AQS介绍,主要是看属性,方法遇到了再看

    内部类 node,用于维护获取锁的线程的双向队列,详细的可以具体看注释,这里就不一一介绍了
        static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
    
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    
            /**
             * Status field, taking on only the values:
             *   SIGNAL:     The successor of this node is (or will soon be)
             *               blocked (via park), so the current node must
             *               unpark its successor when it releases or
             *               cancels. To avoid races, acquire methods must
             *               first indicate they need a signal,
             *               then retry the atomic acquire, and then,
             *               on failure, block.
             *   CANCELLED:  This node is cancelled due to timeout or interrupt.
             *               Nodes never leave this state. In particular,
             *               a thread with cancelled node never again blocks.
             *   CONDITION:  This node is currently on a condition queue.
             *               It will not be used as a sync queue node
             *               until transferred, at which time the status
             *               will be set to 0. (Use of this value here has
             *               nothing to do with the other uses of the
             *               field, but simplifies mechanics.)
             *   PROPAGATE:  A releaseShared should be propagated to other
             *               nodes. This is set (for head node only) in
             *               doReleaseShared to ensure propagation
             *               continues, even if other operations have
             *               since intervened.
             *   0:          None of the above
             *
             * The values are arranged numerically to simplify use.
             * Non-negative values mean that a node doesn't need to
             * signal. So, most code doesn't need to check for particular
             * values, just for sign.
             *
             * The field is initialized to 0 for normal sync nodes, and
             * CONDITION for condition nodes.  It is modified using CAS
             * (or when possible, unconditional volatile writes).
             */
            volatile int waitStatus;
    
            /**
             * Link to predecessor node that current node/thread relies on
             * for checking waitStatus. Assigned during enqueuing, and nulled
             * out (for sake of GC) only upon dequeuing.  Also, upon
             * cancellation of a predecessor, we short-circuit while
             * finding a non-cancelled one, which will always exist
             * because the head node is never cancelled: A node becomes
             * head only as a result of successful acquire. A
             * cancelled thread never succeeds in acquiring, and a thread only
             * cancels itself, not any other node.
             */
            volatile Node prev;
    
            /**
             * Link to the successor node that the current node/thread
             * unparks upon release. Assigned during enqueuing, adjusted
             * when bypassing cancelled predecessors, and nulled out (for
             * sake of GC) when dequeued.  The enq operation does not
             * assign next field of a predecessor until after attachment,
             * so seeing a null next field does not necessarily mean that
             * node is at end of queue. However, if a next field appears
             * to be null, we can scan prev's from the tail to
             * double-check.  The next field of cancelled nodes is set to
             * point to the node itself instead of null, to make life
             * easier for isOnSyncQueue.
             */
            volatile Node next;
    
            /**
             * The thread that enqueued this node.  Initialized on
             * construction and nulled out after use.
             */
            volatile Thread thread;
    
            /**
             * Link to next node waiting on condition, or the special
             * value SHARED.  Because condition queues are accessed only
             * when holding in exclusive mode, we just need a simple
             * linked queue to hold nodes while they are waiting on
             * conditions. They are then transferred to the queue to
             * re-acquire. And because conditions can only be exclusive,
             * we save a field by using special value to indicate shared
             * mode.
             */
            Node nextWaiter;
    
            /**
             * Returns true if node is waiting in shared mode.
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            /**
             * Returns previous node, or throws NullPointerException if null.
             * Use when predecessor cannot be null.  The null check could
             * be elided, but is present to help the VM.
             *
             * @return the predecessor of this node
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
         .....
        //获取锁队列队头
        private transient volatile Node head;
        //获取锁队列队尾
        private transient volatile Node tail;
        //加锁标志位,主要是通过cas操作,算是委托给他判断是否加锁成功
        private volatile int state;
        //在aqs父类中,当前持有锁的线程
        private transient Thread exclusiveOwnerThread;
        ......
    

    ReentrantLock的具体属性

     //Sync 继承自AQS
     abstract static class Sync extends AbstractQueuedSynchronizer{...}
     //
    private final Sync sync;
    //公平实现
    static final class FairSync extends Sync {...}
    //非公平实现
    static final class NonfairSync extends Sync {...}
    

    加锁过程

    lock方法解析
    public void lock() {
            sync.lock();
        }
    
    sync.lock() 方法有两个实现,一个是公平锁的实现,另一个是非公平锁的实现,如下图 1.png

    我们先看非公平锁的实现的加锁方式(NonfairSync),代码如下

     final void lock() {
                //如果cas修改值state值成功,则加锁成功
                if (compareAndSetState(0, 1))
                    //设置当前持有锁的线程为当前线程
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    //不成功就在重新获取锁,过程较为复杂
                    acquire(1);
            }
    

    acquire(1)

      public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    此方法可分为三个部分,!tryAcquire(arg)、addWaiter(Node.EXCLUSIVE)、 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),一一分析具体干了什么事。

    !tryAcquire(arg),可以跟踪代码先调用NonfairSync.tryAcquire()方法,调用Sync.nonfairTryAcquire()
     protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    
    final boolean nonfairTryAcquire(int acquires) {  
                //获取当前线程
                final Thread current = Thread.currentThread();
                 //获取state的值
                int c = getState();
                //等于0那么现在无线程持有锁
                if (c == 0) {
                    //cas获取锁
                    if (compareAndSetState(0, acquires)) {
                        //设置当前线程持有锁
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //如果持有锁的线程和当前线程为同一线程(重入),state累加
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    

    若以上方法返回true,那么加锁成功,则!tryAcquire(arg)为false,不会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)这部分代码。若以上代码返回false,加锁失败,则!tryAcquire(arg)为true,则会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),现在先假设加锁失败。

    addWaiter(Node.EXCLUSIVE),索取锁线程入队
      private Node addWaiter(Node mode) {
            //新建node
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            //取队尾
            Node pred = tail;
            //如果队尾不为空
            if (pred != null) {
                //取当线程的的节点的前节点为对位
                node.prev = pred;
                //cas设置队尾的后节点为当前线程的节点
                if (compareAndSetTail(pred, node)) {
                    //倒数第二节点的下个节点指向当前线程的节点
                    pred.next = node;
                    return node;
                }
            }
            //尾节点不为空,入队
            enq(node);
            return node;
        }
        //入队相关代码
        private Node enq(final Node node) {
            for (;;) {
                //取队尾
                Node t = tail;
                //初始化队列
                if (t == null) { // Must initialize
                    //设置头节点
                    if (compareAndSetHead(new Node()))
                        //队首队尾都指向 上一行的new Node()
                        tail = head;
                } else {
                    //当前线程节点的前节点为队尾节点
                    node.prev = t;
                    //cas操作设置队尾为当前线程节点
                    if (compareAndSetTail(t, node)) {
                        队尾(现在倒数第二个节点)后节点为当前线程节点
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    

    到这里入队的相关操作就完成分析了,开始下一个方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的分析。

    acquireQueued(addWaiter(Node.EXCLUSIVE), arg),从队列面获取线程来持有锁。
      final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    //获取当前线程节点的前节点
                    final Node p = node.predecessor();
                    //如果前节点为队首(这里明白这个双向队列的第一个节点为空),且获取锁成功(NonfairSync.tryAcquire()) 
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        //如果不设置为null,可能会引起oom,引用一直存在
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    //判断是否需要挂机线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        //设置当前节点为头节点,清空了线程和前节点
         private void setHead(Node node) {
            head = node;
            node.thread = null;
            node.prev = null;
        }
      
        //判断是否需要挂起线程
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            //node 类中有对应描述,代表现需要被unpraking
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            //节点被取消,跳过 、重试
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                //设置parking标志
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
        //park线程
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    
    

    以上就是ReentrantLock 的加锁过程。后面解析ReentrantLock 的解锁过程

    unlock

      public void unlock() {
            //调用sync的release
            sync.release(1);
        }
      
       public final boolean release(int arg) {
            //如果解锁成功
            if (tryRelease(arg)) {
                //取头节点
                Node h = head;
                //如果头节点不为空且waitStatus 不为CANCELLED、SIGNAL    、CONDITION 、PROPAGATE 
                if (h != null && h.waitStatus != 0)
                    //解锁线程
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
       protected final boolean tryRelease(int releases) {
                //获取状态并减1
                int c = getState() - releases;
                 //判断持有锁的线程是不是当前线程
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                //解锁成功
                if (c == 0) {
                    free = true;
                    //清空线程
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
          //unpark线程节点
           private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            //ws<0 即ws为CANCELLED 、SIGNAL    、CONDITION 、PROPAGATE 中一种
            if (ws < 0)
                //ws设置为0,让下一个线程节点尝试获取锁
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    公平锁与非公平锁区别在于tryAcquire的时候,会检查是不是已经有线程在排队了,如果有那么直接入队尾,不在尝试获取锁

     protected final boolean tryAcquire(int acquires) {
               final Thread current = Thread.currentThread();
               int c = getState();
               if (c == 0) {
                   if (!hasQueuedPredecessors() &&
                       compareAndSetState(0, acquires)) {
                       setExclusiveOwnerThread(current);
                       return true;
                   }
               }
               else if (current == getExclusiveOwnerThread()) {
                   int nextc = c + acquires;
                   if (nextc < 0)
                       throw new Error("Maximum lock count exceeded");
                   setState(nextc);
                   return true;
               }
               return false;
           }
           //判断是队列是否已有线程
           public final boolean hasQueuedPredecessors() {
           // The correctness of this depends on head being initialized
           // before tail and on head.next being accurate if the current
           // thread is first in queue.
           Node t = tail; // Read fields in reverse initialization order
           Node h = head;
           Node s;
           //h != t  初始化的时候,(s = h.next) == null头节点的下个节点为空(第一个节点为”空“),s.thread != Thread.currentThread()头节点的下个节点与当前线程不为同一线程
           return h != t &&
               ((s = h.next) == null || s.thread != Thread.currentThread());
       }
    

    综上就是ReentrantLock的加锁和解锁过程,现在对自旋有一点感觉了,enq(...)和 acquireQueued(...)中循环都是自选的体现。

    相关文章

      网友评论

          本文标题:java ReentrantLock 与 AQS 加解锁过程

          本文链接:https://www.haomeiwen.com/subject/eenecltx.html