美文网首页Java
Condition源码分析

Condition源码分析

作者: 笔记本一号 | 来源:发表于2020-10-15 00:08 被阅读0次
    Java对象都有一组监视器方法:wait,notify,notifyAll,而synchronized本身就是利用虚拟机提供的对象监视器(ObjectMonitor对象)实现同步,这些方法与synchronized配合实现了等待/通知模式,Condition提供了类似synchronized监视器的方法,利用AQS条件队列与Lock配合实现了等待/通知模式,下图是摘自《Java并发编程的艺术》 Java并发编程的艺术

    代码演示:

    public class ConditionTest implements Runnable {
        private final static Lock lock = new ReentrantLock();
        private final static Condition CONDITION = lock.newCondition();
        public void test1() {
            lock.lock();
            try {
                if (Thread.currentThread().getName().equals("t1")) {
                    Thread.sleep(100);
                    System.out.println(Thread.currentThread().getName() + "拿到锁了");
                    System.out.println(Thread.currentThread().getName() + "释放锁,等待通知");
                    CONDITION.await();
                    System.out.println(Thread.currentThread().getName() + "重新获得锁");
                    System.out.println(Thread.currentThread().getName() + "执行结束");
                } else if (Thread.currentThread().getName().equals("t2")){
                    Thread.sleep(100);
                    System.out.println(Thread.currentThread().getName() + "拿到锁了");
                    System.out.println(Thread.currentThread().getName() + "发送通知");
                    CONDITION.signal();
                    System.out.println(Thread.currentThread().getName() + "执行结束,释放锁");
                }
            } catch (Exception e) {
                System.out.println(e);
            } finally {
                lock.unlock();
            }
        }
        public static void main(String[] args) throws InterruptedException {
            ConditionTest test = new ConditionTest();
            Thread t1 = new Thread(test, "t1");
            Thread t2 = new Thread(test, "t2");
            t1.start();
            Thread.sleep(300);
            t2.start();
    
        }
        @Override
        public void run() {
            test1();
        }
    }
    
    image.png

    Condition作用原理

    Condition内部是AQS的内部类利用条件队列实现阻塞和通知线程的效果。当一个线程在调用了await方法以后会被阻塞,调用signal方法唤醒。这种方式为线程提供了更加简单的等待/通知模式。一个Condition的实例必须与一个Lock绑定,因为Condition是作为AQS的内部类实现的,而Lock内部其实就是使用AQS实现的同步功能,因此Condition作用需要与Lock或者实现了AQS的类配合使用。

    • await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
    • await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
    • awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
    • awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态,该方法对中断不敏感。
    • awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
    • signal() :唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
    • signalAll() :唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。

    等待:
    Condition是AQS的内部类。每个Condition对象都包含一个队列(等待队列)。等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、以当前线程构造成节点并将节点从尾部加入等待队。如果不是通过 其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断interrupt(),则会抛出InterruptedException异常信息。

    image.png

    通知:
    调用Condition的signal()方法,将会唤醒在等待队列中首节点,在唤醒节点前,会将节点移到同步队列中,在调用signal()方法之前必须先判断是否获取到了锁。接着获取等待队列的首节点,将其移动到同步队列并且利用LockSupport唤醒节点中的线程

    通知

    Condition源码分析

    await

    ConditionObject 是AQS的内部类,实现了Condition接口
    await()方法对中断敏感,线程中断标志位为true时调用await()就会抛出异常

    //这个ConditionObject 是AQS的内部类
     public class ConditionObject implements Condition, java.io.Serializable {
     public final void await() throws InterruptedException {
    //这里表明await()方法对中断敏感,线程中断为true时调用await()就会抛出异常
                if (Thread.interrupted())
                    throw new InterruptedException();
    //将当前线程封装成节点并且设置为CONDITION加入到Condition队列中去,这里如果lastWaiter不为CONDITION状态,那么会把它踢出Condition队列。
                Node node = addConditionWaiter();
    //释放node节点线程的锁
                int savedState = fullyRelease(node);
                int interruptMode = 0;
    //判断节点是否在同步队列中,在则使用LockSupport.park将其挂起
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    

    将当前线程封装成节点并且设置为CONDITION加入到Condition队列中去,这里如果lastWaiter不为CONDITION状态,那么会把它踢出Condition队列

       private Node addConditionWaiter() {
                Node t = lastWaiter;
             // 遍历队列,将状态不为CONDITION的节点剔除出队列
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
    //将当前线程封装成节点并且设置为CONDITION加入到Condition队列中去
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //尾节点为空则将节点表明队列为空,将新节点设置为头节点
                if (t == null)
                    firstWaiter = node;
                else
    //尾节点不为空则将节点表明队列不为空,将新节点设置为尾节点的后续节点
                    t.nextWaiter = node;
    //将新节点设置为尾节点
                lastWaiter = node;
                return node;
            }
    

    遍历队列,将状态不为CONDITION的节点剔除出队列

    private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                //遍历队列
                while (t != null) {
                    //遍历队列
                    Node next = t.nextWaiter;
                    //将状态不为CONDITION的节点清除
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        trail = t;
                    t = next;
                }
            }
    

    使用release确保线程释放

      final int fullyRelease(Node node) {
            boolean failed = true;
            try {
    //获取到锁代表getState()大于0
                int savedState = getState();
    //这里会确保线程释放
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    

    release利用tryRelease先进行释放锁,tryRelease是ReentrantLock继承AQS实现的方法,可以确保线程是获取到锁的,并且进行释放锁,unparkSuccessor主要是利用LockSupport.unpark(s.thread)唤醒线程,这里我之前在AQS的源码分析讲过,不懂的可以去看看

      public final boolean release(int arg) {
            //释放锁,这个方法是ReentrantLock继承AQS实现的方法
            if (tryRelease(arg)) {
                Node h = head;
    //如果节点状态不是CANCELLED,也就是线程没有被取消,也就是不为0的,就进行唤醒
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    这个方法主要是确保了当前线程持有的锁,不是则抛出异常,确保线程一定是获取到锁的线程,并且进行相应的释放锁

       protected final boolean tryRelease(int releases) {
    //将线程的state计时器减-1,state为0代表没有线程持有锁,大于0则代表有线程持有锁了
                int c = getState() - releases;
    //判断是否是当前线程持有的锁,不是则抛出异常,确保线程一定是获取到锁的线程
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
    //将记录持有线程的变量置为空
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    isOnSyncQueue主要用于判断node是否在同步队列中

    final boolean isOnSyncQueue(Node node) {
       //判断节点的状态,如果状态是CONDITION,说明节点肯定不在同步队列中,同时哪怕同步队列是刚刚初始化的,也会有一个冗余的头节点存在,
    //所以节点的前驱节点如果为null,那么节点也肯定不在同步队列中,返回fasle
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
       //节点的后继节点不为null,说明节点肯定在队列中,返回true,
    //这里很重要的一点要明白,prev和next都是针对同步队列的节点
        if (node.next != null)
            return true;
        //调用findNodeFromTail,查找node是否在同步队列中
        return findNodeFromTail(node);
    }
    
    
    private boolean findNodeFromTail(Node node) {
        //取得同步队列的队尾元素
        Node t = tail;
        //无限循环,从队尾元素一直往前找,找到相等的节点就说明节点在队列中,
    //node为null了,说明前面已经没有节点可以找了,那就返回false
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }
    

    我们看到Condition挂起线程的手段是和AQS一样的,使用的依旧是 LockSupport的park方法,那么我们可以猜到signal使用的肯定是LockSupport的unpark方法

    signal

    signal的作用就是要将await()中Condition队列中第一个Node唤醒(signalAll唤醒全部Node)唤醒

    isHeldExclusively是需要子类继承的,在lock中判断当前线程是否是获得锁的线程,证明了signal调用是需要在获取锁的情况下,这里是先会判断整个condition队列是否为空,不为空则获取Condition队列中第一个Node进行唤醒

      public final void signal() {
    //isHeldExclusively是需要子类继承的,在lock中判断当前线程是否是获得锁的线程,是则返回true,如何当前线程不是获取锁的线程则抛出异常
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
    //获取Condition队列中第一个Node
                Node first = firstWaiter;
    //判断Condition队列是否为空
                if (first != null)
                    doSignal(first);
            }
    

    这是ReentrantLock中的实现方法,判断当前线程是否是获得锁的线程,是则返回true

       protected final boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
    

    这个方法主要逻辑在transferForSignal

       private void doSignal(Node first) {
                do {
    //头节点为空则,队列为空
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
    //将头结点从等待队列中移除
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    

    transferForSignal把头节点扔到同步队列中,然后使用LockSupport.unpark唤醒节点线程,enq(node)在AQS源码分析中讲过过来了,这里省略

       final boolean transferForSignal(Node node) {  
    //通过CAS将状态为CONDITION节点的状态修改为0
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    //将该节点移入到同步队列中去,p是node的前缀节点
            Node p = enq(node);
            int ws = p.waitStatus;
    //以下情况进行唤醒节点
    //1、node的前缀节点状态为0或者节点状态不为零
    //2、node的前缀节点状态修改为SIGNAL状态失败
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    

    相关文章

      网友评论

        本文标题:Condition源码分析

        本文链接:https://www.haomeiwen.com/subject/xnnwpktx.html