Java高并发--AQS

作者: sunhaiyu | 来源:发表于2019-04-20 16:07 被阅读110次

    Java高并发--AQS

    主要是学习慕课网实战视频《Java并发编程入门与高并发面试》的笔记

    AQS是AbstractQueuedSynchronizer的简称,直译过来是抽象队列同步器。AQS的底层数据结构是队列,如下所示

    image

    AQS使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架,利用一个int类型表示状态(state)

    使用该框架的功能需要让子类继承,并重写相关方法。

    • 子类通过继承并通过重写方法管理其状态acquire和release的方法操纵状态

    • 可以同时实现排他锁和共享锁模式(独占、共享),要么使用独占锁要么使用共享锁,而不会同时使用两者

    闭锁- CountdownLatch

    常称为"闭锁",是一个倒计数器,如下,设定了倒计数值为3.当前线程调用await()被阻塞,其他线程每调用一次countDown()计数器减1,一直到计数值cnt等于0时,当前线程才可以继续(恢复)执行。可以理解为一个线程等待多个线程执行完毕,这样该线程可以利用其他多个线程的执行结果。

    image

    下面是CountdownLatch的简单使用,当计数器从60减到0时候,当前线程会打印“Go!”

    由于当前线程被阻塞,需等到其余60个线程执行完毕,因此当前线程的"Go!"会最后打印。

    package com.shy.concurrency.aqs;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author Haiyu
     * @date 2019/1/3 11:00
     */
    @Slf4j
    public class CountdownLatchExample {
        private static CountDownLatch cdl = new CountDownLatch(60);
    
        public static void main(String[] args) throws InterruptedException {
            final int num = 60;
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = num; i >= 0; i--) {
                final int cur = i;
                executorService.execute(() -> {
                    try {
                        run(cur);
                    } catch (InterruptedException e) {
                        log.error("exception", e);
                    } finally {
                        cdl.countDown();
                    }
                });
            }
            cdl.await();
            System.out.println("Go!");
            executorService.shutdown();
        }
    
        public static void run(int cur) throws InterruptedException {
            Thread.sleep(100);
            System.out.println(cur);
        }
    }
    
    

    使用CountdownLatch还可以实现限时任务,在await()方法中可以传入参数,如下表示当前线程只会等待10毫秒,超过这个时间就不再等待而是恢复执行。

    cdl.await(10, TimeUnit.MICROSECONDS);
    

    将上面的程序cdl.await();修改成cdl.await(10, TimeUnit.MICROSECONDS);,将最先打印"Go!"因为它只等待10毫秒,而其他线程每次执行前都会sleep100毫秒。

    信号量 - Semaphore

    可以控制某个资源可以有多少个线程同时访问,即可以控制并发量。主要通过acquire和release方法,

    • acquire():获得一个许可,若无法获得会一直等待直到有线程释放了许可或者被中断
    • tryAcquire():尝试获得一个许可,获取成功返回true,失败返回false,不会等待立即返回
    • release():释放一个许可

    acquire()release()还可以传入参数,指定一次获得/释放的许可数;tryAcquire()还可以指定时间,在指定时间内没有获得到许可就返回false。

    循环栅栏 - CyclicBarrier

    cyclicBarrier强调线程之间互相等待

    image

    CountdownLatch和CyclicBarrier的区别如下:

    • CountDownLatch强调一个线程等待其他所有线程,通过cdl.await()让当前线程等待在倒计数器上,每有一个线程执行完,cdl.countDown(),将计数减1,减到0时通知当前线程执行。简单的说就是一个线程等待,直到他所等待的其他线程都执行完成,当前线程才可以继续执行。
    • cyclicBarrier强调线程之间互相等待,只要有一个线程还没到来,所有线程会一起等待。可以传入一个Runnable作为计数完成要执行的任务。每有一个线程调用cyc.await()计数减1,减到0时会执行一次该Runnable。简单地说就是线程之间互相等待,等所有线程都准备好,即调用await()方法之后,执行一次Runnable,此时所有线程开始同时执行!
    • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。

    下面是一个例子,CyclicBarrier还可以传入一个Runnable, 每次计数到的时候都会执行一次。

    package com.shy.concurrency.aqs;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.*;
    
    /**
     * @author Haiyu
     * @date 2019/1/3 11:00
     */
    @Slf4j
    public class CyclicBarrierExample {
        private static CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> System.out.println("集合完毕"));
    
        public static void main(String[] args) throws InterruptedException {
            final int num = 10;
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0;i < num; i++) {
    
                final int cur = i;
                executorService.execute(() -> {
                    try {
                        run(cur);
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            executorService.shutdown();
        }
    
        public static void run(int cur) throws InterruptedException, BrokenBarrierException {
            log.info("{} is ready", cur);
            cyclicBarrier.await();
            log.info("{} is working", cur);
        }
    }
    
    

    CyclicBarrier(10, () -> System.out.println("集合完毕"));该句表示10个线程相互等待,所有10个线程都调用了await()方法后,会执行一次传入的Runnable;之后10个线程被唤醒,计数重新开始,这也是Cyclic的意义——可重复利用的计数器。

    因此上面程序会先打印10个ready,然后打印一次“集合完毕”;之后会打印10个working。

    重入锁 - ReentrantLock

    synchronized是JVM的内置锁,而重入锁是Java代码实现的。重入锁是synchronized的扩展,可以完全代替后者。重入锁可以重入,允许同一个线程连续多次获得同一把锁。其次,重入锁独有的功能有:

    • 可以相应中断,synchronized要么获得锁执行,要么保持等待。而重入锁可以响应中断,使得线程在迟迟得不到锁的情况下,可以不再等待。主要由lockInterruptibly()实现,这是一个可以对中断进行响应的锁申请动作,锁中断可以避免死锁。
    • 锁的申请可以有等待时限,用tryLock()可以实现限时等待,如果超时还未获得锁会返回false,也防止了线程迟迟得不到锁时一直等待,可避免死锁。
    • 公平锁,即锁的获得按照线程先来后到的顺序依次获得,不会产生饥饿现象。synchronized的锁默认是不公平的,重入锁可通过传入构造方法的参数实现公平锁。
    • 重入锁可以绑定多个Condition条件,这些condition通过调用await/singal实现线程间通信。可以实现分组唤醒需要唤醒的线程,而不是像synchronized的wait/notify一样要么随机唤醒要给要么唤醒全部。

    读写锁 - ReentrantReadWriteLock

    ReadWriteLock即读写锁,它有两个方法如下,分别返回一个读锁和写锁,即读写锁分离。

    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    Lock readLock = readWriteLock.readLock();
    Lock writeLock = readWriteLock.writeLock();
    

    在读时使用readLock进行加锁,在写时使用writeLock进行加锁。使得读-读不阻塞,读线程完全并行,适合读多写少的场合。读锁会完全阻塞写锁,它使用的依然是悲观的锁策略。如果有大量的读线程,也有可能引起写线程的饥饿。(如果想获得写锁,不允许还有读锁、写锁还保持着,即 在没有任何读锁、写锁的情况下才能获得写锁)

    相关文章

      网友评论

        本文标题:Java高并发--AQS

        本文链接:https://www.haomeiwen.com/subject/ntrlgqtx.html