美文网首页jdk源码
AQS在jdk中的应用

AQS在jdk中的应用

作者: CodingRunning | 来源:发表于2020-10-19 09:16 被阅读0次

    上篇文章我们详细分析了AQS的底层实现原理,这节就来探索jdk中使用AQS实现的工具类

    从源码看AQS


    ReentrantLock

    一, 是什么?怎么用?

    是什么?

    是一个独占锁,也就是在并发环境下同一时刻只能有一个线程获得资源,也是一个可重入锁.

    可重入锁: 一个线程已经获取到了该资源,下次再次获取资源时不会出现等待情况(上次获取资源没有释放)

    怎么用?

    在各类并发的场景下,为了保证资源获取的正确性,可以保证每个资源同时只能被一个线程获取到.

    例如: 宿舍选宿系统(每张床位只能有一个学生抢到),秒杀活动(同一件商品不能被两个人买走)

    二, 类架构

    ReentrantLock架构图

    由上面架构图可以看出,ReentrantLock可以分为公平锁和非公平锁,而底层实现是AQS,在后面我们还可以看到更多的类底层都是由AQS实现的,所以说熟悉AQS原理对理解这些类是十分有必要的

    类的属性

    /**
     * 实现锁的同步器
     */
    private final Sync sync;
    
    /**
     * 抽象同步器
     * 子类可有公平和非公平两种方式,使用AQS的state字段来表示是否获取到锁和重入次数
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
    
        /**
         * 根据子类实现,可以实现公平锁和非公平锁
         */
        abstract void lock();
    
        /**
         * 非公平方式获取锁
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //如果当前状态值为0也就是当前锁没有被其他线程持有,则尝试获取锁
            if (c == 0) {
                //获取锁,如果过去成功,则设置当前线程为独占线程
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果当前锁是当前线程所持有,则将重入次数+1
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                //如果冲入次数超过阈值,则将其置为负值
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    
        /**
         * 释放锁
         */
        @Override
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            //判断当前线程和锁持有线程是否为同一个线程
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //判断当前可重入次数是否为0,如果为0则清除线程占有标记
            if (c == 0) {
                free = true;
                //清除掉独占标记
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    
        /**
         * 判断当前线程是否持有锁
         */
        @Override
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    
        /**
         * 创建条件变量
         */
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    
        /**
         * 获取资源持有者
         */
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
    
        /**
         * 获取重入次数
         */
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
    
        /**
         * 是否已经持有锁
         */
        final boolean isLocked() {
            return getState() != 0;
        }
    }
    

    三, 具体实现

    公平式

    先获取资源的状态,如果没有人占用,判断当前线程是否为队列的首节点,如果是则尝试获取资源,获取成功修改独占线程,如果有人占用则判断独占线程和当前线程是否相同,如果相同的判断可重入的次数,超过抛出错误,否则重入成功

    /**
     * 公平锁
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
    
        @Override
        final void lock() {
            acquire(1);
        }
    
        /**
         * 获取锁
         */
        @Override
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                /**
                 * 当前线程为队列的头节点并且获取资源成功,设置独占锁
                 */
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextc);
                return true;
            }
            return false;
        }
    }
    

    非公平式

    先获取当前资源的状态,如果没有人占用,直接获取,获取成功修改独占线程的状态,有人占用查看当前占有线程是否为当前线程,如果是则进行重入,此外还需要判断重入次数,如果超过了阈值,抛出错误

    /**
     * 非公平锁
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
    
        @Override
        final void lock() {
            /**
             * 自旋获取资源,成功后修改独占状态,
             * 失败后继续尝试获取,并将其加入到队列中以CLH自旋锁方式一直尝试获取资源
             */
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
            }
            else {
                //AQS内部方法,实则调用的是tryAcquire(),如果获取资源失败,则加入到AQS队列尾部,并且以自旋的方            //式一直尝试获取资源,不会响应中断,但是设置了终端标记,在获取到资源后会释放掉资源,并且将当前线程            //状态设置为CANCELLED,这一部分详细代码请看AQS源码解析
                acquire(1);
            }
        }
    
        /**
         * 尝试获取资源
         */
        @Override
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    

    构造器

    /**
    * 默认为非公平锁,相比于公平锁,性能更高,因为公平锁每次还需要查看AQS中是否有等待的线程
    */
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    //也可以指定创建公平锁或非公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    

    案例

    之前某大厂的一个面试题,使用三个线程顺序打印出ABC三个字母,第一个线程打印A,然后第二个线程打印B,第三个线程打印C,打印10轮

    public class PrintWord {
    
        private static ReentrantLock lock = new ReentrantLock();
        static Condition conditionA = lock.newCondition();
        static Condition conditionB = lock.newCondition();
        static Condition conditionC = lock.newCondition();
        private static int i = 1;
    
        public static void main(String[] args) {
            new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    printA();
                }
            },"A").start();
            new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    printB();
                }
            },"B").start();
            new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    printC();
                }
            },"C").start();
        }
    
        private static void printA() {
            lock.lock();
            try {
                if (i != 1) {
                    conditionA.await();
                }
                System.out.println(Thread.currentThread().getName());
                i = 2;
                conditionB.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        private static void printB() {
            lock.lock();
            try {
                if (i != 2) {
                    conditionB.await();
                }
                System.out.println(Thread.currentThread().getName());
                i = 3;
                conditionC.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        private static void printC() {
            lock.lock();
            try {
                if (i != 3) {
                    conditionC.await();
                }
                System.out.println(Thread.currentThread().getName());
                i = 1;
                conditionA.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    

    Semaphore

    一, 是什么?怎么用?

    是什么?

    信号量,从概念上讲,信号量维护一组许可证,每个线程都可以来获取许可证,直至许可证为空

    怎么用?

    可以使用其控制并发线程的数量

    二, 类架构

    Semaphore架构图

    从上面架构图我们可以看出,Semaphore底层也是使用的AQS,并且和ReentrantLock一样,都提供了公平式和非公平式获取资源

    类的属性

    /**
     * 实现信号量的同步器
     */
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
    
        /**
         * 设置许可证的数量
         */
        Sync(int permits) {
            setState(permits);
        }
    
        /**
         * 获取当前剩余的许可证数量
         */
        final int getPermits() {
            return getState();
        }
    
        /**
         * 以共享的方式非公平获取许可证
         */
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                //获取目前所剩余的许可证
                int available = getState();
                //计算获取之后剩余的许可证
                int remaining = available - acquires;
                //如果许可证数量为负就修改state值
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    
        /**
         * 以共享的方式释放许可证
         */
        @Override
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                //CAS设置许可证数量
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    
        /**
         * 按照具体的数量减少许可证
         */
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) 
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
    
        /**
         * 获取当前可以使用的许可证,如果等于0则直接修改state
         */
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
    

    三, 具体实现

    公平式

    获取许可证: 每次获取先都需要看AQS中是否有等待的线程,如果有,则直接退出,否则获取许可证,修改剩余许可证的数量,并且返回剩余许可证数量

    释放许可证: 由AQS的releaseShared调用,释放许可证时,在原先的基础上加上释放的许可证,但是释放的数量不能为负,释放成功,调用AQS中的doReleaseShared方法,将队列头节点的状态设置为0然后从头节点的后继节点中找出一个状态值小于0的线程节点释放

    /**
     * 公平式同步器
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
    
        FairSync(int permits) {
            super(permits);
        }
    
        /**
         * 获取许可证,由AQS中的acquireShared方法调用,如果许可证数量小于0,
         * 则将当前线程加入到队列中一直轮询尝试过去许可证
         */
        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                //队列中有等待的线程,直接返回
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                //获取许可证,CAS修改state值
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    

    非公平式

    获取许可证: 上来直接尝试获取信号量,如果获取成功返回剩余许可证,如果许可证数量小于0则进入AQS队列中

    释放许可证: 和非公平式相同

    /**
     * 非公平方式下同步器
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
    
        NonfairSync(int permits) {
            super(permits);
        }
    
        /**
         * 获取资源,这一步由AQS中的acquireShared调用,每次获取资源后会返回剩余的许可证,上面有写
         * 如果小于等于0则当前线程会一直处于CLH锁中,如果大于0则会唤醒队列中所有状态为SIGNAL的线程
         * 详情见AQS源码948行开始
         */
        @Override
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    

    获取许可证

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    

    释放许可证

    public void release() {
        sync.releaseShared(1);
    }
    

    相关文章

      网友评论

        本文标题:AQS在jdk中的应用

        本文链接:https://www.haomeiwen.com/subject/lzxkmktx.html