美文网首页java所有基础知识
ReentrantLock加锁(lock())、释放锁(unlo

ReentrantLock加锁(lock())、释放锁(unlo

作者: 丑星星 | 来源:发表于2017-05-03 15:59 被阅读607次

    一、简介

    ReentrantLock是JUC包下比较重要同样也是比较常用的一个类,从类名可以看出它的功能:Lock是锁,reentrant可重入,可重入锁。在功能上ReentrantLock和synchronized是相似的,它可以通过这种方式实现加锁和释放锁:

    class X {
        private final ReentrantLock lock = new ReentrantLock();
        // ..
        public void m() {
          lock.lock();  // block until condition holds
          try {
            // ... method body
          } finally {
            lock.unlock()
          }
        }
      }}
    

    要注意的是在加锁和释放锁的时候一定要通过try-finally的方式来操作,防止在加锁之后程序异常退出没有调用 lock.unlock() 方法释放锁。
     ReentrantLock还可用通过Condition条件来实现等待和通知,不过这不是本文讨论的重点,这里就不再赘述。

    二、实现原理

    我们先来看一下ReentrantLock的lock()和unlock()方法的实现:

    public void lock() {
        sync.lock();
    }
    
    public void unlock() {
        sync.release(1);
    }
    

    可以看到,加锁和释放锁是通过sync对象的lock()方法和release()方法实现的。sync对象是什么可以从它的定义和构造方法看出来:

    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;
    
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    

    从定义的注释我们可以看得出ReentrantLock的功能都是通过Sync实现的,而且Sync分为公平锁和非公平锁。

    这里我们先贴出FairSync和NonfairSync的类图:


    类图

    1.AbstractOwnableSynchronizer

    我们先来看一下AbstractOwnableSynchronizer这个抽象类:

    /**
     * A synchronizer that may be exclusively owned by a thread.  This
     * class provides a basis for creating locks and related synchronizers
     * that may entail a notion of ownership.  The
     * {@code AbstractOwnableSynchronizer} class itself does not manage or
     * use this information. However, subclasses and tools may use
     * appropriately maintained values to help control and monitor access
     * and provide diagnostics.
     *一个允许一个线程排他拥有的同步器,这个类提供了创建需要所有权概念的锁和相关同步器的基础。
     *AbstractOwnableSynchronizer类自己并不管理或使用任何信息,然而,子类和使用工具类可能会适当
     *的使用它管理的值来控制监控获取和提供诊断。
     * @since 1.6
     * @author Doug Lea
     */
    public abstract class AbstractOwnableSynchronizer
        implements java.io.Serializable {
    
        /** Use serial ID even though all fields transient. */
        private static final long serialVersionUID = 3737899427754241961L;
    
        /**
         * Empty constructor for use by subclasses.
         */
        protected AbstractOwnableSynchronizer() { }
    
        /**
         * The current owner of exclusive mode synchronization.
         */
        private transient Thread exclusiveOwnerThread;
    
        /**
         * Sets the thread that currently owns exclusive access.
         * A {@code null} argument indicates that no thread owns access.
         * This method does not otherwise impose any synchronization or
         * {@code volatile} field accesses.
         * @param thread the owner thread
         */
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
    
        /**
         * Returns the thread last set by {@code setExclusiveOwnerThread},
         * or {@code null} if never set.  This method does not otherwise
         * impose any synchronization or {@code volatile} field accesses.
         * @return the owner thread
         */
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }
    

    setExclusiveOwnerThread(Thread thread) 这个方法设置当前拥有独占锁的线程。
    getExclusiveOwnerThread()这个方法返回当前拥有独占锁的线程。
    AbstractOwnableSynchronizer这个类通过setter/getter方法实现设置、获取当前拥有独占锁的线程,这个是一个非常简单的类,我们就不再多说了。

    2.AbstractQueuedSynchronizer简单介绍

    AbstractQueuedSynchronizer(以下简写为AQS)这个抽象类是ReentrantLock或者说是整个并发编程包下面锁实现的关键所在,它定义了实现锁的步骤(具体会在下面介绍),提供了几个方法由子类实现,我们可以看到,虽然这个类中没有抽象方法,但是有些方法是直接抛出UnsupportedOperationException异常的。这些方法就是要由子类实现的:

    protected boolean tryAcquire(int arg)
    protected boolean tryRelease(int arg)
    protected int tryAcquireShared(int arg)
    protected boolean tryReleaseShared(int arg)
    protected boolean isHeldExclusively()
    

    ReentrantLock是独占锁,它的内部类ReentrantLock.Sync和他的子类ReentrantLock.NonfairSync、ReentrantLock.FairSync并没有实现tryAcquireShared(int arg)、tryReleaseShared(int arg)方法。
    <p>到这里我们先做一个小小的总结:ReentrantLock实现加锁和释放锁是通过内部类ReentrantLock.NonfairSync、ReentrantLock.FairSync的方法实现的,这两个类都继承了AQS。接下来我们将分析ReentrantLock.NonfairSync、ReentrantLock.FairSync的具体实现。

    3.FairSync、NonfairSync的实现

    我们先来看一下具体实现代码:
    非公平锁:

    /**
         * Sync object for non-fair locks
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    

    公平锁:

        /**
         * Sync object for fair locks
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);
            }
    
            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    我们可以先比较一下公平锁和非公平锁的异同点:我们先看一下lock()方法,这里再公平锁和非公平锁的lock()方法贴出来:

    /**
     *非公平锁的实现
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    
    /**
     *公平锁的实现
     */
    final void lock() {
        acquire(1);
    }
    

    acquire(int)方法是AQS定义的方法,我们可以先来看看这个方法的实现:

        /**
         * Acquires in exclusive mode, ignoring interrupts.  Implemented
         * by invoking at least once {@link #tryAcquire},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquire} until success.  This method can be used
         * to implement method {@link Lock#lock}.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquire} but is otherwise uninterpreted and
         *        can represent anything you like.
         */
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    tryAcquire(int)我们知道,是由子类实现的(这里体现了面试继承的思想,用了模板模式),它的功能是尝试获取锁,我们接下来会分析具体实现。addWaiter(Node.EXCLUSIVE)是生成一个新的节点添加到等待队列里,参数表示这个节点获取锁是以独占方式获取。acquireQueued()这个方法是实现lock阻塞(其实是等待,上一篇文章讲过,ReentrantLock是通过LockSupport.park()实现线程等待的,这个时候线程的状态是WAITING)。acquireQueued()这个方法的具体实现接下来会讲到。selfInterrupt()方法其实就是终端当前线程:Thread.currentThread().interrupt();至于为什么会要终端当前线程,我们接下来也会讲。
    我们现在先来看一看tryAcquire(int)的实现,公平锁和非公平锁对这个方法的实现是不同的。先来看一看公平锁对tryAcquire(int)的实现:

    protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    getState()可以获取锁当前被占有的状态(其实是重入次数,0的时候是没有被任何线程占有,大于0的时候是重入次数)。如果getState()不是0,说明当前锁被线程占用,如果占用锁的线程是当前线程,就表示是锁重入,这个时候将状态增加acquires并返回ture表示尝试获取锁成功。如果getState()是0的话,表示现在没有线程占有锁,这个时候通过hasQueuedPredecessors()去判断等待队列(等待队列是什么,是怎么产生的我们后面会讲到,现在只需要知道等待队列是一个FIFO的队列,每个队列的节点会保存等待获取锁的线程,在队列前面的节点表示先尝试获取的线程节点)里是否有先于当前线程的线程在等待,这个操作是公平锁和非公平锁区别所在,接下来讲到非公平锁的时候会详细讲解。如果没有先于当前线程的线程在等待队列里等待,就通过compareAndSetState(0, acquires))方法来讲state设置为acquires,这个步骤是通过CAS(CAS会在以后的文章中讨论)实现的,因为state是一个临界资源,有可能两个线程同时获取锁的时候对这个值进行修改,所以通过CAS操作来保证线程安全的设置state。如果设置成功,则表示占有锁成功,,然后通过AbstractOwnableSynchronizer的setter方法将当前线程设置为当前占有锁的线程。然后返回true表示尝试获取锁成功。这就是公平锁尝试获取锁的过程。
    接下来我们看一下非公平锁尝试获取锁的过程:

            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    我们可以看到非公平锁和公平锁在尝试获取锁的唯一区别是:当前没有线程占有锁的时候,公平锁会先去等待队列判断有没有先于当前线程的线程等待获取锁,而非公平锁不会去判断有没有其他等待线程,而是直接去尝试占有锁。
    当尝试占有锁成功后就会执行用户自己的代码,如果尝试占有锁失败,这里回到了AQS的acquire(int)方法,当tryAcquire(arg)返回false的时候,会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。addWaiter(Node.EXCLUSIVE)是什么东西呢?上文提到了等待队列,这个方法就是等待队列产生的地方。首先要知道AQS中定义了两个属性:

        /**
         * Head of the wait queue, lazily initialized.  Except for
         * initialization, it is modified only via method setHead.  Note:
         * If head exists, its waitStatus is guaranteed not to be
         * CANCELLED.
         */
        private transient volatile Node head;
    
        /**
         * Tail of the wait queue, lazily initialized.  Modified only via
         * method enq to add new wait node.
         */
        private transient volatile Node tail;
    

    AQS中定义了队列的头节点和尾节点(这里又涉及到了一个知识点volatile,不清楚的同学可以自行查阅资料,以后的文章也可能会讨论),什么是Node节点呢?我们先把代码贴出来:

        static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
    
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    
            /**
             * Status field, taking on only the values:
             *   SIGNAL:     The successor of this node is (or will soon be)
             *               blocked (via park), so the current node must
             *               unpark its successor when it releases or
             *               cancels. To avoid races, acquire methods must
             *               first indicate they need a signal,
             *               then retry the atomic acquire, and then,
             *               on failure, block.
             *   CANCELLED:  This node is cancelled due to timeout or interrupt.
             *               Nodes never leave this state. In particular,
             *               a thread with cancelled node never again blocks.
             *   CONDITION:  This node is currently on a condition queue.
             *               It will not be used as a sync queue node
             *               until transferred, at which time the status
             *               will be set to 0. (Use of this value here has
             *               nothing to do with the other uses of the
             *               field, but simplifies mechanics.)
             *   PROPAGATE:  A releaseShared should be propagated to other
             *               nodes. This is set (for head node only) in
             *               doReleaseShared to ensure propagation
             *               continues, even if other operations have
             *               since intervened.
             *   0:          None of the above
             *
             * The values are arranged numerically to simplify use.
             * Non-negative values mean that a node doesn't need to
             * signal. So, most code doesn't need to check for particular
             * values, just for sign.
             *
             * The field is initialized to 0 for normal sync nodes, and
             * CONDITION for condition nodes.  It is modified using CAS
             * (or when possible, unconditional volatile writes).
             */
            volatile int waitStatus;
    
            volatile Node prev;
    
            volatile Node next;
    
            volatile Thread thread;
    
            Node nextWaiter;
    
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    

    我们可以看到,Node类定义了节点的先去节点和后继节点(prev、next),至于nextWaiter是ReentrantLock的Condition要使用的属性,不在本文讨论范围,所以大家可以先忽略这个属性。Node还定义了thread,表示一个等待锁的线程。waitStatus表示当前节点所处的状态,我么可以看到有1、0、-1、-2、-3五种状态。
    1(CANCELLED):表示当前节点被取消,通过lock()加锁的节点是不会处于这种状态的,只有通过lockInterruptibly()方法加锁才会有这种状态,因为通过lock()加锁的时候,线程是不会响应中断的,这点我们后面会详细介绍。
    0:表示节点刚被创建,是初始状态。
    -1(SIGNAL):表示一个节点的后继正在被阻塞,所以这个节点释放锁或者被中断后,必须unpark它的后继节点。为了避免锁的竞争,acquire()方法必须先保证节点处于SIGNAL状态,然后再去自动的获取锁,如果失败就阻塞。
    -2(CONDITION):这个是Condition所使用的状态,我们这里可以不用关心,AQS所维护的队列不会处于这个状态,只有在AQS的内部类ConditionObject所维护的一个队列的节点才会处于这个状态。
    -3(PROPAGATE):这个状态我们也可以先不用关心。共享模式下才会用到这个状态。
    我们讨论了这么多关于Node类的内容,现在我们回到addWaiter(Node.EXCLUSIVE)这个方法上来,我们说过,AQS的等待队列是由这个方法产生的,接下来我们看一下这个方法的实现:

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

    首先这个方法会先创建一个持有当前线程的Node对象,然后判断尾节点是不是null,如果不是null,则把新创建的节点的前驱节点设置为尾节点,然后通过CAS操作将尾节点的指向设置为新创建的节点,如果成功,把原来的尾节点的后继节点设置为新创建的节点。这个说法比较复杂,用一句话概括就是把新节点添加到队列的尾部,也就是入队。如果尾节点是null,或者compareAndSetTail(pred, node)这个方法失败后,会调用enq(node)方法,我们看一下这个方法的实现:

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    其实这个方法也是一个无锁编程的典型例子,我们可以看到,当尾节点是空,这个时候表示等待队列还没有节点进入,也就是之前没有发生锁竞争的时候,或者说这是第一次发生锁竞争,这个时候会新建一个空节点来作为队首和队尾。然后会将传进来的节点添加到队尾。这个时候我们对addWaiterr(Node mode)方法做个总结:这个方法是创建一个持有当前线程的Node对象,然后通过线程安全的方法将这个对象入队并返回这个对象。
    当addWaiter(Node.EXCLUSIVE), arg)方法返回节点之后,会调用acquireQueued()方法,这个方法也是实现ReentrantLock阻塞的关键。我们看一下具体的实现:

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    我们传进来的node参数是刚加入队列的node,首先进入一个循环,为什么会循环操作呢,接下来会讨论。我们先看看循环内的具体实现:先获得当前节点的前驱节点p,如果p是头结点,说明前面并没有先于当前节点等待的线程,这个时候就去尝试获取锁,如果获取成功就会把头结点设置为当前节点(从这里我们可以看出头结点的两个语义:头结点可能是一个新创建的空Node对象,也可能是一个正在持有锁的线程节点)。前驱节点是头结点,并且 tryAcquire(arg)失败,说明前驱结点的线程正在持有锁还没有释放,或者说是前驱结点的线程持有的锁被释放了,这个时候有其他线程抢占了锁。这个时候我们就去执行shouldParkAfterFailedAcquire(p, node) 方法,这个方法的功能是判断当前节点在获取锁失败后是否需要阻塞。我们看一下这个方法的具体实现:

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    如果当前节点的前驱节点是SIGNAL状态,说明前驱节点在等待锁,所以这个时候当前节点需要阻塞。如果前驱节点不是SIGNAL状态而是CANCELLED状态,就会把前驱节点以及前驱节点之前的直接关联的状态时CANCELLED状态的节点跳过然后返回false,不让当前线程阻塞,而是让当前线程去执行外部的方法,也就是去继续循环尝试获取锁。如果先去节点的状态不是SIGNAL、CANCELLED、状态,在ReentrantLock里它的状态就应该是0状态,也就是初始状态,这二个时候说明前面没有先于当前节点的节点等待获取锁,所以就把当前节点的状态设置为SIGNAL状态,表示当前节点是正在等待竞争锁的,而不是SIGNAL的节点是会被阻塞的,根本没有竞争锁的机会。
    当通过调用shouldParkAfterFailedAcquire方法判断当前节点是否需要阻塞后,如果不需要阻塞,当前节点会循环调用这段代码去竞争锁:

        final Node p = node.predecessor();
        if (p == head && tryAcquire(arg)) {
             setHead(node);
             p.next = null; // help GC
             failed = false;
             return interrupted;
     }
    

    如果调用shouldParkAfterFailedAcquire判断当前节点需要被阻塞,就调用parkAndCheckInterrupt()方法,我们先看一下这个方法的实现:

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    这个方法会调用LockSupport.park(this);来让当前线程进入等待状态。LockSupport.park(this);这个方法会响应线程中断而结束,所以在这个方法结束后要判断当前线程的中断状态,并返回当前线程的中断状态。我们之前说过,ReentrantLock的lock()方法是不会响应线程中断的,原因就是在这:当线程被中断后parkAndCheckInterrupt()返回true,这个时候只会把interrupted中断标志设置为true,然后还会循环去执行上面提到的代码判断当前线程是不是有资格去竞争锁。这个操作是和通过调用lockInterruptibly()实现中断是不同的,lockInterruptibly()会响应线程中断,他是通过调用AQS的doAcquireInterruptibly(int arg)方法实现的,有兴趣的同学可以看一下这个方法的实现:

        private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    这个方法和acquireQueued基本是一样的,只是在 parkAndCheckInterrupt()判断线程被中断后抛出了一个异常来推出循环操作。
    继续回到acquireQueued这个方法,什么时候parkAndCheckInterrupt()方法调用LockSupport.park(this);会正常推出呢?一个获得锁的线程执行加锁的代码块后,会调用unlock()方法来实现锁的释放,这个时候unlock()内部的方法会调用LockSupport.unpark(Thread)方法,参数内传入的线程对象是头节点的后继节点持有的线程,用来使阻塞在 parkAndCheckInterrupt()方法上的头节点的后继节点释放。(因为头节点代表着正在持有锁的线程,头节点的后继节点表示等待锁释放的线程)。当acquireQueued方法结束后,我们再回到AQS的acquire方法,我们可以知道如果在acquireQueued方法阻塞过程中,如果中间中断了线程,虽然不会实现线程中断,但是acquireQueued方法会返回ture,这个时候selfInterrupt()方法会被执行,我们上文提到过,selfInterrupt()方法会调Thread.currentThread().interrupt();。为什么要执行这个操作呢?因为方法acquireQueued在阻塞过程中如果线程中断,这个时候依然会判断当前这个线程所在等待队列中的前驱结点是否是头节点,如果不是还是会调用LockSupport.park()再次阻塞线程,在parkAndCheckInterrupt()返回是否是因为中断结束的时候,使用的是Thread.interrupted()方法,这个方法在调用结束后会清除中断状态。所以如果不调用selfInterrupt()这个方法,当前线程的中断状态就是false。这里调用selfInterrupt()来设置中断状态为true。
    到此为止,我们已经讨论完了ReentrantLock通过lock()加锁的整个过程。看似比较复杂,但是如果能慢慢理一下,就会发现其实原理是蛮简单的。

    4、unlock()的实现

    我们先来看看具体实现:

        public void unlock() {
            sync.release(1);
        }
    

    unlock()方法是通过AQS的release(int)方法实现的,我们可以看一下:

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    这个方法和acquire的思路一样,也用了模板模式,tryRelease()是由子类实现的,我们来看一下ReentrantLock中的Sync对它的实现:

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    这个操作和tryAcquire的操作是一个相反的过程。先通过getState获得状态标识,如果这个标识和要释放的数量相等,就会把当前占有锁的线程设置为null,实现锁的释放,然后返回true,否则把状态标识减去releases再返回false。
    释放锁之后,判断头结点是不是空,如果是空表示没有等待的锁的线程。如果不是空,再判断头结点的状态是不是0,头结点是0表示什么呢?我们从前面的讨论中可以知道,头结点一般表示当前正在持有锁的线程的节点,但是当头结点是0时,头结点表示获取锁第一次发生竞争后,初始化等待队列被设置的节点,这个节点的后继节点没有被阻塞。所以这个时候我们也不需要做后续操作。如果头结点不是0,就调用unparkSuccessor(Node)方法。我们来看一下这个方法:

        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    首先会先获取该节点的状态(这个时候是头结点)。如果头结点是负的,就说明后继节点有可能等待被释放。这个时候调用 compareAndSetWaitStatus(node, ws, 0)把节点的状态设置为0,不过这个操作的结果不会影响后续的操作。然后获取头结点的后继节点,判断后继节点的状态不是CANCELLED状态,如果是,这找到第一个不是CANNCELLED状态的结点,然后使这个线程取消阻塞状态。至此为止unlock()结束。

    相关文章

      网友评论

        本文标题:ReentrantLock加锁(lock())、释放锁(unlo

        本文链接:https://www.haomeiwen.com/subject/sphctxtx.html