美文网首页
Java并发之AQS拓展组件(4)

Java并发之AQS拓展组件(4)

作者: 小马蛋 | 来源:发表于2019-11-24 16:46 被阅读0次

    1、回顾

    上一章节,我们讲解了AQS这个抽象类,知道了AQS是一个用于构建锁和同步器的框架,许多锁和同步器都可以通过AQS很容易且高效地构造出来。

    接下来,我们围绕着,基于AQS实现的一些拓展组件,进行本期讲解。

    2、AQS拓展组件

    2.1 ReentrantLock

    ReentrantLock只支持以独占的方式获取操作,因此它实现了tryAcquire(int)tryRelease(int)isHeldExclusively()
    ReentrantLock将同步状态state,用于保存锁获取操作的次数,并且还维护一个owner变量来保存持有锁的线程。只有在当前线程刚刚获取到锁,或者释放锁的时候,才会修改这个变量。

    其内部类NonfairSyncFairSync均继承于抽象类Sync

    Sync类代码:

        /**
         * 抽象静态内部类
         * ReentrantLock的同步控制基础。下面的两个子类是公平和非公平的版本。使用AQS状态表示锁上的保持数。
         * 抽象队列同步器AbstractQueuedSynchronizer (以下都简称AQS),是用来构建锁或者其他同步组件的基础框架,
         * 它使用了一个int成员变量来表示同步状态,通过内置的FIFO(first-in-first-out)同步队列来控制获取共享资源的线程。
         */
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;
    
            /**
             * 尝试获取锁
             * 交给下面的两个子类去实现
             * 因为ReentrantLock有公平锁和非公平锁两个版本,所以在获取锁逻辑会有差别
             */
            abstract void lock();
    
            /**
             * 以非公平的方式尝试获取锁,直接返回是否持有锁的标识
             */
            final boolean nonfairTryAcquire(int acquires) {
                // 当前线程
                final Thread current = Thread.currentThread();
                // 获取同步状态
                int c = getState();
                // c == 0 表示没有线程持有锁
                if (c == 0) {
                    // CAS 抢占锁,如果成功返回false 否则跳出分支语句返回false
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 重入逻辑判断,如果当前线程是持有锁的线程,可以继续持有,state+=acquires
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    // fast-fail检查
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
            /**
             * 尝试释放锁
             * AQS release方法调用子类的tryRelease
             */
            protected final boolean tryRelease(int releases) {
                // 当前状态原子量 减 释放值
                int c = getState() - releases;
                // 如果线程不是锁的持有者,抛出异常
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                // 没持有多层锁则将持有锁的线程设置为null 表示没有线程持有该锁
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
            // 当前线程是否为锁的持有者
            protected final boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
    
            // 获取通信对象
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
    
            // 获取持有锁的线程
            final Thread getOwner() {
                return getState() == 0 ? null : getExclusiveOwnerThread();
            }
    
            // 获取锁state
            final int getHoldCount() {
                return isHeldExclusively() ? getState() : 0;
            }
    
            // 锁是否被持有
            final boolean isLocked() {
                return getState() != 0;
            }
    
            // 忽略了一些代码...
        }
    

    我们来看一下,非公平锁的实现方式:

        /**
         * 非公平锁的同步对象
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * 获取锁,尝试立即获取锁
             * 比较并设置状态
             *  + 如果成功,设置线程独占
             *  + 如果失败,判断是否和持有锁的线程是同一个线程,是,则持有锁state+1,进入同步代码块,否则进入等待队列
             */
            final void lock() {
                // CAS 如果返回true说明获取成功
                if (compareAndSetState(0, 1))
                    // 设置拥有独占访问权的线程对象
                    setExclusiveOwnerThread(Thread.currentThread());
                /*
                 * 否则,调用AQS的尝试获取锁方法。
                 * AQS尝试获取锁方法是子类必须要重写的方法,所以实际调用的是NonfairSync的tryAcquire方法
                 * 可重入锁逻辑:判断持有锁的线程和当前线程是否同一个,true则继续持有,false进入等待队列
                 */
                else
                    acquire(1);
            }
    
            /**
             * 第一次尝试获取锁不成功后会调用此方法,再次尝试获取一下锁,主要目的在于如果持有锁的线程和争用锁的线程是一个线程,则获取锁成功
             */
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    

    加锁操作调用NonfairSynclock(),具体逻辑我已经在代码写明白了,一定要看上述代码!

    释放锁操作调用AQS的release(int),在release(int)方法中调用子类,也就是SynctryRelease(int)
    在释放操作时,检查了owner变量是否为释放锁的线程,确保当前线程在执行unlock()操作之前已经获取了锁,否则抛出异常。

    我们来看一下,公平锁的实现方式:

        /**
         * 公平锁对象
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            /*
             * 公平可重入锁,直接调用AQS的acquire方法。
             */
            final void lock() {
                acquire(1);
            }
    
            /*
             * AQS acquire方法调用
             * 公平锁为了保证先到先得原则,会判断是否存在比当前线程等待时间更长的线程,如果是,此线程需要进入等待队列,而不是像非公平锁那样直接进行CAS操作。
             */
            protected final boolean tryAcquire(int acquires) {
                // 获取当前线程
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    /*
                     * 是否有比当前线程等待时间更长的线程
                     *  + true,进入同步器等待队列
                     *  + false,直接设置state同步状态
                     */
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 重入逻辑判断
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    非公平锁的加锁操作,为了保证获取锁的公平性,选择了直接调用AQSacquire(int)方法,而不是像非公平锁那样,直接进行CAS操作。
    之所以这样做,是为了公平锁在进行加锁前,要探查是否有比当前线程等待时间更长的线程,根本原因是为了保证公平性。

    释放锁操作和非公平锁代码逻辑一样。

    2.2 CountDownLatch

    我个人理解为一个或多个线程等待其它线程执行完毕再执行,我一般用作线程间的有条件通信。在稍微了解了一些AQS知识点以后,再来阅读CountDownLatch的源码,比较容易阅读和理解。而且其源码并不复杂,也不多,但是设计和实现的真的是相当精巧。

    CountDownLatch类里,同样也维护了一个静态内部类Syncstate代表计数。但是要注意,这个Sync不是ReentrantLock里的Sync抽象静态内部类,这里的是静态内部类,不要记混了。

    • await()用于决定当前线程是阻塞还是非阻塞
    • countDown()用于原子性的对state计数减1,如果计数为0,会唤醒AQS内FIFO队列第一个等待线程,然后该线程再唤醒后继线程。以此类推,直到所有线程都唤醒完毕。这代码写的真有意思,二营长的意大利炮都被我看丢了。

    Sync类代码:

        /**
         * Synchronization control For CountDownLatch.
         * Uses AQS state to represent count.
         * 计数锁的同步控制器。使用AQS状态表示计数。
         * 继承AQS
         */
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            // 有参构造函数,指定计数的次数
            Sync(int count) {
                // 此时state代表了次数
                setState(count);
            }
    
            // 返回计数
            int getCount() {
                return getState();
            }
    
            /*
             * 尝试获取操作
             * 返回负数,表示当前线程需要等待,计数还未归0,需要等待其它工作完成
             * 返回正数,表示需要等待的其它工作已经完成,无需等待,线程向下一步进发
             */
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            /*
             * 尝试释放操作,也就是计数减1
             * 参数releases实际上在这里没有用到
             */
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                /*
                 * 无限循环,直到释放掉设置state设置成功
                 */
                for (;;) {
                    // 获取当前计数
                    int c = getState();
                    // 如果等于0,终止循环,返回false
                    if (c == 0)
                        return false;
                    // 否则,在当前的state计数上-1,得到新的state值
                    int nextc = c-1;
                    // 设置新的state值,如果成功则返回nextc == 0,否则,继续循环设置,直到设置成功
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    

    await()方法代码
    CountDownLatch:

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    

    AQS:

        public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    
        /**
         * Acquires in shared interruptible mode.
         * 以共享可中断的模式获取
         */
        private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
            // 以共享模式包装当前线程为节点,并加入到CLH队列
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                // 无限循环,实际上并不是一直循环,线程会阻塞,直到其它线程来唤醒
                for (;;) {
                    // 当前节点的前驱节点,在addwaiter()方法里有设置,一般都是指向tail,如果是初始化,那么前驱节点是head(因为初始化时,head和tail是一个对象)如有不懂,立即看源码
                    final Node p = node.predecessor();
                    // 前驱节点是否为head
                    if (p == head) {
                        // 尝试获取共享操作,如果state=0 则肯定返回1
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 设置头结点和根据调用tryAcquireShared返回的状态以及节点本身的等待状态来判断是否要需要唤醒后继线程。
                            setHeadAndPropagate(node, r);
                            // 准备回收无用节点
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    // 获取操作失败后阻塞当前线程,所以线程就被阻塞在这里了
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    

    countDown()方法代码
    CountDownLatch:

        public void countDown() {
            sync.releaseShared(1);
        }
    

    AQS:

        public final boolean releaseShared(int arg) {
            /*
             * 调用子类覆写父类的方法tryReleaseShared(int),也就是上面Sync的tryReleaseShared(int releases)方法
             * 如果state最终被释放为0,则唤醒等待线程
             */
            if (tryReleaseShared(arg)) {
                // 唤醒等待线程
                doReleaseShared();
                return true;
            }
            return false;
        }
    
        /**
         * 共享模式的释放操作——通知后续操作并确保传播。
         * (注意:对于独占模式,如果需要信号,release相当于调用head的unparkSuccessor。)
         */
        private void doReleaseShared() {
            for (; ; ) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    // head节点存储的waitStatus其实指的是后继节点的等待状态
                    if (ws == Node.SIGNAL) {
                        // 将头结点的waitStatus设置为0,也就是初始状态,如果设置失败,则无限循环设置
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;
                        /*
                         * 唤醒head节点的后继节点,其方法内部唤醒的线程实际是后继节点的线程,
                         * 真实的后继线程被阻塞在了doAcquireSharedInterruptibly()方法内部,被唤醒后恢复记忆,继续执行,
                         * 然后在doAcquireSharedInterruptibly方法里调用setHeadAndPropagate()方法,里面可以继续唤醒后继线程,以此类推...
                         */
                        unparkSuccessor(h);
                    } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;
                }
                /*
                 * 如果头节点发生改变,则继续循环
                 */
                if (h == head)
                    break;
            }
        }
    

    CountDownLatch静态内部类主要覆写了tryAcquireShared(int)tryReleaseShared(int),就达到了一个或多个线程等待其它线程执行完毕再执行的目的,很神奇,慨叹设计者的天工巧做。

    2.3 CyclicBarrier

    用来控制一组线程只有全部到达一个指定点后才能一起执行。与CountDownLatch类似,通过维护AQS内的state(作为计数)来实现的,线程执行await()方法后,计数器-1,直到计数器为0,所有调用await()方法的线程才能继续执行。

    CyclicBarrierCountDownLatch最大的不同除了上述的概念之外,还有就是,CyclicBarrier可以调用reset()方法后,继续使用,所以称它为循环屏障

    CyclicBarrier没有显示的表现出与AQS的关系,但是,其内部使用了ReentrantLockCondition,借力二者实现循环屏障的需求,也算的上是组件上的组件了,所以我们暂且也把它称为AQS的拓展组件。

    CyclicBarrier有两种初始化方法:

    • CyclicBarrier(int parties, Runnable barrierAction)
      • parties 参与此次循环屏障的线程数量
      • barrierAction 当参与的线程到达指定点后需要执行的线程
    • CyclicBarrier(int parties)
      • parties 参与此次循环屏障的线程数量

    CyclicBarrierawait()为重载方法,共两种:

    • await() 采用默认参数
    • await(long timeout, TimeUnit unit)
      • 设置等待时间,如果超时则抛出异常,并将唤醒所有线程,CyclicBarriercount参数恢复为parties,Generation中断标记设置为true,如果再次调用await()会抛出异常,要想正常工作应该在调用await()之前调用reset()

    由于源码不多也不复杂,所以我们贴上源码,一起happy~

    public class CyclicBarrier {
        /**
         * 这个静态内部类的主要作用就是描述了Cyclicbarrier的更新换代
         * 一批线程属于一代,其中的broken标识当前CyclicBarrier是否处于中断状态
         */
        private static class Generation {
            // 中断标识
            boolean broken = false;
        }
    
        /*
         * 互斥锁
         */
        private final ReentrantLock lock = new ReentrantLock();
        /*
         * 用以协调线程协作的工具Condition    
         */
        private final Condition trip = lock.newCondition();
        /*
         * 参与此次循环屏障的线程数,用户初始化时传递的,之所以单独留存,是因为循环屏障可以循环使用
         */
        private final int parties;
        /*
         * 参与循环屏障的线程,全部到达指定点后,需要执行的线程 暂且叫它为屏障任务
         */
        private final Runnable barrierCommand;
        /**
         * 当前循环屏障的代属
         */
        private Generation generation = new Generation();
    
        /**
         * 初始化时将parties赋值到此变量
         */
        private int count;
    
        /**
         * 这一代线程已经全部到达指定点,生成下一代代属
         */
        private void nextGeneration() {
            // 唤醒所有这代的阻塞线程
            trip.signalAll();
            // 计数回归
            count = parties;
            // 生成下一代代属
            generation = new Generation();
        }
    
        /**
         * 中断循环屏障
         *  - 将同代的中断属性更新为true
         *  - 计数器更新为初始值
         *  - 唤醒所有阻塞线程
         */
        private void breakBarrier() {
            generation.broken = true;
            count = parties;
            trip.signalAll();
        }
    
        /**
         * 核心方法
         */
        private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
            // 直接this.lock.lock()不行吗?老爷子这是有代码洁癖吗?
            final ReentrantLock lock = this.lock;
            // 互斥锁开启
            lock.lock();
            try {
                final Generation g = generation;
                
                // 检查同步屏障是否中断状态,也就是上面说的超时引发异常后,如果没有调用reset()方法
                if (g.broken)
                    throw new BrokenBarrierException();
    
                // 判断线程中断标识
                if (Thread.interrupted()) {
                    // 如果这个线程中断标识为true,则中断循环屏障并抛出中断异常
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                // 计数-1
                int index = --count;
                // 如果计数为0 说明此线程是最后一个到达线程
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        // 屏障任务不为空,执行屏障任务
                        if (command != null)
                            command.run();
                        ranAction = true;
                        // 生成下一代代属,计数回归,唤醒所有阻塞线程
                        nextGeneration();
                        return 0;
                    } finally {
                        // 如果屏障任务执行发生异常,则中断当前循环屏障
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // 如果计数不为0 说明还有其它线程没有到达,进入无限循环代码块
                for (;;) {
                    try {
                        // 是否设置了等待超时
                        if (!timed)
                            // 阻塞当前线程
                            trip.await();
                        // 设置了等待超时且超时时间大于0L
                        else if (nanos > 0L)
                            // 等待nanos
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        /*
                         * 如果在阻塞过程中线程中断
                         * 当前的代属是否等于CyclicBarrier的generation & 并且没有中断
                         */
                        if (g == generation && !g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // 如果是这样,说明当前的线程不是这代的,不会影响当前代属正常运行,中断当前线程即可
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    // 代属是否被中断,true 则抛出中断异常
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    /*
                     * g != generation表示正常换代,返回当前线程所在屏障的下标即可
                     */
                    if (g != generation)
                        return index;
    
                    /*
                     * 判断超时,如果超时则中断循环屏障,抛出超时异常
                     */
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 有屏障任务的构造函数
         */
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        /**
         * 构造函数
         */
        public CyclicBarrier(int parties) {
            this(parties, null);
        }
    
        /**
         * 返回当前还有多少未到达的线程数量
         */
        public int getParties() {
            return parties;
        }
    
        /**
         * 等待其它线程到达,调用内部方法dowait()
         */
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
        /**
         * 等待其它线程到达,调用内部方法dowait()
         */
        public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
    
        /**
         * 返回循环屏障的代属是否中断标识
         */
        public boolean isBroken() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return generation.broken;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 重置循环屏障参数
         *  - 中断循环屏障
         *  - 生成下一代代属
         */
        public void reset() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                breakBarrier();   // break the current generation
                nextGeneration(); // start a new generation
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 返回已到达的线程数量
         */
        public int getNumberWaiting() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return parties - count;
            } finally {
                lock.unlock();
            }
        }
    }
    

    2.4 Semaphore

    信号量,用来控制同时访问共享资源的操作数量,或同时执行特定操作的数量,可以理解为简单点的限流器。

    其内部类似ReentrantLock的实现方式:

    • 同样维护了Sync抽象静态内部类,Sync继承于AQS
    • 维护了非公平同步器,继承于Sync抽象静态内部类
    • 维护了公平同步器,继承于Sync抽象静态内部类

    大致思路:

    非公平版本同步器

    初始化指定限流流量数,表示对某一资源或者操作的最大数量。

    • 当线程调用信号量acquire()方法时,会对AQS内部的state变量做操作,如果剩余流量数大于0,放行当前线程,如果剩余流量数等于0,则将当前线程加入等待队列。
    • 当线程调用信号量release()方法时,会对AQS内部的state变量做操作,并唤醒等待队列里的后继线程,后继线程会调用tryAcquireShared(int),尝试获取流量许可。
    公平版本同步器

    初始化指定限流流量数,标识对某一资源或者操作的最大数量。

    • 当线程调用acquire()方法时,会首先判断AQS等待队列是否还有其它等待线程节点,如果有,则将当前线程放入等待队列。如果没有,会对AQS内部的state变量做操作,如果剩余流量数大于0,放行当前线程,如果剩余流量数等于0,则将当前线程加入等待队列。
    • 释放同非公平版本同步器一样。

    由于源码不复杂且不多,故贴上所有代码:

        /**
         * 信号量类,控制某一特定资源或者操作,执行的数量
         * 可以理解为简单的限流
         */
        public class Semaphore implements java.io.Serializable {
            private static final long serialVersionUID = -3222578661600680210L;
            /*
             * 内部同步器类
             */
            private final Sync sync;
        
            /**
             * 抽象内部静态类,继承AQS
             */
            abstract static class Sync extends AbstractQueuedSynchronizer {
                private static final long serialVersionUID = 1192457210091910933L;
        
                // 有参构造函数,permits定义最大流量数
                Sync(int permits) {
                    // 将AQS的同步状态作为流量数
                    setState(permits);
                }
        
                // 获取限流剩余流量
                final int getPermits() {
                    return getState();
                }
        
                // 以非公平的方式尝试获取共享操作
                final int nonfairTryAcquireShared(int acquires) {
                    // 无限循环
                    for (;;) {
                        // 获取当前的流量数
                        int available = getState();
                        // 获得剩余流量数
                        int remaining = available - acquires;
                        /*
                         * 如果超过流量限制,返回负数,代表失败,进入AQS内部等待队列
                         * cas设置剩余流量数成功,返回正数,代表成功
                         * 没有进入条件分支,代表cas操作失败,无限循环直到进入此条件分支,要么失败,进入队列;要么成功
                         */
                        if (remaining < 0 ||
                            compareAndSetState(available, remaining))
                            return remaining;
                    }
                }
        
                // 尝试释放特定流量
                protected final boolean tryReleaseShared(int releases) {
                    // 无限循环
                    for (;;) {
                        // 获取当前流量数
                        int current = getState();
                        // 最新流量数 = 当前流量数 + 释放流量数
                        int next = current + releases;
                        // 如果最新流量数小于当前流量数 则抛出异常,代表releases参数不能为负数
                        if (next < current) // overflow
                            throw new Error("Maximum permit count exceeded");
                        // cas操作,设置最新流量数,失败则再试
                        if (compareAndSetState(current, next))
                            return true;
                    }
                }
        
                // 削减特定流量数
                final void reducePermits(int reductions) {
                    // 无限循环
                    for (;;) {
                        // 获取当前流量数
                        int current = getState();
                        // 得到最新流量数
                        int next = current - reductions;
                        // 如果最新流量数大于当前流量数,抛出异常,同样意味着,reductions参数不能为负数
                        if (next > current) // underflow
                            throw new Error("Permit count underflow");
                        // cas操作,设置最新流量数
                        if (compareAndSetState(current, next))
                            return;
                    }
                }
                
                // 清空流量数
                final int drainPermits() {
                    // 无限循环
                    for (;;) {
                        // 获取当前流量数
                        int current = getState();
                        /* 
                         * 如果当前流量数为0,直接返回当前流量数
                         */
                        if (current == 0 || compareAndSetState(current, 0))
                            return current;
                    }
                }
            }
        
            /**
             * 非公平版本同步器
             */
            static final class NonfairSync extends Sync {
                private static final long serialVersionUID = -2694183684443567898L;
        
                // 有参构造函数,定义流量数
                NonfairSync(int permits) {
                    super(permits);
                }
                
                // 以共享的方式尝试获取操作
                protected int tryAcquireShared(int acquires) {
                    // 内部调用Sync.nonfairTryAcquireShared(int)方法
                    return nonfairTryAcquireShared(acquires);
                }
            }
        
            /**
             * 公平版本同步器
             */
            static final class FairSync extends Sync {
                private static final long serialVersionUID = 2014338818796000944L;
        
                // 有参构造函数,定义流量数
                FairSync(int permits) {
                    super(permits);
                }
        
                // 以共享且公平的方式尝试获取操作
                protected int tryAcquireShared(int acquires) {
                    // 无限循环
                    for (;;) {
                        /*
                         * 简单理解为是否还有排队线程,也就是是否还有比当前线程排队时间更长的线程
                         * 如果有,则返回-1,代表获取失败,进入等待队列
                         */
                        if (hasQueuedPredecessors())
                            return -1;
                        // 获取当前流量数
                        int available = getState();
                        // 获取剩余流量数
                        int remaining = available - acquires;
                        /*
                         * 剩余流量数小于0 则返回剩余流量数,代表获取失败,进入等待队列
                         * 设置成功,返回剩余流量数,也就是正数,代表获取成功
                         */
                        if (remaining < 0 ||
                            compareAndSetState(available, remaining))
                            return remaining;
                    }
                }
            }
        
            /**
             * 有参构造函数,默认是非公平同步器
             */
            public Semaphore(int permits) {
                sync = new NonfairSync(permits);
            }
        
            /**
             * 有参构造函数,指定内部同步器是公平版本还是非公平版本
             */
            public Semaphore(int permits, boolean fair) {
                sync = fair ? new FairSync(permits) : new NonfairSync(permits);
            }
        
            /**
             * 获取流量操作,也就是如果有剩余流量,则线程继续执行,如果没有剩余流量,则线程等待
             */
            public void acquire() throws InterruptedException {
                sync.acquireSharedInterruptibly(1);
            }
        
            /**
             * 获取流量操作,忽略中断
             */
            public void acquireUninterruptibly() {
                sync.acquireShared(1);
            }
        
            /**
             * 尝试获取流量操作,返回获取结果
             */
            public boolean tryAcquire() {
                return sync.nonfairTryAcquireShared(1) >= 0;
            }
        
            /**
             * 阐释获取流量操作,指定等待时间,返回获取结果
             */
            public boolean tryAcquire(long timeout, TimeUnit unit)
                throws InterruptedException {
                return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
            }
        
            /**
             * 释放
             */
            public void release() {
                sync.releaseShared(1);
            }
        
            /**
             * 获取指定数量的流量
             */
            public void acquire(int permits) throws InterruptedException {
                if (permits < 0) throw new IllegalArgumentException();
                sync.acquireSharedInterruptibly(permits);
            }
        
            /**
             * 获取指定数量的流量并忽略线程中断
             */
            public void acquireUninterruptibly(int permits) {
                if (permits < 0) throw new IllegalArgumentException();
                sync.acquireShared(permits);
            }
        
            /**
             * 尝试获取指定数量的流量,返回获取结果
             */
            public boolean tryAcquire(int permits) {
                if (permits < 0) throw new IllegalArgumentException();
                return sync.nonfairTryAcquireShared(permits) >= 0;
            }
        
            /**
             * 尝试获取指定数量的流量并指定等待时间,返回获取结果
             */
            public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
                throws InterruptedException {
                if (permits < 0) throw new IllegalArgumentException();
                return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
            }
        
            /**
             * 释放指定流量的许可
             */
            public void release(int permits) {
                if (permits < 0) throw new IllegalArgumentException();
                sync.releaseShared(permits);
            }
        
            /**
             * 获取限流剩余流量
             */
            public int availablePermits() {
                return sync.getPermits();
            }
        
            /**
             * 清空流量数
             */
            public int drainPermits() {
                return sync.drainPermits();
            }
        
            /**
             * 削减特定流量数
             */
            protected void reducePermits(int reduction) {
                if (reduction < 0) throw new IllegalArgumentException();
                sync.reducePermits(reduction);
            }
        
            /**
             * 获取是否公平版本同步器
             */
            public boolean isFair() {
                return sync instanceof FairSync;
            }
        
            /**
             * 是否还有排队线程,也就是是否还有比当前线程排队时间更长的线程
             */
            public final boolean hasQueuedThreads() {
                return sync.hasQueuedThreads();
            }
        
            /**
             * 获取等待队列长度
             */
            public final int getQueueLength() {
                return sync.getQueueLength();
            }
        
            /**
             * 获取等待线程集合
             */
            protected Collection<Thread> getQueuedThreads() {
                return sync.getQueuedThreads();
            }
        
            /**
             * toString
             */
            public String toString() {
                return super.toString() + "[Permits = " + sync.getPermits() + "]";
            }
        }
    

    3

    AQS的相关拓展组件ReentrantLockCountDownLatch等等,都是定义一个内部类继承AQS,然后将这个内部类作为同步组件的内部类,此内部类不对外暴露。

    暂时就讲这几个组件,了解了AQS,再学习相关组件会很容易的。

    相关文章

      网友评论

          本文标题:Java并发之AQS拓展组件(4)

          本文链接:https://www.haomeiwen.com/subject/uswlwctx.html