美文网首页
J.U.C 之AQS

J.U.C 之AQS

作者: 磊_5d71 | 来源:发表于2018-11-03 22:14 被阅读0次
    图片.png

    CountDownLatch

    • CountDownLatchExample1
    package com.alan.concurrency.example.aqs;
    
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    
    @Slf4j
    public class CountDownLatchExample1 {
    
        private final static int threadCount = 200;
    
        public static void main(String[] args) throws InterruptedException {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    
            for (int i = 0; i < threadCount; i++) {
                {
                    int threadNum = i;
                    exec.execute(() -> {
                        try {
                            test(threadNum);
                        } catch (InterruptedException e) {
                            log.error("InterruptedException", e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    });
                }
            }
    
            //通过countDown()和await()能保证所有线程执行完成后,再调用log.info("finish")
            countDownLatch.await();
            log.info("finish");
            exec.shutdown();
    
        }
    
        public static void test(int threadNum) throws InterruptedException {
            Thread.sleep(100);
            log.info("{}",threadNum);
        }
    
    }
    
    • CountDownLatchExample2 限制指定时间完成
    package com.alan.concurrency.example.aqs;
    
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    
    @Slf4j
    public class CountDownLatchExample2 {
    
        private final static int threadCount = 200;
    
        public static void main(String[] args) throws InterruptedException {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    
            for (int i = 0; i < threadCount; i++) {
                {
                    int threadNum = i;
                    exec.execute(() -> {
                        try {
                            test(threadNum);
                        } catch (InterruptedException e) {
                            log.error("InterruptedException", e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    });
                }
            }
    
            //通过countDown()和await()能保证所有线程执行完成后,再调用log.info("finish")
            //设置超时时间10毫秒
            countDownLatch.await(10,TimeUnit.MILLISECONDS);
            log.info("finish");
            //是先让当前线程任务都执行完成后,才进行shutdown操作
            exec.shutdown();
    
        }
    
        public static void test(int threadNum) throws InterruptedException {
            Thread.sleep(100);
            log.info("{}",threadNum);
        }
    
    }
    

    Semaphore 同步组件-信号量

    • Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。

    • 以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。

    • 在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。

    • 更进一步,信号量的特性如下:信号量是一个非负整数(车位数),所有通过它的线程(车辆)都会将该整数减一(通过它当然是为了使用资源),当该整数值为零时,所有试图通过它的线程都将处于等待状态。在信号量上我们定义两种操作: Wait(等待) 和 Release(释放)。 当一个线程调用Wait(等待)操作时,它要么通过然后将信号量减一,要么一直等下去,直到信号量大于一或超时。Release(释放)实际上是在信号量上执行加操作,对应于车辆离开停车场,该操作之所以叫做“释放”是因为加操作实际上是释放了由信号量守护的资源。

    • 应用场景:只能访问有限的资源
      1、设置数据库的连接数
      2、设置数为1,将相当于单线程运行了。

    • 单一许可

    package com.alan.concurrency.example.aqs;
    
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    
    @Slf4j
    public class SemaphoreExample1 {
    
        private final static int threadCount = 200;
    
        //设置允许的并发数为20
        private final static Semaphore semaphore = new Semaphore(20);
    
        public static void main(String[] args) throws InterruptedException {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
    
            for (int i = 0; i < threadCount; i++) {
                {
                    int threadNum = i;
                    exec.execute(() -> {
                        try {
                            semaphore.acquire();  //获取一个许可
                            test(threadNum);
                            semaphore.release();  //释放一个许可
                        } catch (InterruptedException e) {
                            log.error("InterruptedException", e);
                        }
                    });
                }
            }
    
            exec.shutdown();
    
        }
    
        public static void test(int threadNum) throws InterruptedException {
            log.info("{}",threadNum);
            Thread.sleep(1000);
        }
    
    }
    
    • 多个许可
    package com.alan.concurrency.example.aqs;
    
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    
    @Slf4j
    public class SemaphoreExample1 {
    
        private final static int threadCount = 200;
    
        //设置允许的并发数为20
        private final static Semaphore semaphore = new Semaphore(20);
    
        public static void main(String[] args) throws InterruptedException {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
    
            for (int i = 0; i < threadCount; i++) {
                {
                    int threadNum = i;
                    exec.execute(() -> {
                        try {
                            semaphore.acquire(20);
                            test(threadNum);
                            semaphore.release(20);
                        } catch (InterruptedException e) {
                            log.error("InterruptedException", e);
                        }
                    });
                }
            }
    
            exec.shutdown();
    
        }
    
        public static void test(int threadNum) throws InterruptedException {
            log.info("{}",threadNum);
            Thread.sleep(1000);
        }
    
    }
    

    CyclicBarrier

    • CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同的是该barrier在释放等待线程后可以重用,所以称它为循环(Cyclic)的屏障(Barrier)。
    • CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。
    package com.alan.concurrency.example.aqs;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    
    
    @Slf4j
    public class CyclicBarrierExample1 {
    
    
        private static CyclicBarrier barrier = new CyclicBarrier(5);
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService executor= Executors.newCachedThreadPool();
    
            for (int i = 0; i < 10; i++) {
    
                final int threadNum = i;
                Thread.sleep(1000);
                executor.execute(()->{
                    try {
                        race(threadNum);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    
    
        private static  void race(int threadNum) throws Exception{
            Thread.sleep(1000);
            log.info("{} is ready",threadNum);
            barrier.await();
            log.info("{} continue",threadNum);
    
        }
    
    }
    

    ReentrantLock 与锁

    • 可重入性:
      从名字上理解,ReenTrantLock的字面意思就是再进入的锁,其实synchronized关键字所使用的锁也是可重入的,两者关于这个的区别不大。两者都是同一个线程没进入一次,锁的计数器都自增1,所以要等到锁的计数器下降为0时才能释放锁。

    • 锁的实现:
      Synchronized是依赖于JVM实现的,而ReenTrantLock是JDK实现的,有什么区别,说白了就类似于操作系统来控制实现和用户自己敲代码实现的区别。前者的实现是比较难见到的,后者有直接的源码可供阅读。

    • 性能的区别:
      在Synchronized优化以前,synchronized的性能是比ReenTrantLock差很多的,但是自从Synchronized引入了偏向锁,轻量级锁(自旋锁)后,两者的性能就差不多了,在两种方法都可用的情况下,官方甚至建议使用synchronized,其实synchronized的优化我感觉就借鉴了ReenTrantLock中的CAS技术。都是试图在用户态就把加锁问题解决,避免进入内核态的线程阻塞。

    • 功能区别:
      便利性:很明显Synchronized的使用比较方便简洁,并且由编译器去保证锁的加锁和释放,而ReenTrantLock需要手工声明来加锁和释放锁,为了避免忘记手工释放锁造成死锁,所以最好在finally中声明释放锁。

    锁的细粒度和灵活度:很明显ReenTrantLock优于Synchronized

    • ReenTrantLock独有的能力:
      1、ReenTrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。
      2、ReenTrantLock提供了一个Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。
      3、ReenTrantLock提供了一种能够中断等待锁的线程的机制,通过lock.lockInterruptibly()来实现这个机制。
    package com.alan.concurrency.example.lock;
    
    
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j
    @ThreadSafe
    public class LockExample2 {
    
    
        //请求数1000
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static int count = 0;
    
    
        //通过Lock接口实现
        private static Lock lock = new ReentrantLock();
    
    
        private  static void add(){
    
            lock.lock();
            try {
                count++;
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            //定义线程池ExecutorService接口
            ExecutorService executorService = Executors.newCachedThreadPool();
            //定义信号量,传入并发线程数 final修饰不允许重新赋值
            final Semaphore semaphore = new Semaphore(threadTotal);
            //定义计数器闭锁。传入请求总数
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                //通过匿名内部类方式
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //semaphore控制并发数量
                            semaphore.acquire();
                            add();
                            semaphore.release();
                        } catch (InterruptedException e) {
                            log.error("exception",e);
                        }
                        //每次执行计数器减掉一个
                        countDownLatch.countDown();
                    }
    
                });
    
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("count:{}",count);
        }
    }
    
    • ReentrantReadWriteLock
    package com.alan.concurrency.example.lock;
    
    
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Date;
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    @Slf4j
    public class LockExample3 {
    
    
        private final Map<String, Data> map = new TreeMap<>();
    
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    
        //分别定义读锁和写锁
    
        private final Lock readLock = lock.readLock();
    
        private final Lock writeLock = lock.writeLock();
    
        public Data get(String key) {
            readLock.lock();
            try {
                return  map.get(key);
            } finally {
                readLock.unlock();
            }
        }
    
        public Set<String> getAllKeys(){
            readLock.lock();
            try {
                return  map.keySet();
            } finally {
                readLock.unlock();
            }
        }
    
        public Data put(String key, Data value){
            writeLock.lock();
            try {
                return  map.put(key,value);
            } finally {
                writeLock.unlock();
            }
        }
    }
    
    • StampedLock
    
    package com.alan.concurrency.example.lock;
    
    import java.util.concurrent.locks.StampedLock;
    
    public class LockExample4 {
    
        class Point {
            private double x, y;
            private final StampedLock sl = new StampedLock();
    
            void move(double deltaX, double deltaY) { // an exclusively locked method
                long stamp = sl.writeLock();
                try {
                    x += deltaX;
                    y += deltaY;
                } finally {
                    sl.unlockWrite(stamp);
                }
            }
    
            //下面看看乐观读锁案例
            double distanceFromOrigin() { // A read-only method
                long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
                double currentX = x, currentY = y;  //将两个字段读入本地局部变量
                if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
                    stamp = sl.readLock();  //如果没有,我们再次获得一个读悲观锁
                    try {
                        currentX = x; // 将两个字段读入本地局部变量
                        currentY = y; // 将两个字段读入本地局部变量
                    } finally {
                        sl.unlockRead(stamp);
                    }
                }
                return Math.sqrt(currentX * currentX + currentY * currentY);
            }
    
            //下面是悲观读锁案例
            void moveIfAtOrigin(double newX, double newY) { // upgrade
                // Could instead start with optimistic, not read mode
                long stamp = sl.readLock();
                try {
                    while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
                        long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
                        if (ws != 0L) { //这是确认转为写锁是否成功
                            stamp = ws; //如果成功 替换票据
                            x = newX; //进行状态改变
                            y = newY;  //进行状态改变
                            break;
                        } else { //如果不能成功转换为写锁
                            sl.unlockRead(stamp);  //我们显式释放读锁
                            stamp = sl.writeLock();  //显式直接进行写锁 然后再通过循环再试
                        }
                    }
                } finally {
                    sl.unlock(stamp); //释放读锁或写锁
                }
            }
        }
    }
    
    package com.alan.concurrency.example.lock;
    
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.locks.StampedLock;
    
    @Slf4j
    @ThreadSafe
    public class LockExample5 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static int count = 0;
    
        private final static StampedLock lock = new StampedLock();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("count:{}", count);
        }
    
        private static void add() {
            long stamp = lock.writeLock();
            try {
                count++;
            } finally {
                lock.unlock(stamp);
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:J.U.C 之AQS

          本文链接:https://www.haomeiwen.com/subject/dwvkxqtx.html