美文网首页
Java 并发编程系列(二) 锁之 LockSupport 工具

Java 并发编程系列(二) 锁之 LockSupport 工具

作者: Gxgeek | 来源:发表于2017-12-19 15:34 被阅读0次
    LockSupport工具类 Condition 与 Object对比(来自 Java并发编程的艺术一书)

    Condition提供了一系列的方法

    Condition 方法
    • await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
    • await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
    • awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
    • awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
    • awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
    • signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
    • signal()All:唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。

    每个Condition对象都包含着一个FIFO队列,该队列是Condition对象通知/等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该Condition对象上等待的线程。

    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        
        //头节点
        private transient Node firstWaiter;
        //尾节点
        private transient Node lastWaiter;
    
        public ConditionObject() {
        }
        
        /** 省略方法 **/
    }
    
    队列基本结构

    await()

        public final void await() throws InterruptedException {
            // 当前线程中断
            if (Thread.interrupted())
                throw new InterruptedException();
            //当前线程加入等待队列
            Node node = addConditionWaiter();
            
            //释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            
            /**
             * 检测此节点的线程是否在同步队上,如果不在,则说明该线程还不具备竞争锁的资格,则继续等待
             * 直到检测到此节点在同步队列上
             */
            while (!isOnSyncQueue(node)) {
                //线程挂起
                LockSupport.park(this);
                //如果已经中断了,则退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
    //被唤醒后,重新开始正式竞争锁,同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
                
            //清理下条件队列中的不是在等待条件的节点
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
    
    
    
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
        //Node的节点状态如果不为CONDITION,则表示该节点不处于等待状态,需要清除节点
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            
            //将该节点加入到条件队列中最后一个位置
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
        
        
        
        
        
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                //节点状态--其实就是持有锁的数量
                int savedState = getState();
                //释放锁
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
        
    //isOnSyncQueue(Node node):
    //如果一个节点刚开始在条件队列上,现在在同步队列上获取锁则返回true
    
        final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            if (node.next != null) // If has successor, it must be on queue
                return true;
            return findNodeFromTail(node);
        }
    

    signal()

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //头节点,唤醒条件队列中的第一个节点
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
        
    
        private void doSignal(Node first) {
            do {
                 //修改头结点,完成旧头结点的移出工作
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&//将老的头结点,加入到AQS的等待队列中
                     (first = firstWaiter) != null);
        }
    
    
    
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
             //将该节点从状态CONDITION改变为初始状态0,
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
             //将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点
            Node p = enq(node);
            int ws = p.waitStatus;
           //如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    
    整个通知的流程如下:
    • 判断当前线程是否已经获取了锁,如果没有获取则直接抛出异常,因为获取锁为通知的前置条件。
    • 如果线程已经获取了锁,则将唤醒条件队列的首节点
    • 唤醒首节点是先将条件队列中的头节点移出,然后调用AQS的enq(Node node)方法将其安全地移到CLH同步队列中
    • 最后判断如果该节点的同步状态是否为Cancel,或者修改状态为Signal失败时,则直接调用LockSupport唤醒该节点的线程。

    总结

    一个线程获取锁后,通过调用Condition的await()方法,会将当前线程先加入到条件队列中, 
    然后释放锁,最后通过isOnSyncQueue(Node node)方法不断自检看节点是否已经在CLH同步队列了, 
    如果是则尝试获取锁,否则一直挂起。 
    当线程调用signal()方法后,程序首先检查当前线程是否获取了锁, 
    然后通过doSignal(Node first)方法唤醒CLH同步队列的首节点。 
    被唤醒的线程,将从await()方法中的while循环中退出来, 
    然后调用acquireQueued()方法竞争同步状态。
    

    相关文章

      网友评论

          本文标题:Java 并发编程系列(二) 锁之 LockSupport 工具

          本文链接:https://www.haomeiwen.com/subject/votiwxtx.html