美文网首页Java互联网科技
并发锁核心类AQS学习笔记(超详细)

并发锁核心类AQS学习笔记(超详细)

作者: Java码农石头 | 来源:发表于2020-05-10 20:47 被阅读0次


    一、概念

    AQS 是 AbstractQueuedSynchronizer 的简称,AQS 是一个抽象的队列式同步器框架,提供了阻塞锁和 FIFO 队列实现同步操作。JUC 包中的同步类基本都是基于 AQS 同步器来实现的,如 ReentrantLock,Semaphore 等。

    二、原理

    1、AQS 工作机制:(三点)

    1.如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。

    2.如果被请求的共享资源被占用,则将获取不到锁的线程加入到队列中。等到占有线程释放锁后唤醒队列中的任务争抢锁,这个队列为 CLH 队列。

    3.使用state成员变量表示当前的同步状态,提供 getState,setState,compareAndSetState 进行操作。

    2、CLH 队列:

    虚拟的双向队列,底层是双向链表,包括head节点和tail结点,仅存在结点之间的关联关系。AQS将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

    3、AQS 对资源的共享方式

    AQS定义两种资源共享方式

    1.独占 ( Exclusive ):只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁:

    公平锁:按照线程在队列中的排队顺序,先到者先拿到锁

    非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的,所以非公平锁效率较高

    2.共享 ( Share ):多个线程可同时执行,如Semaphore、CountDownLatch。

    4、AQS 的设计模式

    AQS 同步器的设计是基于模板方法模式。使用者继承AbstractQueuedSynchronizer并重写指定的方法。实现对于共享资源state的获取和释放。

    将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。 AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用,自定义同步器时需要重写下面几个AQS提供的模板方法:

    isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。

    tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。

    tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。

    tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

    tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

    以 ReentrantLock为 例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程在tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证state是能回到零态的。

    三、空间结构

    AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。

    AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable

    队列中Node的头结点

    private transient volatile Node head;   

    队列中Node的尾结点

    private transient volatile Node tail; 

    表示同步状态的成员变量,使用volatile修饰保证线程可见性

    private volatile int state;

    返回同步状态的当前值

    protected final int getState() {

    return state;

    }

    设置同步状态的值

    protected final void setState(int newState) {

    state = newState;

    }

    原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)

    protected final boolean compareAndSetState(int expect, int update) {

    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);

    }

    自旋时间

    static final long spinForTimeoutThreshold = 1000L;

    Unsafe类实例

    private static final Unsafe unsafe = Unsafe.getUnsafe();

    state内存偏移地址

    private static final long stateOffset;

    head内存偏移地址

    private static final long headOffset;

    tail内存偏移地址

    private static final long tailOffset;

    节点状态内存偏移地址

    private static final long waitStatusOffset;

    next内存偏移地址

    private static final long nextOffset;

    静态初始化块,用于加载内存偏移地址。

    static {

    try {

    stateOffset = unsafe.objectFieldOffset

    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));

    headOffset = unsafe.objectFieldOffset

    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));

    tailOffset = unsafe.objectFieldOffset

    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));

    waitStatusOffset = unsafe.objectFieldOffset

    (Node.class.getDeclaredField("waitStatus"));

    nextOffset = unsafe.objectFieldOffset

    (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }

    }

    类构造方法为从抽象构造方法,供子类调用。

    protected AbstractQueuedSynchronizer() { }   

    四、常用方法

    acquire

    该方法以独占模式获取资源,先尝试获取锁,如果获取失败则调用addWaiter将该线程加入队列中。

    源码如下:

    public final void acquire(int arg) {

    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

    selfInterrupt();

    }

    由上述源码可以知道,当一个线程调用acquire时,调用方法流程如下

    1.首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。在AbstractQueuedSynchronizer源码中默认会抛出一个异常,即需要子类去重写此方法完成自己的逻辑。之后会进行分析。

    2.若tryAcquire失败,则调用addWaiter方法,addWaiter方法完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue。

    3.调用acquireQueued方法,此方法完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。

    4.由于tryAcquire默认实现是抛出异常,所以此时,不进行分析,之后会结合一个例子进行分析。

    addWaiter

    使用快速添加的方式往sync queue尾部添加结点,如果sync queue队列还没有初始化,则会使用enq插入队列中。

    // 添加等待者

    private Node addWaiter(Node mode) {

    // 新生成一个结点,默认为独占模式

    Node node = new Node(Thread.currentThread(), mode);

    // Try the fast path of enq; backup to full enq on failure

    // 保存尾结点

    Node pred = tail;

    if (pred != null) { // 尾结点不为空,即已经被初始化

    // 将node结点的prev域连接到尾结点

    node.prev = pred;

    if (compareAndSetTail(pred, node)) { // 比较pred是否为尾结点,是则将尾结点设置为node

    // 设置尾结点的next域为node

    pred.next = node;

    return node; // 返回新生成的结点

    }

    }

    enq(node); // 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列

    return node;

    }

    enq

    使用无限循环来确保节点的成功插入。

    private Node enq(final Node node) {

    for (;;) { // 无限循环,确保结点能够成功入队列

    // 保存尾结点

    Node t = tail;

    if (t == null) { // 尾结点为空,即还没被初始化

    if (compareAndSetHead(new Node())) // 头结点为空,并设置头结点为新生成的结点

    tail = head; // 头结点与尾结点都指向同一个新生结点

    } else { // 尾结点不为空,即已经被初始化过

    // 将node结点的prev域连接到尾结点

    node.prev = t;

    if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node

    // 设置尾结点的next域为node

    t.next = node;

    return t; // 返回尾结点

    }

    }

    }

    }

    acquireQueue

    首先获取当前节点的前驱节点,如果前驱节点是头结点并且能够获取(资源),代表该当前节点能够占有锁,设置头结点为当前节点,返回。否则,调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法

    // sync队列中的结点在独占且忽略中断的模式下获取(资源)

    final boolean acquireQueued(final Node node, int arg) {

    // 标志

    boolean failed = true;

    try {

    // 中断标志

    boolean interrupted = false;

    for (;;) { // 无限循环

    // 获取node节点的前驱结点

    final Node p = node.predecessor();

    if (p == head && tryAcquire(arg)) { // 前驱为头结点并且成功获得锁

    setHead(node); // 设置头结点

    p.next = null; // help GC

    failed = false; // 设置标志

    return interrupted;

    }

    if (shouldParkAfterFailedAcquire(p, node) &&

    parkAndCheckInterrupt())

    interrupted = true;

    }

    } finally {

    if (failed)

    cancelAcquire(node);

    }

    }

    shouldParkAfterFailedAcquire和方法,首先,我们看

    shouldParkAfterFailedAcquire

    只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。

    // 当获取(资源)失败后,检查并且更新结点状态

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

    // 获取前驱结点的状态

    int ws = pred.waitStatus;

    if (ws == Node.SIGNAL) // 状态为SIGNAL,为-1

    // 可以进行park操作

    return true;

    if (ws > 0) { // 表示状态为CANCELLED,为1

    do {

    node.prev = pred = pred.prev;

    } while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点

    // 赋值pred结点的next域

    pred.next = node;

    } else { // 为PROPAGATE -3 或者是0 表示无状态,(为CONDITION -2时,表示此节点在condition queue中)

    // 比较并设置前驱结点的状态为SIGNAL

    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

    }

    // 不能进行park操作

    return false;

    }

    parkAndCheckInterrupt

    首先执行park操作,即禁用当前线程,然后返回该线程是否已经被中断

    // 进行park操作并且返回该线程是否被中断

    private final boolean parkAndCheckInterrupt() {

    // 在许可可用之前禁用当前线程,并且设置了blocker

    LockSupport.park(this);

    return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位

    }

    cancelAcquire

    该方法完成的功能就是取消当前线程对资源的获取,即设置该节点的状态为CANCELLED

    // 取消继续获取(资源)

    private void cancelAcquire(Node node) {

    // Ignore if node doesn't exist

    // node为空,返回

    if (node == null)

    return;

    // 设置node结点的thread为空

    node.thread = null;

    // Skip cancelled predecessors

    // 保存node的前驱结点

    Node pred = node.prev;

    while (pred.waitStatus > 0) // 找到node前驱结点中第一个状态小于0的结点,即不为CANCELLED状态的结点

    node.prev = pred = pred.prev;

    // 获取pred结点的下一个结点

    Node predNext = pred.next;

    // 设置node结点的状态为CANCELLED

    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.

    if (node == tail && compareAndSetTail(node, pred)) { // node结点为尾结点,则设置尾结点为pred结点

    // 比较并设置pred结点的next节点为null

    compareAndSetNext(pred, predNext, null);

    } else { // node结点不为尾结点,或者比较设置不成功

    int ws;

    if (pred != head &&

    ((ws = pred.waitStatus) == Node.SIGNAL ||

    (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&

    pred.thread != null) { // (pred结点不为头结点,并且pred结点的状态为SIGNAL)或者

    // pred结点状态小于等于0,并且比较并设置等待状态为SIGNAL成功,并且pred结点所封装的线程不为空

    // 保存结点的后继

    Node next = node.next;

    if (next != null && next.waitStatus <= 0) // 后继不为空并且后继的状态小于等于0

    compareAndSetNext(pred, predNext, next); // 比较并设置pred.next = next;

    } else {

    unparkSuccessor(node); // 释放node的前一个结点

    }

    node.next = node; // help GC

    }

    }

    unparkSuccessor

    该方法的作用就是为了释放node节点的后继节点。

    // 释放后继结点

    private void unparkSuccessor(Node node) {

    // 获取node结点的等待状态

    int ws = node.waitStatus;

    if (ws < 0) // 状态值小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3

    // 比较并且设置结点等待状态,设置为0

    compareAndSetWaitStatus(node, ws, 0);

    // 获取node节点的下一个结点

    Node s = node.next;

    if (s == null || s.waitStatus > 0) { // 下一个结点为空或者下一个节点的等待状态大于0,即为CANCELLED

    // s赋值为空

    s = null;

    // 从尾结点开始从后往前开始遍历

    for (Node t = tail; t != null && t != node; t = t.prev)

    if (t.waitStatus <= 0) // 找到等待状态小于等于0的结点,找到最前的状态小于等于0的结点

    // 保存结点

    s = t;

    }

    if (s != null) // 该结点不为为空,释放许可

    LockSupport.unpark(s.thread);

    }

    对于cancelAcquire与unparkSuccessor方法,如下示意图可以清晰的表示:

    其中node为参数,在执行完cancelAcquire方法后的效果就是unpark了s结点所包含的t4线程。

    现在,再来看acquireQueued方法的整个的逻辑。逻辑如下:

    判断结点的前驱是否为head并且是否成功获取(资源)。

    若步骤1均满足,则设置结点为head,之后会判断是否finally模块,然后返回。

    若步骤2不满足,则判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作。

    若park了当前线程,之后某个线程对本线程unpark后,并且本线程也获得机会运行。那么,将会继续进行步骤①的判断。

    release

    以独占模式释放对象,其中 tryRelease 的默认实现是抛出异常,需要具体的子类实现,如果 tryRelease 成功,那么如果头结点不为空并且头结点的状态不为 0,则释放头结点的后继结点。

    public final boolean release(int arg) {

    if (tryRelease(arg)) { // 释放成功

    // 保存头结点

    Node h = head;

    if (h != null && h.waitStatus != 0) // 头结点不为空并且头结点状态不为0

    unparkSuccessor(h); //释放头结点的后继结点

    return true;

    }

    return false;

    }

    五、内部类

    Node类

    每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。

    1.CANCELLED,值为1,表示当前的线程被取消。

    2.SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。

    3.CONDITION,值为-2,表示当前节点在等待condition,也就是在condition queue中。

    4.PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。

    5.值为0,表示当前节点在sync queue中,等待着获取锁。

    static final class Node {

    // 模式,分为共享与独占

    // 共享模式

    static final Node SHARED = new Node();

    // 独占模式

    static final Node EXCLUSIVE = null;

    // 结点状态

    // CANCELLED,值为1,表示当前的线程被取消

    // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark

    // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中

    // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行

    // 值为0,表示当前节点在sync队列中,等待着获取锁

    static final int CANCELLED =  1;

    static final int SIGNAL    = -1;

    static final int CONDITION = -2;

    static final int PROPAGATE = -3;

    // 结点状态

    volatile int waitStatus;

    // 前驱结点

    volatile Node prev;

    // 后继结点

    volatile Node next;

    // 结点所对应的线程

    volatile Thread thread;

    // 下一个等待者

    Node nextWaiter;

    // 结点是否在共享模式下等待

    final boolean isShared() {

    return nextWaiter == SHARED;

    }

    // 获取前驱结点,若前驱结点为空,抛出异常

    final Node predecessor() throws NullPointerException {

    // 保存前驱结点

    Node p = prev;

    if (p == null) // 前驱结点为空,抛出异常

    throw new NullPointerException();

    else // 前驱结点不为空,返回

    return p;

    }

    // 无参构造方法

    Node() {    // Used to establish initial head or SHARED marker

    }

    // 构造方法

    Node(Thread thread, Node mode) {    // Used by addWaiter

    this.nextWaiter = mode;

    this.thread = thread;

    }

    // 构造方法

    Node(Thread thread, int waitStatus) { // Used by Condition

    this.waitStatus = waitStatus;

    this.thread = thread;

    }

    }

    ConditionObject类

    // 内部类

    public class ConditionObject implements Condition, java.io.Serializable {

    // 版本号

    private static final long serialVersionUID = 1173984872572414699L;

    // condition队列的头结点

    private transient Node firstWaiter;

    // condition队列的尾结点

    private transient Node lastWaiter;

    // 构造方法

    public ConditionObject() { }

    // 添加新的waiter到wait队列

    private Node addConditionWaiter() {

    // 保存尾结点

    Node t = lastWaiter;

    // 尾结点不为空,并且尾结点的状态不为CONDITION

    if (t != null && t.waitStatus != Node.CONDITION) {

    // 清除状态为CONDITION的结点

    unlinkCancelledWaiters();

    // 将最后一个结点重新赋值给t

    t = lastWaiter;

    }

    // 新建一个结点

    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    if (t == null) // 尾结点为空

    // 设置condition队列的头结点

    firstWaiter = node;

    else // 尾结点不为空

    // 设置为节点的nextWaiter域为node结点

    t.nextWaiter = node;

    // 更新condition队列的尾结点

    lastWaiter = node;

    return node;

    }

    private void doSignal(Node first) {

    // 循环

    do {

    if ( (firstWaiter = first.nextWaiter) == null) // 该节点的nextWaiter为空

    // 设置尾结点为空

    lastWaiter = null;

    // 设置first结点的nextWaiter域

    first.nextWaiter = null;

    } while (!transferForSignal(first) &&

    (first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头结点不为空,一直循环

    }

    private void doSignalAll(Node first) {

    // condition队列的头结点尾结点都设置为空

    lastWaiter = firstWaiter = null;

    // 循环

    do {

    // 获取first结点的nextWaiter域结点

    Node next = first.nextWaiter;

    // 设置first结点的nextWaiter域为空

    first.nextWaiter = null;

    // 将first结点从condition队列转移到sync队列

    transferForSignal(first);

    // 重新设置first

    first = next;

    } while (first != null);

    }

    // 从condition队列中清除状态为CANCEL的结点

    private void unlinkCancelledWaiters() {

    // 保存condition队列头结点

    Node t = firstWaiter;

    Node trail = null;

    while (t != null) { // t不为空

    // 下一个结点

    Node next = t.nextWaiter;

    if (t.waitStatus != Node.CONDITION) { // t结点的状态不为CONDTION状态

    // 设置t节点的额nextWaiter域为空

    t.nextWaiter = null;

    if (trail == null) // trail为空

    // 重新设置condition队列的头结点

    firstWaiter = next;

    else // trail不为空

    // 设置trail结点的nextWaiter域为next结点

    trail.nextWaiter = next;

    if (next == null) // next结点为空

    // 设置condition队列的尾结点

    lastWaiter = trail;

    }

    else // t结点的状态为CONDTION状态

    // 设置trail结点

    trail = t;

    // 设置t结点

    t = next;

    }

    }

    // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。

    public final void signal() {

    if (!isHeldExclusively()) // 不被当前线程独占,抛出异常

    throw new IllegalMonitorStateException();

    // 保存condition队列头结点

    Node first = firstWaiter;

    if (first != null) // 头结点不为空

    // 唤醒一个等待线程

    doSignal(first);

    }

    // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。

    public final void signalAll() {

    if (!isHeldExclusively()) // 不被当前线程独占,抛出异常

    throw new IllegalMonitorStateException();

    // 保存condition队列头结点

    Node first = firstWaiter;

    if (first != null) // 头结点不为空

    // 唤醒所有等待线程

    doSignalAll(first);

    }

    // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断

    public final void awaitUninterruptibly() {

    // 添加一个结点到等待队列

    Node node = addConditionWaiter();

    // 获取释放的状态

    int savedState = fullyRelease(node);

    boolean interrupted = false;

    while (!isOnSyncQueue(node)) { //

    // 阻塞当前线程

    LockSupport.park(this);

    if (Thread.interrupted()) // 当前线程被中断

    // 设置interrupted状态

    interrupted = true;

    }

    if (acquireQueued(node, savedState) || interrupted)

    selfInterrupt();

    }

    private static final int REINTERRUPT =  1;

    private static final int THROW_IE    = -1;

    private int checkInterruptWhileWaiting(Node node) {

    return Thread.interrupted() ?

    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :

    0;

    }

    private void reportInterruptAfterWait(int interruptMode)

    throws InterruptedException {

    if (interruptMode == THROW_IE)

    throw new InterruptedException();

    else if (interruptMode == REINTERRUPT)

    selfInterrupt();

    }

    // 等待,当前线程在接到信号或被中断之前一直处于等待状态

    public final void await() throws InterruptedException {

    if (Thread.interrupted()) // 当前线程被中断,抛出异常

    throw new InterruptedException();

    // 在wait队列上添加一个结点

    Node node = addConditionWaiter();

    int savedState = fullyRelease(node);

    int interruptMode = 0;

    while (!isOnSyncQueue(node)) {

    // 阻塞当前线程

    LockSupport.park(this);

    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查结点等待时的中断类型

    break;

    }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

    interruptMode = REINTERRUPT;

    if (node.nextWaiter != null) // clean up if cancelled

    unlinkCancelledWaiters();

    if (interruptMode != 0)

    reportInterruptAfterWait(interruptMode);

    }

    // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态

    public final long awaitNanos(long nanosTimeout)

    throws InterruptedException {

    if (Thread.interrupted())

    throw new InterruptedException();

    Node node = addConditionWaiter();

    int savedState = fullyRelease(node);

    final long deadline = System.nanoTime() + nanosTimeout;

    int interruptMode = 0;

    while (!isOnSyncQueue(node)) {

    if (nanosTimeout <= 0L) {

    transferAfterCancelledWait(node);

    break;

    }

    if (nanosTimeout >= spinForTimeoutThreshold)

    LockSupport.parkNanos(this, nanosTimeout);

    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

    break;

    nanosTimeout = deadline - System.nanoTime();

    }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

    interruptMode = REINTERRUPT;

    if (node.nextWaiter != null)

    unlinkCancelledWaiters();

    if (interruptMode != 0)

    reportInterruptAfterWait(interruptMode);

    return deadline - System.nanoTime();

    }

    // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态

    public final boolean awaitUntil(Date deadline)

    throws InterruptedException {

    long abstime = deadline.getTime();

    if (Thread.interrupted())

    throw new InterruptedException();

    Node node = addConditionWaiter();

    int savedState = fullyRelease(node);

    boolean timedout = false;

    int interruptMode = 0;

    while (!isOnSyncQueue(node)) {

    if (System.currentTimeMillis() > abstime) {

    timedout = transferAfterCancelledWait(node);

    break;

    }

    LockSupport.parkUntil(this, abstime);

    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

    break;

    }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

    interruptMode = REINTERRUPT;

    if (node.nextWaiter != null)

    unlinkCancelledWaiters();

    if (interruptMode != 0)

    reportInterruptAfterWait(interruptMode);

    return !timedout;

    }

    // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0

    public final boolean await(long time, TimeUnit unit)

    throws InterruptedException {

    long nanosTimeout = unit.toNanos(time);

    if (Thread.interrupted())

    throw new InterruptedException();

    Node node = addConditionWaiter();

    int savedState = fullyRelease(node);

    final long deadline = System.nanoTime() + nanosTimeout;

    boolean timedout = false;

    int interruptMode = 0;

    while (!isOnSyncQueue(node)) {

    if (nanosTimeout <= 0L) {

    timedout = transferAfterCancelledWait(node);

    break;

    }

    if (nanosTimeout >= spinForTimeoutThreshold)

    LockSupport.parkNanos(this, nanosTimeout);

    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

    break;

    nanosTimeout = deadline - System.nanoTime();

    }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

    interruptMode = REINTERRUPT;

    if (node.nextWaiter != null)

    unlinkCancelledWaiters();

    if (interruptMode != 0)

    reportInterruptAfterWait(interruptMode);

    return !timedout;

    }

    final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {

    return sync == AbstractQueuedSynchronizer.this;

    }

    //  查询是否有正在等待此条件的任何线程

    protected final boolean hasWaiters() {

    if (!isHeldExclusively())

    throw new IllegalMonitorStateException();

    for (Node w = firstWaiter; w != null; w = w.nextWaiter) {

    if (w.waitStatus == Node.CONDITION)

    return true;

    }

    return false;

    }

    // 返回正在等待此条件的线程数估计值

    protected final int getWaitQueueLength() {

    if (!isHeldExclusively())

    throw new IllegalMonitorStateException();

    int n = 0;

    for (Node w = firstWaiter; w != null; w = w.nextWaiter) {

    if (w.waitStatus == Node.CONDITION)

    ++n;

    }

    return n;

    }

    // 返回包含那些可能正在等待此条件的线程集合

    protected final Collection getWaitingThreads() {

    if (!isHeldExclusively())

    throw new IllegalMonitorStateException();

    ArrayList list = new ArrayList();

    for (Node w = firstWaiter; w != null; w = w.nextWaiter) {

    if (w.waitStatus == Node.CONDITION) {

    Thread t = w.thread;

    if (t != null)

    list.add(t);

    }

    }

    return list;

    }

    }

    此类实现了Condition接口,Condition接口定义了条件操作规范,具体如下

    public interface Condition {

    // 等待,当前线程在接到信号或被中断之前一直处于等待状态

    void await() throws InterruptedException;

    // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断

    void awaitUninterruptibly();

    //等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0

    boolean await(long time, TimeUnit unit) throws InterruptedException;

    // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态

    boolean awaitUntil(Date deadline) throws InterruptedException;

    // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。

    void signal();

    // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。

    void signalAll();

    }

    原文链接:https://blog.csdn.net/tc979907461/article/details/105979761

    相关文章

      网友评论

        本文标题:并发锁核心类AQS学习笔记(超详细)

        本文链接:https://www.haomeiwen.com/subject/rxssnhtx.html