美文网首页服务端开发实战
ReentrantLock实现及AQS源代码深入解析

ReentrantLock实现及AQS源代码深入解析

作者: 初夏倾城 | 来源:发表于2018-08-26 14:28 被阅读8次

引言

JDK1.5中最引人注目的的便是concurrent包,Doug Lea这名神一样的男人,在JDK1.5中为我们为我们引入了这个包,让并发程序的开发变得更加得心应手,谈到并发,我们不得不谈ReentrantLock,而谈到ReentrantLock,我们就不得不介绍AbstractQueuedSynchronized(AQS)。
类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock、CountDownLatch等。
网上关于ReentrantLock的文章有很多很多,介绍其与synchronized的区别的文章也很多,但大多数都很片面,直到今天,拜读了一位博主的文章,跟着他的文章,打开ReentrantLock的源码,一起阅读,终于对其有了一定的理解,也十分佩服Doug Lea的思维,现在,就让我们跟随大师的思路,一起来研究一下ReentrantLock的源码。

AbstractQueuedSynchronizer(AQS)

ReentrantLock实现的前提就是AbstractQueuedSynchronized(AQS),这个类是cocurrent包的核心,CountDownLatch、ReentrantLock、FutureTask等都有一个内部类是这个抽象类的子类,AQS是典型的模板方法设计模式的应用,其中大部分方法都由AQS本身实现好,我们在应该时,只需要重写部分方法就行,如下:

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

可以看到这个方法只是抛出了一个异常,需要我们在子类中重写,那么为什么不把方法定义成抽象方法呢,因为Doug Lea考虑到:一般独占锁只需要重写tryAcquire()和tryRelease()方法,共享锁只需要实现tryAcquireShared()和tryRelease()方法,为了减少我们编程人员的工作量,才不将方法设置为抽象方法。

由于AQS是基于FIFO的队列实现,队列肯定由一个个Node节点组成,我们先来看一下Node的属性

属 性 含 义
Node SHARED = new Node() 表示Node处于共享模式
Node EXCLUSIVE = null 表示Node处于共享模式
int CANCELLED = 1 因为超时或者中断,Node被设置为取消状态,被取消的Node不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态,处于这种状态的Node会被踢出队列,被GC回收
int SIGNAL = -1 表示这个Node的继任Node被阻塞了,到时需要通知它
int CONDITION = -2 表示这个Node在条件队列中,因为等待某个条件而被阻塞
int PROPAGATE = -3 使用在共享模式头Node有可能处于这种状态, 表示锁的下一次获取可以无条件传播
int waitStatus 0,新Node会处于这种状态
Node prev 队列中某个Node的前驱Node
Node next 队列中某个Node的后继Node
Thread thread 这个Node持有的线程,表示等待锁的线程
Node nextWaiter 表示下一个等待condition的Node

看完了Node,下面再看一下AQS中有哪些变量和方法:

属 性 含 义
Thread exclusiveOwnerThread 这个是AQS父类AbstractOwnableSynchronizer的属性,表示独占模式同步器的当前拥有者
Node 内部FIFO队列的基本单位
Node head FIFO队列的头结点
Node tail FIFO队列的尾节点
int state 表示同步状态,0表示未锁,一般调用一次加锁方法,state会自动加1
setState(int newState) 设置同步状态
boolean compareAndSetState(int expect, int update) 利用CAS设置同步状态
long spinForTimeoutThreshold = 1000L 设置线程自旋等待时间
Node addWaiter(Node mode) 根据指定Node的模式创建一个包含当前线程的节点并加入FIFO队列中,如果队列为空,则会执行enq()方法新建一个队列,并用一个空的Node作为头结点
Node enq(final Node node) 新建一个FIFO队列,并用一个空的Node作为头结点
void setHead(Node node) 设置队列的头Node
void unparkSuccessor(Node node) 如果存在的话,唤起Node持有的线程
tryAcquire(int arg) 独占模式尝试获取锁(子类去实现)
tryRelease(int arg) 独占模式尝试释放锁(子类去实现)
tryAcquireShared(int arg) 共享模式尝试获取锁(子类去实现)
tryAcquireShared(int arg) 共享模式尝试释放锁(子类去实现)
compareAndSetHead(Node update) 利用CAS设置头Node
compareAndSetTail(Node expect, Node update) 利用CAS设置尾Node
compareAndSetWaitStatus(Node node, int expect, int update) 利用CAS设置 某个Node中的等待状态

整个AQS的类设计的十分巧妙,用到了模板方法设计模式。对于FIFO的队列的操作以及加锁释放锁的整体流程在类中已经设计好了,我们要做的只是重写tryAcquire(int arg)和tryRelease(int arg)方法(注意,如果实现的是共享锁,则需要重写tryAcquireShared(int arg)和tryAcquireShared(int arg)方法)。

ReentrantLock的源码实现

先来看ReentrantLock里最基本的结构,废话少说,上源码:

   public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

首先是ReentrantLock的构造函数,从中可以看出,其构造函数分为无参和有参的构造函数,无参的构造函数默认生成一个NonfairSync的对象,而有参的构造函数根据传入的boolean类型的值,如果是true,则生成一个FairSync的对象,如果是false,则生成NonfairSync的对象。
FairSync和NonfairSync分别代表什么呢?他们对应的其实就是锁中的公平锁和非公平锁的概念,那么什么是公平锁,什么是非公平锁呢,这里简单介绍一下,好让大家有个概念?
举个例子吧,我们去火车站买票,看到拍了很长的队伍,这时候,不同的情况我们会采取不同的做法,第一种情况,我们有很多时间,不着急,我们选择去队伍一个一个排队,这就是公平的,第二种情况,我们时间很紧,我们走到最前面,问柜员我们是否能先买票,因为我们赶时间,这就是不公平,当然,我们会得到两个结果,比较通情理的柜员就会让我们先买,或者让我们冲新区排队。
这个例子或许很抽象,但是,放到并发编程的概念里,就是,公平锁就是严格按照线程的顺序获取锁,而非公平锁,则是可以插队执行,这之后的源代码会介绍到,非公平锁在加锁的方法中,都会先执行一次抢占锁的方法,这就突出了非公平的概念。
而从构造方法来看,默认的ReentrantLock都是非公平的。</br>

继续往下看,

private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();
}       

首先ReentrantLock内部存在着一个静态的抽象类Sync继承了AbstractQueuedSynchronizer,这个静态类拥有抽象的lock方法,而其子类也就是FairSync和NonfairSync继承了Sync并实现了不同的lock()方法:

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
        ...
}
 static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        ...
}

这里可以看到源码,NonfairSync的lock方法,每次都会先执行一次CAS操作,去判断state是不是0,是0就设置为1,然后将AbstractOwnableSynchronizer的thread设置为当前线程,这就是之前提到的不公平的概念,因为ReentrantLock的默认实现是不公平的,所以本文之后着重讲述NonfairSync,有兴趣的之后可以去看一下FairSync源码,区别不大,基本就是lock()和tryAcquire()存在区别。</br>

假设现在有两个线程,线程1和线程2,线程1调用了ReentrantLock的lock()方法,这样线程1就会独占这个锁,可以看一下整个调用链,十分简单,

线程1.png

对应的代码就是NonfairSync里面的lock()方法

  final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

首先执行CAS操作,因为是第一个线程,所以AQS里的state必定为0,也就是未锁状态,线程1在执行了这段代码后,将AQS的state设置为1,也就是抢占了改锁,并执行 setExclusiveOwnerThread()方法,看一下该方法源码:

 public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    /** Use serial ID even though all fields transient. */
    private static final long serialVersionUID = 3737899427754241961L;

    /**
     * Empty constructor for use by subclasses.
     */
    protected AbstractOwnableSynchronizer() { }

    /**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * Sets the thread that currently owns exclusive access.
     * A {@code null} argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or
     * {@code volatile} field accesses.
     * @param thread the owner thread
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * Returns the thread last set by {@code setExclusiveOwnerThread},
     * or {@code null} if never set.  This method does not otherwise
     * impose any synchronization or {@code volatile} field accesses.
     * @return the owner thread
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

这个方法是AbstractOwnableSynchronizer里面的方法,他将该抽象类里的exclusiveOwnerThread设为当前线程也就是线程1.

这两步操作做完之后线程1就独占了锁。然后此时线程2也想获得这个锁,调用了lock()方法,我们知道,在线程1没释放锁的情况下,这个操作肯定是行不通的,所以线程2必定被阻塞,那么线程2是怎么被阻塞的呢?我们看一下方法的调用链,如下图:

线程2.png

当我第一次看到这张图的时候,心里不免产生恐惧,哇,这么长的调用链,不要慌,随着源码一步一步看下去,你会发现,并没有你想的那么恐怖,首先,还是看lock()方法:

 final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

首先,线程2还是通过CAS去尝试着改变state的值,毕竟不公平嘛,但是免谈,我线程1还没释放这个锁呢,你就想来抢,state仍然为1,抢占锁失败,于是便执行acquire(1),继续看acquire()方法源码:

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire()源码是AQS类提供的模板方法,其中tryAcquire是由我们子类去重写的,也就是ReentrantLock重写的,该模板方法也很好理解,先走第一个判断条件尝试获取一次锁,如果获取的结果为false即失败,走第二个判断条件添加FIFO等待队列。所以先看一下tryAcquire()方法做了什么,我们看一下ReentrantLock的tryAcquire()方法,tryAcquire()在FairSync和NonFairSync里面有着不同的实现,因为前面说过了,着重讲非公平的实现,我们就看一下NonFairSync里的tryAcquire()方法:

  protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

可以看到,该方法继续调用了nonfairTryAcquire()方法,继续跟踪:

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

首先获取当前线程,然后获取当前state,因为state是被volatile修饰的,所以对线程2具有可见性,线程2拿到了最新的state,再次判断一下state是否为0,也就是锁是否被释放了,为什么这么做,因为可能线程1执行同步代码块较快,已经释放了锁,不可以就返回false,然后false非就是true,就会走第二个判断acquireQueued(addWaiter(Node.EXCLUSIVE), arg)去添加FIFO的等待队列。
这里注意一下else if的判断,这段代码的作用就是实现了可重入锁,就是让某个线程可以多次使用ReentrantLock,如果当前线程等于getExclusiveOwnerThread()取到的线程,也就是之前我们通过setExclusiveOwnerThread()设置的线程,直接就会执行内部代码,不需要再进行CAS操作,每调用一次,就会将state加1,当nextc<0的时候抛出error,什么时候nextc会小于0,就是超过了int的最大值的时候,那么同一个锁最多能重入Integer.MAX_VALUE次,也就是2147483647次。

继续走acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,先看addWaiter(Node.EXCLUSIVE)方法,该方法位于AQS类中:

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

首先创建一个当前线程的Node,因为传入的参数是Node.EXCLUSIVE,也就是独占模式,mode也就是null

  Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        

看一下Node对应的构造参数,意思就是这个node的线程是当前线程也就是线程2,下一个等待者是null。
然后判断队列中有没有节点,我们这里的情况肯定是没有节点的,于是执行endq(node)方法,传入参数为之前创建的node,也就是线程2所在的node,看一下endq()方法:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

endq()方法也是AQS内部的,可以看到内部是一次循环的函数,直到return才会退出循环。传入线程2所在的node后,还是先判断队列是否为空,通过取得tail尾部node,判断是否为null的方式实现,由于我们这个例子中线程2所在的node是第一个要等待的node,所以必定为空,于是通过CAS设置了头node为一个新的Node,这里执行的new Node()是一个无参的构造函数,然后将tail指向head也就是头node,然后继续第二次循环,此时通过Node t = tail 取到的t 就是第一次我们新建的Node对象,执行else的逻辑,首先将我们传入进来的node(线程2所在的node)的prev也就是前驱node设为之前新建的Node,然后执行CAS操作,将AQS里的tail也就是队列尾node设为线程2所在的node,然后将头node的next也就是后继node设为线程2所在的node,然后返回,注意,这里返回的是t,也就是头node(之前通过new Node()新建的node)。

回到addWaiter()方法,此时,我们通过endq()方法新建了一个队列,队列的结构大概如下图:

队列.png

这是个双向队列,然后执行acquireQueued()方法,看一下acquireQueued()方法:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

首先定义了两个boolean类型的值,fail为true,interrupted为为false,继续进入循环,这里先调用node.predecessor()方法,看一下源代码:

 final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

可以看出是返回node的前驱节点,这里的node就是线程2所在的node,而其前驱节点就是头结点,证明该节点是FIFO队列的理论上的第一个节点(头结点没有意义,是空节点),所以满足p == head,因为&&的特性,会继续执行后面的tryAcquire(),会尝试再次获取一次锁(因为在前面这段操作时间内,线程1可能已经执行完毕,state重新变成了0),如果成功,则表示线程2所在的node已经成功获取到了锁,也就代表着其可以从FIFO队列出队,所以讲前面取到的head节点的后继节点设为null,然后将failed置为false(这里不太理解,failed用于finally函数中,也就是try执行完毕或者发生异常后会通过判断failed
的值去决定是否执行cancelAcquire()方法),返回interrupted也就是false,回到acquire()方法:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

返回false,不会执行selfInterrupt()方法。

如果尝试获取所失败,则走第二个if判断也就是 if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()),先看shouldParkAfterFailedAcquire(p,node)方法,分别传入了头节点和线程2所在的节点作为参数,看一下shouldParkAfterFailedAcquire()实现:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

该方法位于AQS类中,先获取pred的waitStatus,也就是头节点的waitStatus,我们这里头节点是一个默认生成的节点,其waitStatus就是int的默认值也就是0,然后进行判断,会走最后一个代码块,通过CAS操作将waitStatus更新SIGNAL也就是-1,然后返回false,因为返回false,if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()),根据短路&&的逻辑parkAndCheckInterrupt()自然不会执行,然后会再走一次for循环,此时依然会像前面一样再次尝试获取锁,获取失败的话再次进入shouldParkAfterFailedAcquire(p, node)方法,这时候ws取到的是前面更新过的值,也就是SIGNAL,然后返回true,于是就会执行parkAndCheckInterrupt()方法,看一下parkAndCheckInterrupt()源码:

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

可以看到调用了LockSupport的park方法阻塞住了当前的线程,看一下park()的源码,它是LockSupport类中的一个静态方法:

public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

获取到了当前线程,通过UNSAFE的一系列操作阻塞了当前线程,这里不详细讲述UNSAFE的原理,因为我也不清楚,有兴趣的同学可以下载OpenJDK的源码详细看一下UNSAFE的实现机制。

上面的流程总结下来,大概就是通过调用ReentrantLock实现线程1独占锁,线程2进入AQS中的FIFO队列并阻塞的整个过程,可以看出来,整个过程十分精妙,AQS的设计就是一个典型的模板方法设计模式,他的子类也就是ReentrantLock只是实现了tryAcquire()方法,其余的方法都由AQS实现好了,并且其中多个方法都有着尝试再次获取锁的操作,这换做是我,一定只会在刚开始判断一下,大师不愧是大师,多读源码,才能更好地了解大师的想法。

上面讲述了lock()的实现过程,下面来看一下unlock()也就是释放锁的实现过程。

首先调用ReentrantLock的unlock()方法,

 public void unlock() {
        sync.release(1);
    }

然后看一下release()方法,

 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

很明显,tryRelase()是AQS提供的抽象方法,具体的实现肯定由子类来实现,我们看一下ReentrantLock里的tryRelease()方法,

 protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

首先可以看到如果当前线程和getExclusiveOwnerThread()返回的线程不相等,会直接抛出IllegalMonitorStateException()异常,那当然了,你都没有获得锁,你又怎么去释放锁。

然后每次调用unlock()方法,就会取当前AQS内部的state并减1,只有当c==0的时候,才会让free为true并返回,这对应着就是我们上面的可重入方法,一个线程可以多次调用ReentrantLock的lock()方法,对应着,你调用了多少次lock()方法就需要调用同样次数的unlock()方法,当c == 0 的时候,也就意味着该线程对当前的ReentrantLock全部解锁,于是通过setExclusiveOwnerThread(null)方法把AbstractOwnableSynchronizer的exclusiveOwnerThread将被设置为null,这样就表示没有线程占有锁,然后返回true,执行大括号内部的代码。

此时取到AQS内部的FIFO的head节点,依然是之前new Node()产生的节点,h!=null成立,而其waitStatus之前被更新成了SIGNAL状态也就是-1,所以h != null && h.waitStatus != 0成立,执行unparkSuccessor(h)方法,来看一下unparkSuccessor()方法:

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

首先取到传入的node也就是head节点的status也就是-1,-1<0,执行CAS操作将head的status转为-1,然后取到node的next也就是后继节点,也就是线程2所在的节点,s!=null成立,执行 LockSupport.unpark(s.thread),线程2就被unpark了。

有人会疑惑了,线程2得以运行后,线程2所在的节点又是怎么退出AQS内部的FIFO队列的呢?我们来回溯一下之前的源码:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

线程2是在parkAndCheckInterrupt()处被阻塞的,当锁释放,线程2得以继续运行时,并没有return语句,只是将interrupted置为true,于是会再次进入循环,这时候,p == head 和 tryAcquire(arg)都会返回true,进入内部,首先执行setHeader(node)方法,这里要特别注意了:

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

线程2所在的node被设置为了头结点,且节点内部的thread变量被设置成了null,且prev也被设置成了null,然后把p的next设置为null,这样原头Node里面的所有对象都不指向任何块内存空间,方法结束后被自动回收,而原先线程2所在的node成了新的头结点(此时内部thread变量已经为null,也就是此时node不属于任何一个线程),此时,遇到一个return语句,acquireQueued方法结束,返回interrupted,这里我产生了一个一位,此时的interrupted是true还是false,这里要看一下

if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;

这段代码,因为之前说到,线程2是调用的 parkAndCheckInterrupt()进行阻塞的,再看一下parkAndCheckInterrupt()源码:

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

这里return的是Thread.interrupted()方法得到的返回值,然后我们看一下Thread类的interrupted()网上的介绍:

Thread.interrupted()测试当前线程是否已经中断,线程的中断状态也是由该方法清除。默认返回false,如果当前线程之前通过调用interrupt()导致处于中断状态,则会返回false并清除中断状态,下次再次调用依然会返回false,这样就比较好理解了,如果当前线程是出于中断状态,parkAndCheckInterrupt()返回true, interrupted也就是中断标志被设置为true,acquireQueued()返回值就是true,返回true的话

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

可以看出来就会执行selfInterrupt()中断当前线程

static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

这大概就是我理解的ReentrantLock的大致过程,有的地方依旧比较模糊,如果之后有想通的会再来补充。

相关文章

网友评论

    本文标题:ReentrantLock实现及AQS源代码深入解析

    本文链接:https://www.haomeiwen.com/subject/rddoiftx.html