美文网首页
深入解析AbstractQueuedSynchronizer源码

深入解析AbstractQueuedSynchronizer源码

作者: hcy0411 | 来源:发表于2018-08-06 17:05 被阅读0次

    简介和对比Object监视器方法对比

    任何一组对象都用一组监视器方法,主要就是wait和notify方法,这些方法与synchronized关键字一起使用,实现等待/通知模式。juc也实现了类似Object的监视器方法,就是Condition,与lock可以配合使用实现等待/通知模式。功能还是有些差异,下面是Object监视器方法与Condition监视器方法功能对比。

    对比项 Object Condition
    实现条件 获取Synchronized锁 获取Lock锁
    等待队列个数 只有一个,即一个Synchronized锁下,只有一个等待队列 多个,Lock可以new多个Condition
    线程释放锁进入等待状态,是否支持中断 不支持 支持
    线程释放锁进入等待状态,是否支持等待到具体某个时间 不支持 支持

    以上是Object监视器方法与Condition监视器方法的主要区别,可以看出Condition监视器方法功能更多一点。

    简单实现

    分别用Object监视器方法和Condition监视器方法实现一个简单的阻塞队列

    Object监视器方法

        private Object a = new Object();
    
        private Object b = new Object();
    
        private int count = 5;
    
        private List<String> list = new ArrayList<>(5);
    
        public void add(String str) {
    
            synchronized (a) {
                if (list.size() == count) {
                    try {
                        a.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            synchronized (this) {
                list.add(str);
            }
            synchronized (b) {
                try {
                    b.notifyAll();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        public String get() {
    
            synchronized (b) {
                if (list.size() == 0) {
                    try {
                        b.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            String str = "";
            synchronized (this) {
                str = list.get(0);
                list.remove(0);
            }
            synchronized (a) {
                try {
                    a.notifyAll();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return str;
        }
    

    Condition监视器方法

        private ReentrantLock lock = new ReentrantLock();
    
        private Condition a = lock.newCondition();
    
        private Condition b = lock.newCondition();
    
        private List<String> list = new ArrayList<>(5);
    
        private int count = 5;
    
        public void add(String str) {
    
            try {
                lock.lock();
                if (list.size() == count) {
                    a.await();
                }
                list.add(str);
                b.signalAll();
            } catch (Exception e) {
    
            } finally {
                lock.unlock();
            }
        }
    
        public String get() {
    
            try {
                lock.lock();
                if (list.size() == 0) {
                    b.await();
                }
                String str = list.get(0);
                list.remove(0);
                a.signalAll();
                return str;
            } catch (Exception e) {
    
            } finally {
                lock.unlock();
            }
            return null;
        }
    

    从以上代码中可以看出Object监视器方法只有一个等待队列,因此需要定义两组Object监视器方法来实现生产者-消费者pv操作,而Condition支持多个等待队列,因此不用定义多组,直接用一个锁就行。

    源码分析

    本文主要分析aqs中的实现,即AbstractQueuedSynchronizer.ConditionObject
    具体的一些方法即描述如下:

    方法 描述
    signal 唤醒一个等待在Condition上的线程
    signalAll 唤醒所有等待在Condition上的线程
    awaitUninterruptibly 线程进入等待队列等待被唤醒,不响应中断
    await() throws InterruptedException 线程进入等待队列等待被唤醒,响应中断,抛出中断异常
    awaitNanos(long nanosTimeout) throws InterruptedException 线程进入等待队列等待被唤醒,最多等待nanosTimeout纳秒,响应中断,抛出中断异常
    awaitUntil(Date deadline) throws InterruptedException 线程进入等待队列等待被唤醒,最多等待到该具体时间,响应中断,抛出中断异常

    简单介绍

    深入解析方法实现前先简单介绍实现原理AbstractQueuedSynchronizer共有两个等待队列,Sync Queue和Condition Queue,Lock的加锁和wait,signal之间就是两个等待队列转换的过程,同Object监视器方法一样,Condition运行wait和signal方法时同样要先获取锁才能执行。
    wait方法:把当前获的锁线程封装成Node节点放到Condition Queue中,然后释放锁资源。
    signal方法:把Condition Queue首节点拿到Sync Queue中,完成之后释放锁资源,然后唤醒Sync Queue中的Node节点。

    signal 和 signalAll

            public final void signal() {
                 //健康检查,由Lock自己实现,比如ReentrantLock,就是检查当前线程是否是拥有锁的线程
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                //唤醒等待队列中的第一个节点
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    
            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }
    

    doSignal 和doSignalAll

            private void doSignal(Node first) {
                do {
                    // 这里的操作主要是把first节点从Condition Queue中摘出来,如果队列空了,设置lastWaiter为null
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) && // 唤醒该节点,即从Condition Queue转移到Sync Queue中
                         (first = firstWaiter) != null);
            }
    
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                // 循环唤醒所有节点
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    

    transferForSignal

        final boolean transferForSignal(Node node) {
            /*
             * cas改变CONDITION到初始状态,如果没成功,唤醒失败
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            // enq方法是将node节点插入到Sync Queue中
            Node p = enq(node);
            int ws = p.waitStatus;
            // 如果节点被取消即(ws > 0 )或者设置需要被唤醒状态SIGNAL失败,就唤醒线程
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    

    以上是Condition的signal相关方法,唤醒操作,总结一点就是把Condition Queue中节点拿到Sync Queue中,设置成被需要被唤醒状态,等待被唤醒。

    中断标志

    简单介绍下中断标志,因为Condition的wait支持中断,所以中断很重要

            /**
            * 这两种的区别是是否是在signal后中断的,THROW_IE是唤醒前中断,需要抛出中断异常,REINTERRUPT是被唤醒后中断
            **/
            /** Mode meaning to reinterrupt on exit from wait 意思是wait退出前自我中断,不需要抛出中断异常 */
            private static final int REINTERRUPT =  1;
            /** Mode meaning to throw InterruptedException on exit from wait  退出前需要抛出中断异常*/
            private static final int THROW_IE    = -1;
    

    awaitUninterruptibly

    该方法不支持中断异常,实现比较简单,可以参照wait方法理解

            public final void awaitUninterruptibly() {
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                boolean interrupted = false;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if (Thread.interrupted())
                        interrupted = true;
                }
                if (acquireQueued(node, savedState) || interrupted)
                    selfInterrupt();
            }
    

    wait()throws InterruptedException

    wait方法支持中断

            public final void await() throws InterruptedException {
            // 判断下中断状态
            if (Thread.interrupted())
                throw new InterruptedException();
            //封装node节点到Condition Queue中
            Node node = addConditionWaiter();
            // 释放锁资源,唤醒其他等待锁资源线程
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 等待是否在Sync Queue中,即是否被其他线程signal唤醒,如果没有一直进行阻塞,指导被唤醒进入Sync Queue队列中
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                // 判断下中断类型,如果有中断且已经被signal唤醒,即节点在Sync Queue中,是THROW_IE,否则是REINTERRUPT,或者是0即没有中断,代码不深入理解,可以自己去看
                // 这里被中断唤醒,而不是signal之后被唤醒,会在Condition Queue中和Sync Queue中都存在
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // acquireQueued是循环的独占方式获取锁资源,即已经被唤醒放到Sync Queue中,需要独占的方式再获取锁资源,返回值是是否被中断
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 这里判断是否是被中断唤醒的,如果是中断唤醒的,需要把Condition Queue中已被取消的节点清理掉,防止垃圾存在
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                // 最后就是如果有中断,处理下中断,如果是THROW_IE中断模型,抛出中断异常
                reportInterruptAfterWait(interruptMode);
    

    checkInterruptWhileWaiting 和 transferAfterCancelledWait

           private int checkInterruptWhileWaiting(Node node) {
                // 这里就是判断下线程是否被中断,如果中断,判断中断类型
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
     // 这个函数的主要是判断节点被取消是否在signal唤醒之前,
    final boolean transferAfterCancelledWait(Node node) {
           // 如果节点的状态是CONDITION,说明还没被signal唤醒,依然在Condition Queue中,然后返回插入到Sync Queue中返回true,这里说明了在signal唤醒之前被中断,两个等待队列都会存在节点
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                enq(node);
                return true;
            }
            // 否则说明已经被signal唤醒,返回false
            while (!isOnSyncQueue(node))
                Thread.yield();
            return false;
        }
    

    awaitNanos(long nanosTimeout)throws InterruptedException

    该方法和wait方法基本差不多,就多了个超时等待

            public final long awaitNanos(long nanosTimeout)
                    throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                // 这里如果超时直接取消,然后退出等待
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime(); //返回值表明是否是超时还是被唤醒
    

    剩下的两个与时间相关的方法 public final boolean awaitUntil(Date deadline)和public final boolean await(long time, TimeUnit unit)方法和 awaitNanos(long nanosTimeout)方法基本一样,不做深入理解。

    总结

    以上是全部的Condition理解,一句话,Condition就是锁状态的一种中间状态转换队列,实现了等待/通知模式

    相关文章

      网友评论

          本文标题:深入解析AbstractQueuedSynchronizer源码

          本文链接:https://www.haomeiwen.com/subject/xupbvftx.html