美文网首页
AQS源码浅析(6)——条件队列

AQS源码浅析(6)——条件队列

作者: 墨_0b54 | 来源:发表于2022-04-08 17:25 被阅读0次

    一、ConditionObject数据结构

    简单回顾条件队列的数据结构,一个单链表。

    // 条件队列所关心的Node结构
    static final class Node {
            static final int CANCELLED =  1; //线程已取消
            static final int CONDITION = -2; //处于条件队列
            volatile int waitStatus;
            volatile Thread thread; //使该节点入队的线程。使用后为null
            //如果在条件队列(只有在独占模式下才能访问)是一个单向链表,nextWaiter是单向链表的next指针。
            Node nextWaiter;
            Node(Thread thread, int waitStatus) { // 给Condition对象使用
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
    }
    
    //条件队列Condition数据结构
    public class ConditionObject implements Condition, java.io.Serializable {
            /** First node of condition queue. */
            private transient Node firstWaiter;
            /** Last node of condition queue. */
            private transient Node lastWaiter;
            /** Mode meaning to reinterrupt on exit from wait */
            private static final int REINTERRUPT =  1; // 线程被唤醒后产生中断,不抛异常
            /** Mode meaning to throw InterruptedException on exit from wait */
            private static final int THROW_IE    = -1; // 线程在阻塞时被中断唤醒,抛异常
    }
    
    • 条件队列只有在独占模式下才能访问
    • 在条件队列只有CONDITION和CANCELLED状态,node初始状态是CONDITION
    • 条件队列使用的链表指针是nextWaiter字段

    下面重点讲await等待方法和signal方法。。。

    二、await可中断条件等待

    线程持有独占锁才可以调用该方法,当线程进入同步队列后调用acquireQueued方法参与锁的竞争。

    public final void await() throws InterruptedException { //调用这个方法的线程肯定是持有锁的
        if (Thread.interrupted()) 
            throw new InterruptedException();
        Node node = addConditionWaiter(); //将当前线程加入当前Condition等待队列,顺便清除等待队列清除中所有状态不是CONDITION的node
        int savedState = fullyRelease(node); //释放锁,并返回原本锁的state状态值
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {//node进入同步队列后退出自旋,其他线程唤醒当前线程后会将当前线程加入同步队列
            LockSupport.park(this);//阻塞当前线程,条件队列的线程await时都会被阻塞在这里等待其他线程唤醒或者中断唤醒
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;//不等于0表示中断,中断即退出
        }//退出时,node已经处于同步队列,状态也不是CONDITION
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters(); //执行到这里一定是拿到锁了,清理等待队列中的已取消的node
        if (interruptMode != 0)//拿到锁后才可以执行中断逻辑
            reportInterruptAfterWait(interruptMode); //检查并执行抛异常,或者设置中断状态
    }
    
    • 条件队列中阻塞的线程可以被【中断唤醒】或者【signal方法唤醒】,有可能同时发生,问题在于怎么判定两者谁先发生
    • 线程被唤醒后,都会调用checkInterruptWhileWaiting方法,来检查和处理【中断唤醒】
    • 该方法内部会调用transferAfterCancelledWait方法,该方法根据CAS结果判断是否【中断唤醒】
    • CAS成功,那么认为线程是【中断唤醒】,线程需要将自己放到同步队列
    • CAS失败,说明signal同时发生且没有争过signal,那么认为线程是【signal方法唤醒】,线程需等待signal将线程放到同步队列后再返回
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }
    
    final boolean transferAfterCancelledWait(Node node) { 
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node)) 
            Thread.yield();
        return false;
    }
    

    三、signal条件唤醒

    将等待时间最长的线程(如果存在)从该条件的等待队列移动到拥有锁的等待队列

    public final void signal() {
        if (!isHeldExclusively()) //检查当前线程是否独占
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;//出条件队列,nextWaiter设置为null
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null); //成功唤醒一个等待者或者没有等待者就会退出循环
    }
    

    直接看transferForSignal内部实现:

    final boolean transferForSignal(Node node) {//将节点从条件队列转移到同步队列。如果成功则返回真,
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //可能与transferAfterCancelledWait竞争
            return false;
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node); //入队
        int ws = p.waitStatus; //原尾节点的状态
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread); //如果前驱节点被取消,或者修改前驱节点状态失败,直接唤醒线程
        return true;
    }
    
    • 该方法会与上文的transferAfterCancelledWait方法竞争
    • transferAfterCancelledWait与该方法的方法名都以transfer开头,都有转移节点的意思
    • 如果transferForSignal是给Signal调用的,那么transferAfterCancelledWait方法意思就是给取消等待的线程调用的
    • CAS竞争失败,直接返回唤醒节点失败
    • CAS竞争成功,将线程入同步队列
    • 如果前驱节点被取消,或者未把前驱节点状态设置为SIGNAL,说明node不能安全阻塞,需要直接唤醒

    相关文章

      网友评论

          本文标题:AQS源码浅析(6)——条件队列

          本文链接:https://www.haomeiwen.com/subject/jeqssrtx.html