美文网首页
2. AbstractQueuedSynchronizer(二)

2. AbstractQueuedSynchronizer(二)

作者: shallowinggg | 来源:发表于2019-03-15 23:10 被阅读0次

    Conditon接口与示例

    Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象创建出来的,换句话说,Condition是依赖Lock对象的。

    Condition的使用方式比较简单,需要主语在调用方法前获取锁,如下所示:

        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        
        public void conditionAwait() throws InterruptedException {
            lock.lock();
            try {
                condition.await();
            } finally {
                lock.unlock();
            }
        }
        
        public void conditionSignal() throws InterruptedException {
            lock.lock();
            try {
                condition.signal();
            } finally {
                lock.unlock();
            }
        }
    

    如示例所示,一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才会从await()方法返回,并且在返回前已经获取了锁。

    Condition接口定义的方法如下:

    方法名称 描述
    void await() throws InterruptedException 当前线程进入等待状态直到被通知或中断,当前线程进入运行状态且从await()方法返回的情况包括:
    1.其他线程调用Condition的signal()或signalAll()方法,而当前线程被选中唤醒。
    2.其他线程中断当前线程。
    如果当前等待线程从await()方法返回,那么表面该线程已经获取了Condition对象所对应的锁
    void awaitUninterruptibly() 当前线程进入等待状态直到被通知,从方法名称可以看出该方法对中断不敏感
    long awaitNanos(long nanosTimeout) throws InterruptedException 当前线程进行等待状态直到被通知、中断或者超时。返回值表示剩余的时间
    boolean await(long time, TimeUnit unit) throws InterruptedException 当前线程进行等待状态直到被通知、中断或者超时,与awaitNanos区别就是可以指定时间单位
    boolean awaitUntil(Date deadline) throws InterruptedException 当前线程进行等待状态直到被通知、中断或者到某个时间,如果没有到指定时间就被通知,返回true,否则返回false
    void signal() 唤醒一个等待在Condition上的线程,该线程从等待方法上返回前必须获得与Condition相关联的锁
    void signalAll() 唤醒所有等待在Condition上的线程,能从等待方法上返回的线程必须先获得与Condition相关联的锁

    下面通过一个有界队列的示例来深入了解Condition的使用方式。有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作会阻塞插入线程,直接队列出现空位。

    package util;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class BoundedQueue<T> {
        private Object[] items;
        private int addIndex;
        private int removeIndex;
        private int count;
        private Lock lock = new ReentrantLock();
        private Condition notFull = lock.newCondition();
        private Condition notEmpty = lock.newCondition();
    
        public BoundedQueue(int count) {
            items = new Object[count];
        }
    
        public void add(T item) throws InterruptedException {
            lock.lock();
            try {
                while (count==items.length) {
                    notFull.await();
                }
                items[addIndex] = item;
                if(++addIndex == items.length) {
                    addIndex=0;
                }
                ++count;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
    
        @SuppressWarnings("unchecked")
        public T remove() throws InterruptedException {
            lock.lock();
            try {
                while (count==0) {
                    notEmpty.await();
                }
                Object o = items[removeIndex];
                if(++removeIndex==items.length) {
                    removeIndex=0;
                }
                --count;
                notFull.signal();
                return (T) o;
            } finally {
                lock.unlock();
            }
        }
    }
    

    上述示例中,BoundedQueue通过add(T)方法添加一个元素,通过remove()方法移除一个元素。

    以增加方法为例,首先需要获得锁,目的是确保数组修改的可见性和排他性。当数量元素数量等于数组长度时,表示数组已满,调用notFull.await(),当前线程随之释放锁并进入等待状态。如果数组元素数量不等于数组长度,表示数组未满,则添加元素到数组中,同时通知等待在notEmpty上的线程,数组中有新元素可以获取。

    Condition实现分析

    ConditionObject是AQS的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也比较合理。每个Condition对象都包含一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。

    等待队列

    等待队列是一个FIFO队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义服用了同步器中节点的定义,也就是两个节点类型都是AbstractQueuedSynchronizer.Node

    一个Condition中包含了一个等待队列,Condition拥有首节点和尾节点。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。

    > line: 1862
    public class ConditionObject implements Condition, java.io.Serializable {
            private static final long serialVersionUID = 1173984872572414699L;
            /** First node of condition queue. */
            private transient Node firstWaiter;
            /** Last node of condition queue. */
            private transient Node lastWaiter;
    

    等待队列的基本结构如下:

    等待

    Condition拥有首尾节点的引用,而新增节点只需要将原来的尾节点nextWaiter指向它,并且更新尾节点即可。上述的更新过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。

    > line: 2007
    public final void awaitUninterruptibly() {
        // 增加尾节点
        Node node = addConditionWaiter();
    
        // 释放同步状态
        int savedState = fullyRelease(node);
        boolean interrupted = false;
    
        // 阻塞当前线程
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if (Thread.interrupted())
                interrupted = true;
        }
    
        // 线程被唤醒后再次尝试获取同步状态
        if (acquireQueued(node, savedState) || interrupted)
            selfInterrupt();
    }
    
    > line: 1880
    private Node addConditionWaiter() {
        // 如果不是持有锁的线程调用,抛出异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node t = lastWaiter;
        // 如果lastWaiter被取消了,则清除它
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
    
        // 创建一个CONDITION节点
        Node node = new Node(Node.CONDITION);
    
        // 更新字段
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    
    > line: 1943
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            // 解除对此节点的引用
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
    
    > line: 1756
    final int fullyRelease(Node node) {
        try {
            int savedState = getState();
            if (release(savedState))
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }
    
    > line: 1666
    final boolean isOnSyncQueue(Node node) {
        // 在addConditionWaiter()方法上创建的新节点都是CONDITION节点
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }
    

    在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切的说是AQS)有一个同步队列和多个等待队列:

    等待的响应中断版本如下:

    > line: 2068
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            // 检查等待时线程是否被中断
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 被唤醒或者中断后再次尝试获取同步状态
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        // 处理中断
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    
    > line: 2037
    private int checkInterruptWhileWaiting(Node node) {
        // 如果线程等待时被中断,尝试将其加入到同步队列中
        // 在被通知前被中断了,返回THROW_IE
        // 在被通知后被中断了,返回REINTERRUPT
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }
    
    > line: 2028
    /*
     * 对于响应中断的等待,我们需要跟踪是否要抛出InterruptedException
     * 异常,如果在等待队列上被阻塞时被中断了,那么抛出异常,
     * 如果在同步队列上被阻塞时被中断了,那么就再次中断。
    */
    
    /** Mode meaning to reinterrupt on exit from wait */
    private static final int REINTERRUPT =  1;
    /** Mode meaning to throw InterruptedException on exit from wait */
    private static final int THROW_IE    = -1;
    
    > line: 1734
    final boolean transferAfterCancelledWait(Node node) {
        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
    
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }
    
    > line: 629
    // 此方法与addWaiter(Node)方法几乎一样
    private Node enq(Node node) {
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return oldTail;
                }
            } else {
                initializeSyncQueue();
            }
        }
    }
    
    > line: 2047
    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }
    
    

    等待的超时版本如下:

    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        // 响应中断
        if (Thread.interrupted())
            throw new InterruptedException();
        // 我们不会在这里检查 nanosTimeout <= 0L,使得awaitNanos(0)成为一种释放锁的方式
        // 计算截止时间
        final long deadline = System.nanoTime() + nanosTimeout;
        long initialNanos = nanosTimeout;
        // 增加到等待队列中
        Node node = addConditionWaiter();
        // 释放同步状态
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            // 超时时间已经过去,移到同步队列中,退出循环
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            // 睡眠指定时间
            if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            // 重新计算超时时间
            nanosTimeout = deadline - System.nanoTime();
        }
        // 尝试获取同步状态
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        long remaining = deadline - System.nanoTime(); // avoid overflow
        return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
    }
    
    > line: 2146
    public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
        // 获取截止时间
        long abstime = deadline.getTime();
        // 响应中断
        if (Thread.interrupted())
            throw new InterruptedException();
        // 增加到等待队列中
        Node node = addConditionWaiter();
        // 释放同步状态
        int savedState = fullyRelease(node);
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            // 到达截止时间,移到同步队列,退出循环
            if (System.currentTimeMillis() >= abstime) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            // 阻塞线程
            LockSupport.parkUntil(this, abstime);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 尝试获取同步状态
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }
    

    通知

    调用Condition的signal()方法会唤醒在等待队列中等待时间最长的节点(即首节点),在唤醒线程前,将节点移到同步队列中。

    > line: 1973
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    
    > line: 1906
    private void doSignal(Node first) {
        // 解除头节点的引用,同时更新头节点
        // 然后尝试将头节点移到同步队列,如果失败表示此节点已经被取消,
        // 唤醒下一个节点
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    
    > line: 1707
    final boolean transferForSignal(Node node) {
        /*
         * 如果无法改变waitStatus,表示这个节点已经被取消了
         */
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;
    
        // 移到同步队列中,并返回前驱结点
        Node p = enq(node);
        int ws = p.waitStatus;
        // 尝试将前驱结点的waitStatus设置为SIGNAL
        // 如果失败了或者前驱节点被取消了,那么就唤醒线程使其继续尝试获取同步状态
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    
    

    调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法开头进行了检查。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。

    被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node)方法返回true),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。

    public final void awaitUninterruptibly() {
        // 增加尾节点
        Node node = addConditionWaiter();
    
        // 释放同步状态
        int savedState = fullyRelease(node);
        boolean interrupted = false;
    
        // 阻塞当前线程
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if (Thread.interrupted())
                interrupted = true;
        }
    
        // 线程被唤醒后再次尝试获取同步状态
        if (acquireQueued(node, savedState) || interrupted)
            selfInterrupt();
    }
    
    final boolean isOnSyncQueue(Node node) {
        // 被移动到同步队列的节点waitStatus被更新为0
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
    
        // 查看next字段的注释:"The next field of cancelled nodes is set to
        //     point to the node itself instead of null, to make life
        //     easier for isOnSyncQueue."。从此处我们可以发现这个if语句
        // 存在的意义。
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        // 想象这样一种场景,有一个线程调用signal方法并执行到transferForSignal
        // 方法中的enq方法,尝试将此节点插入到同步队列中,查看enq源码可以发现,
        // 不管CAS tail是否成功,此节点的prev节点已经被设置了,所以第一个if的
        // 检查将会通过(prev!=null),此时这个线程被意外唤醒,然后在await方法中的
        // 循环中继续执行,调用此方法,此时就会符合注释描述的这种情况。
        return findNodeFromTail(node);
    }
    
    private boolean findNodeFromTail(Node node) {
        // We check for node first, since it's likely to be at or near tail.
        // tail is known to be non-null, so we could re-order to "save"
        // one null check, but we leave it this way to help the VM.
        for (Node p = tail;;) {
            if (p == node)
                return true;
            if (p == null)
                return false;
            p = p.prev;
        }
    }
    
    

    signalAll()方法则对等待队列中的每个节点都唤醒,将它们全部移到同步队列中。

    > line: 1988
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }
    
    > line: 1919
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }
    
    

    关于AQS的分析到此结束。除了AQS以外,java.util.concurrent.lock包中还有一个AbstractQueuedLongSynchronizer类,此类的实现与AQS几乎完全一致,只有同步状态使用long表示而不是int。

    > line: 99
    /**
     * The synchronization state.
     */
    private volatile long state;
    

    相关文章

      网友评论

          本文标题:2. AbstractQueuedSynchronizer(二)

          本文链接:https://www.haomeiwen.com/subject/ljbdmqtx.html