美文网首页
多线程与并发(七):AQS原理

多线程与并发(七):AQS原理

作者: lilykeke | 来源:发表于2021-09-09 09:23 被阅读0次

    1 AQS原理

    全称是 AbstractQueuedSynchronizer ,阻塞式锁和相关同步工具的框架

    1.1 特点

    • 用state属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁

    • getState 获取state状态

    • setState 设置state 状态

    • compareAndState cas机制设置state状态

    • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源

    • 提供了基于FIFO的等待队列,类似于Monitor 的EntryList

    • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于Monitor 的WaitSet

    子类主要实现这样的一些方法(默认抛出UnsupportedOperationException)

    • tryAcquire
    • tryRelease
    • tryAcquireShared
    • tryReleaseShared
    • isHeldExclusively

    获取锁的姿势

    //如果获取锁失败
    if (! tryAcquire(arg)) {
        //入队,可以选择阻塞当前线程 park unpark 机制
    }
    

    释放锁的姿势

    //如果释放锁成功
    if (tryRelease(arg)) {
        //让阻塞线程恢复运行
    }
    

    2. 自定义锁

    自定义锁实现 Lock 接口,定义一个内部类继承 AbstractQueuedSynchronizer 类
    自定义锁是一个不可重入锁

    public class AqsTest {
        public static void main(String[] args) {
            MyLock lock = new MyLock();
            new Thread(()->{
                lock.lock();
                try{
                    System.out.println("lock" + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }finally {
                    System.out.println("unlock" + Thread.currentThread().getName());
                    lock.unlock();
                }
            }).start();
    
            new Thread(()->{
                lock.lock();
                try{
                    System.out.println("lock" + Thread.currentThread().getName());
                }finally {
                    System.out.println("unlock" + Thread.currentThread().getName());
                    lock.unlock();
                }
            }).start();
        }
    
    }
    
    /**
     * 自定义不可重入锁
     */
    class MyLock implements Lock {
    
        class MySync extends AbstractQueuedSynchronizer{
            @Override
            protected boolean tryAcquire(int arg) {
                if (compareAndSetState(0,1)) {
                    //表示加上锁了,设置owner为当前线程
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            @Override
            protected boolean tryRelease(int arg) {
    
                //注意两个方法顺序
                setExclusiveOwnerThread(null);
               //state 使用volatile修饰,禁止指令重排
                setState(0); //表示解锁
    
                return true;
            }
    
            /**
             * 是否持有独占锁
             * @return
             */
            @Override
            protected boolean isHeldExclusively() {
    
                return getState() == 1;
            }
    
            public Condition newCondition(){
                return new ConditionObject();
            }
        }
    
        private MySync sync = new MySync();
    
        /**
         * 不成功进入等待队列等待
         */
        @Override
        public void lock() {
            sync.acquire(1);
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
           sync.acquireInterruptibly(1);
        }
    
        /**
         * 尝试加锁,只会尝试一次
         * @return
         */
        @Override
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1,unit.toNanos(time));
        }
    
        @Override
        public void unlock() {
            sync.release(1);
        }
    
        /**
         * 创建条件变量
         * @return
         */
        @Override
        public Condition newCondition() {
            return null;
        }
    }
    

    测试结果:
    lockThread-0
    unlockThread-0
    lockThread-1
    unlockThread-1


    3. ReentrantLock 原理

    ReentrantLock UML类图


    reentrantlock_uml.png

    3.1 非公平锁实现原理

    3.1.1 加解锁流程

    加锁流程

    没有竞争时

    非公平锁-1.png

    第一个竞争出现时
    Thread-1 执行了

    1. CAS 尝试将state 由 0 改为 1 ,结果失败
    2. 进入tryAcquire逻辑,这时 state 已经是 1 ,结果仍然失败
    3. 接下来进入addWaiter逻辑,构造Node队列
      • 途中黄色三角表示该Node的waitStatus 状态,其中 0 为默认正常状态
      • Node的创建时懒惰的
      • 其中第一个Node成为 Dummy(哑元) 或哨兵,用来占位,并不关联线程
    非公平锁-3 (1).png
    1. 当前线程进入acquireQueued 逻辑
    • acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入park 阻塞
    • 如果自己是紧邻着 head (排第二位),那么再次tryAcquire 尝试获取锁,当然这时state 仍为1,失败
    • 进入 shouldParkAfterFailedAcquire(p, node) 逻辑,将前驱node ,即head 的waitStatus 改为 -1 ,这次返回false (-1 表示有责任唤醒它的后继节点)
    非公平锁-4.png
    • shouldParkAfterFailedAcquire(p, node) 执行完毕后回到acquireQueued, 再次tryAcquire 尝试获得锁,当然这时的 state 仍为1,失败
    • 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱node 的waitStatus 已经是 -1,这次返回true
    • 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
    非公平锁-5.png
    • 再次有多个线程经历上述过程竞争失败,变成这个样子
    非公平锁-6.png

    解锁流程

    1.Thread-0 释放锁,进入tryRelease 流程,如果成功

    • 设置 execlusiveOwnerThread 为null
    • state = 0
    非公平锁-解锁1.png

    2.当前队列不为null , 并且 head 的waitStatus = -1,进入unparkSuccessor 流程

    • 找到队列中离head 最近的一个Node (没取消的),unpark 恢复其运行,本例中为Thread-1

    3.回到Thread-1 的acquireQueue流程

    非公平锁-解锁2.png

    如果加锁成功(没有竞争),会设置

    • exclusiveOwnerThread 为Thread-1 ,state = 1;
    • head 指向刚刚Thread-1 所在的Node,该Node清空Thread
    • 原本的head因为从链表断开,而可被垃圾回收

    4.如果这时候有其他线程来竞争(非公平的体现),例如这时Thread-4 来了

    如果不巧被Thread-4占了先机

    • Thread-4 被设置为exclusiveOwnerThread , state = 1
    • Thread-1 再次进入acquireQueued 流程,获取锁失败,重新进入park 阻塞
    源码

    先从构造器看,默认为非公平锁实现

    public ReentrantLock() {
        sync = new NonfairSync();
    }
    

    NonfairSync 继承自 Sync , Sync 继承自AQS

    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        } 
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;
    
            /**
             * Performs {@link Lock#lock}. The main reason for subclassing
             * is to allow fast path for nonfair version.
             */
            abstract void lock();
    
            /**
             * Performs non-fair tryLock.  tryAcquire is implemented in
             * subclasses, but both need nonfair try for trylock method.
             */
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
            protected final boolean isHeldExclusively() {
                // While we must in general read state before owner,
                // we don't need to do so to check if current thread is owner
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
    
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
    
            // Methods relayed from outer class
    
            final Thread getOwner() {
                return getState() == 0 ? null : getExclusiveOwnerThread();
            }
    
            final int getHoldCount() {
                return isHeldExclusively() ? getState() : 0;
            }
    
            final boolean isLocked() {
                return getState() != 0;
            }
    
            /**
             * Reconstitutes the instance from a stream (that is, deserializes it).
             */
            private void readObject(java.io.ObjectInputStream s)
                throws java.io.IOException, ClassNotFoundException {
                s.defaultReadObject();
                setState(0); // reset to unlocked state
            }
        }
    
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); //获取前驱节点
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        //创建一个节点,waitStatus = null
        Node node = new Node(Thread.currentThread(), mode);
        //如果当前队列已经有节点了,直接将该节点加入到AQS队列尾部
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
       //如果当前队列为空,要先初始化,new Node()
        enq(node);
        return node;
    }
    
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    3.1.2 可重入原理

    具体分析见源码

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
    
    
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
    
        abstract void lock();
    
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
    
         
            if (c == 0) {
                //当前线程没有获得锁,尝试修改状态0->1
                if (compareAndSetState(0, acquires)) {
                   //修改状态成功,表示获得锁。设置owner为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果已经获得了锁,线程还是当前线程,表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //设置当前的state, 同一个线程锁重入state++
                setState(nextc);
                return true;
            }
            return false;
        }
    
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //c==0时重新设置 owner 返回true,否则只更新state值,返回false
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    
        //是否独占锁
        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
       ...
    }
    

    3.1.3 可打断原理

    不可打断模式

    在此模式下,即使它被打断,仍会驻留在AQS 队列中,等获得锁后方能继续运行(是继续运行,只是打断标记被设置为true)

    private final boolean parkAndCheckInterrupt() {
        // 如果打断标记已经是true,则park 会失效
        LockSupport.park(this);
        // interrupted 会清除打断标记
        return Thread.interrupted();
    }
    
    //未获得到锁时,调用该方法
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    //还是需要获得锁后,才能返回打断状态
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 如果是因为interrupt 被唤醒,返回打断状态为true
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 如果打断状态为true
            selfInterrupt();
    }
    
    static void selfInterrupt() {
        // 重新产生一次中断
        Thread.currentThread().interrupt();
    }
    

    可打断模式

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            // 如果没有获得锁,进入
            doAcquireInterruptibly(arg);
    }
    
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //在park过程中如果被interrupt 会进入此
                    // 这时抛出异常,而不会再次进入 for(;;)
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    4. 公平锁实现原理

    非公平锁

    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //如果还没有获得锁
            if (c == 0) {
                //尝试用CAS获得锁,这里体现非公平性:不去检查AQS队列
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果已经获得了锁,线程还是当前线程,表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            //获取失败,回到调用处
            return false;
        }
    

    公平锁实现

    // 与非公平锁的主要区别在于tryAcquire 方法的实现
    protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 先检查AQS队列中是否有前驱节点
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    

    5. 条件变量的实现原理

    每个条件变量其实就对应着一个等待队列,实现类是ConditionObject

    await 流程

    开始 Thread-0持有锁,调用await,进入 ConditionObject 的 addConditionWaiter 流程创建新的Node状态为-2(Node.CONDITION),关联 Thread-0,加入等待队列的尾部


    条件变量.jpg
    public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
           //调用ConditionObject.await() 先进入addConditionWaiter()流程
            Node node = addConditionWaiter();
           //接下来进入AQS的fullyRelease 流程,释放同步器上的锁
           //为什么这里要调用 fullyRelease(node)? 有可能一个线程重入好几次
            int savedState = fullyRelease(node);
            int interruptMode = 0;
           //进入一个循环,判断
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
    
    private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                //给当前线程创建一个新节点,连接到链表上
                //waitStatus == Node.CONDITION
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    

    当前线程添加到 ConditionObject队列中后,当前线程要释放锁

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            //释放锁成功,返回当前状态;若释放失败,设置当前节点的状态为Node.CANCELLED
            //release(savedState) 还会唤醒等待队列中的下一个节点
            //unpark AQS队列的下一个节点,竞争锁,假设没有其他线程竞争,Thread-1竞争锁成功
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    

    signal 流程

    假设Thread-1 要来唤醒 Thread-0

    signal.png

    进入ConditionObject 的doSignal()流程

    
    /**
     * Moves the longest-waiting thread, if one exists, from the
     * wait queue for this condition to the wait queue for the
     * owning lock.
     *
     */
    public final void signal() {
        //如果当前线程没有持有锁,调用signal()抛异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //取出等待队列的头结点
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    
    /**
     * Removes and transfers nodes until hit non-cancelled one or
     * null. Split out from signal in part to encourage compilers
     * to inline the case of no waiters.
     * @param first (non-null) the first node on condition queue
     */
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    

    执行 transferForSignal 流程,将该Node 加入 AQS队列尾部,将Thread-0 的waitStatus 改为 0;将Thread-3的 waitStatus 改为 -1

     /**
      * Transfers a node from a condition queue onto sync queue.
      * Returns true if successful.
      * @param node the node
      * @return true if successfully transferred (else the node was
      * cancelled before signal)
      */
    //将这个节点从 condition队列转移到 sync队列,如果成功则返回true
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        //首先尝试cas操作将waitStatus 改为0,若失败,直接返回false
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        //拼接将该节点到 aqs 队尾。调用 enq()返回该节点的前置节点
       
        Node p = enq(node);
        int ws = p.waitStatus;
       
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true
    }
    

    Node节点定义

    static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
    
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    
            /**
             * Status field, taking on only the values:
             *   SIGNAL:     The successor of this node is (or will soon be)
             *               blocked (via park), so the current node must
             *               unpark its successor when it releases or
             *               cancels. To avoid races, acquire methods must
             *               first indicate they need a signal,
             *               then retry the atomic acquire, and then,
             *               on failure, block.
             *   CANCELLED:  This node is cancelled due to timeout or interrupt.
             *               Nodes never leave this state. In particular,
             *               a thread with cancelled node never again blocks.
             *   CONDITION:  This node is currently on a condition queue.
             *               It will not be used as a sync queue node
             *               until transferred, at which time the status
             *               will be set to 0. (Use of this value here has
             *               nothing to do with the other uses of the
             *               field, but simplifies mechanics.)
             *   PROPAGATE:  A releaseShared should be propagated to other
             *               nodes. This is set (for head node only) in
             *               doReleaseShared to ensure propagation
             *               continues, even if other operations have
             *               since intervened.
             *   0:          None of the above
             *
             * The values are arranged numerically to simplify use.
             * Non-negative values mean that a node doesn't need to
             * signal. So, most code doesn't need to check for particular
             * values, just for sign.
             *
             * The field is initialized to 0 for normal sync nodes, and
             * CONDITION for condition nodes.  It is modified using CAS
             * (or when possible, unconditional volatile writes).
             */
            volatile int waitStatus;
    
            /**
             * Link to predecessor node that current node/thread relies on
             * for checking waitStatus. Assigned during enqueuing, and nulled
             * out (for sake of GC) only upon dequeuing.  Also, upon
             * cancellation of a predecessor, we short-circuit while
             * finding a non-cancelled one, which will always exist
             * because the head node is never cancelled: A node becomes
             * head only as a result of successful acquire. A
             * cancelled thread never succeeds in acquiring, and a thread only
             * cancels itself, not any other node.
             */
            volatile Node prev;
    
            /**
             * Link to the successor node that the current node/thread
             * unparks upon release. Assigned during enqueuing, adjusted
             * when bypassing cancelled predecessors, and nulled out (for
             * sake of GC) when dequeued.  The enq operation does not
             * assign next field of a predecessor until after attachment,
             * so seeing a null next field does not necessarily mean that
             * node is at end of queue. However, if a next field appears
             * to be null, we can scan prev's from the tail to
             * double-check.  The next field of cancelled nodes is set to
             * point to the node itself instead of null, to make life
             * easier for isOnSyncQueue.
             */
            volatile Node next;
    
            /**
             * The thread that enqueued this node.  Initialized on
             * construction and nulled out after use.
             */
            volatile Thread thread;
    
            /**
             * Link to next node waiting on condition, or the special
             * value SHARED.  Because condition queues are accessed only
             * when holding in exclusive mode, we just need a simple
             * linked queue to hold nodes while they are waiting on
             * conditions. They are then transferred to the queue to
             * re-acquire. And because conditions can only be exclusive,
             * we save a field by using special value to indicate shared
             * mode.
             */
            Node nextWaiter;
    
            /**
             * Returns true if node is waiting in shared mode.
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            /**
             * Returns previous node, or throws NullPointerException if null.
             * Use when predecessor cannot be null.  The null check could
             * be elided, but is present to help the VM.
             *
             * @return the predecessor of this node
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    
    

    相关文章

      网友评论

          本文标题:多线程与并发(七):AQS原理

          本文链接:https://www.haomeiwen.com/subject/drhxvktx.html