美文网首页
Concurrent Java 04 - JUC之AQS

Concurrent Java 04 - JUC之AQS

作者: 阿武_Accat | 来源:发表于2019-02-07 12:36 被阅读0次

    AbstractQueuedSynchronizer - AQS

    AQS本质

    Provides a framework for implementing blocking locks and related
    synchronizers (semaphores, events, etc) that rely on
    first-in-first-out (FIFO) wait queues.

    AQS本质是一个支持FIFO的同步队列,使用Node构建锁或其他同步组件的基础框架。
    CountDownLatch,SemaphoreReentrantLock内部就实现了这种同步队列。

    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }
    

    AQS组件 - CountDownLatch

    await线程等待直到某个条件值为0
    使用它使用了计数器阻塞当前线程,直到计数器为0,只会出现一次。 等待计数的线程
    package com.accat.concurrency.example.aqs;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Slf4j
    public class CountDownLatchExample1 {
    
        private final static int threadCount = 200;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try {
                        test(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
            log.info("finish");
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            Thread.sleep(100);
            log.info("{}", threadNum);
            Thread.sleep(100);
        }
    }
    
    countDownLatch.await(10, TimeUnit.MILLISECONDS); // 也支持指定等待时间,超时则继续执行
    

    AQS组件 - Semaphore

    信号量,规划一次性可同时运行的线程个数。

    四个队伍的队员接力跑运动员(一次可容纳四线程)在起跑线上等待接棒(信号量)。 等待信号量的线程
    package com.accat.concurrency.example.aqs;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    @Slf4j
    public class SemaphoreExample1 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final Semaphore semaphore = new Semaphore(3);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try {
                        semaphore.acquire(); // 获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }
    

    尝试做一些操作,如果没有及时操作则丢弃这些操作。
    如:接包处理,如果处理时间超时,将处理不及时的包丢弃掉。

    package com.accat.concurrency.example.aqs;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class SemaphoreExample4 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final Semaphore semaphore = new Semaphore(3);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try {
                        if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
                            test(threadNum);
                            semaphore.release(); // 释放一个许可
                        }
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }
    

    AQS组件 - CyclicBarrier

    image.png

    计数器容许重置后再使用,多个线程等待其他线程的关系。

    相当于多个运动员相互等待,等待其他运动员就位后再一起冲刺,这里既有计数器功能,又有信号量功能。 相互等待和冲刺的线程
    package com.accat.concurrency.example.aqs;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Slf4j
    public class CyclicBarrierExample1 {
    
        private static CyclicBarrier barrier = new CyclicBarrier(5);
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService executor = Executors.newCachedThreadPool();
    
            for (int i = 0; i < 10; i++) {
                final int threadNum = i;
                Thread.sleep(1000);
                executor.execute(() -> {
                    try {
                        race(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            executor.shutdown();
        }
    
        private static void race(int threadNum) throws Exception {
            Thread.sleep(1000);
            log.info("{} is ready", threadNum);
            barrier.await();
            log.info("{} continue", threadNum);
        }
    }
    

    当所有就位的运动员(线程)准备时, 裁判员鸣枪, 运动员抢跑。
    这个过程相当于多个线程互相等待就位后,需要在之前执行其他操作(鸣枪),
    之后所有线程同时执行,CyclicBarrier支持这种操作。

    package com.accat.concurrency.example.aqs;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Slf4j
    public class CyclicBarrierExample3 {
    
        private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            log.info("callback is running");
        });
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService executor = Executors.newCachedThreadPool();
    
            for (int i = 0; i < 10; i++) {
                final int threadNum = i;
                Thread.sleep(1000);
                executor.execute(() -> {
                    try {
                        race(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            executor.shutdown();
        }
    
        private static void race(int threadNum) throws Exception {
            Thread.sleep(1000);
            log.info("{} is ready", threadNum);
            barrier.await();
            log.info("{} continue", threadNum);
        }
    }
    

    AQS组件 - ReentrantLock

    ReentrantLocksynchronize区别
    1.可重入锁,synchronize依赖JVM实现,ReentrantLock依赖JDK实现,后者能查看源码。
    2.两者性能相差不大,官方推荐synchronize,实现更加容易,不用手动释放锁。
    3.ReentrantLock可以指定公平锁和非公平锁。
    4.ReentrantLock提供一个Condition类,可以分组唤醒需要唤醒的线程
    5.ReentrantLock提供一个能够中断等待锁的线程的机制, lock.lockInterruptibly()
    ReentrantLock是一种自旋锁实现,通过CAS机制不断尝试加锁,避免线程进入内核态的阻塞状态,想尽办法让线程不进入内核态的阻塞状态是理解锁设计的关键。

    package com.accat.concurrency.example.lock;
    
    import com.accat.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j
    @ThreadSafe
    public class LockExample2 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static int count = 0;
    
        private final static Lock lock = new ReentrantLock();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("count:{}", count);
        }
    
        private static void add() {
            lock.lock();
            try {
                count++;
            } finally {
                lock.unlock();
            }
        }
    }
    

    AQS组件 - ReentrantReadWriteLock

    让我们想一下,我们有一个类Data, 其中getData(), setData(),我们要保证这个类线程安全,在其上加锁,那么我们需要getData(), setData()互斥和setData(), setData()互斥,但是如果直接使用synchronize或者ReentrantLock的话,那么getData(), getData()也将互斥,这不是我们要的。
    所以JDK提供了一个ReentrantLock的继承类ReentrantReadWriteLock,实现读写锁分离的实现,避免上述问题。

    package com.mmall.concurrency.example.lock;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeMap;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    @Slf4j
    public class LockExample3 {
    
        private final Map<String, Data> map = new TreeMap<>();
    
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    
        private final Lock readLock = lock.readLock();
    
        private final Lock writeLock = lock.writeLock();
    
        public Data get(String key) {
            readLock.lock();
            try {
                return map.get(key);
            } finally {
                readLock.unlock();
            }
        }
    
        public Set<String> getAllKeys() {
            readLock.lock();
            try {
                return map.keySet();
            } finally {
                readLock.unlock();
            }
        }
    
        public Data put(String key, Data value) {
            writeLock.lock();
            try {
                return map.put(key, value);
            } finally {
                readLock.unlock();
            }
        }
    
        class Data {
    
        }
    }
    

    StampedLock

    StampedLock 是Java乐观锁的一种实现。乐观锁是为执行操作的对象附加一个versionId,每次操作前获取versionId,操作完后查看versionId是否没被改变,如果是则执行更新,不是则放弃更新,重新操作。
    乐观锁适用于读多写少的场景。

    package com.accat.concurrency.example.lock;
    
    import com.accat.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.locks.StampedLock;
    
    @Slf4j
    @ThreadSafe
    public class LockExample5 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static int count = 0;
    
        private final static StampedLock lock = new StampedLock();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("count:{}", count);
        }
    
        private static void add() {
            long stamp = lock.writeLock();
            try {
                count++;
            } finally {
                lock.unlock(stamp);
            }
        }
    }
    

    AQS组件 - Condition

    两个队列的Node交换

    本质上就是加入Sync queue之后, Sync queueCondition queue之间Node的相互放置操作。

    package com.accat.concurrency.example.lock;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j
    public class LockExample6 {
    
        public static void main(String[] args) {
            ReentrantLock reentrantLock = new ReentrantLock();
            Condition condition = reentrantLock.newCondition();
    
            new Thread(() -> {
                try {
                    reentrantLock.lock();  // 获取锁,加入 Sync queue
                    log.info("wait signal"); // 1 
                    condition.await();  // 将锁释放, 进入Condition queue
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("get signal"); // 4
                reentrantLock.unlock();
            }).start();
    
            new Thread(() -> {
                reentrantLock.lock();  // 由于锁被释放,拿到锁, 进入Sync queue
                log.info("get lock"); // 2
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                condition.signalAll();  // 将Condition queue中的Node放回到 Sync queue
                log.info("send signal ~ "); // 3
                reentrantLock.unlock();  // 释放锁, 将Node从Sync queue移除
            }).start();
        }
    }
    

    相关文章

      网友评论

          本文标题:Concurrent Java 04 - JUC之AQS

          本文链接:https://www.haomeiwen.com/subject/bhghsqtx.html