美文网首页Java 杂谈
Java 并发之 CountDownLatch、CyclicBarrier、Semaphore

Java 并发之 CountDownLatch、CyclicBarrier、Semaphore

作者: 叫我宫城大人 | 来源:发表于2019-07-16 00:17 被阅读1次

    疑问

    • 多个线程执行结束后怎么执行某一特定操作?
    • 怎么限制执行某块业务的线程的数量?

    CountDownLatch

    计数器锁:以给定的 count(计数)初始化,每个业务线程完成后调用 countDown(递减计数),主线程调用 await(等待)阻塞,直至 count 等于 0,或者超过指定的 await 等待时间:

    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demonstrates {@link CountDownLatch}: the main thread starts a fixed number of
     * worker threads and blocks in {@code await()} until every worker has called
     * {@code countDown()}.
     *
     * @author caojiantao
     */
    public class Test {

        /**
         * Starts the workers, waits for all of them, then prints the closing banner.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            System.out.println("========================= 开始 =========================");
            // Number of worker threads
            int workCount = 5;
            // Exclusive upper bound, in milliseconds, of the random sleep that simulates work
            int cost = 3000;
            CountDownLatch latch = new CountDownLatch(workCount);
            for (int i = 0; i < workCount; i++) {
                new Thread(() -> {
                    try {
                        System.out.println(getTimeFmtString() + " " + Thread.currentThread().getName() + " 开始执行...");
                        try {
                            TimeUnit.MILLISECONDS.sleep(new Random().nextInt(cost));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            // Restore the flag so the interruption is not lost.
                            Thread.currentThread().interrupt();
                        }
                        System.out.println(getTimeFmtString() + " " + Thread.currentThread().getName() + " 执行完...");
                    } finally {
                        // Always count down, even if the worker fails unexpectedly;
                        // otherwise the main thread would wait in await() forever.
                        latch.countDown();
                    }
                }).start();
            }
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the flag so the interruption is not lost.
                Thread.currentThread().interrupt();
            }
            System.out.println("========================= 结束 =========================");
        }

        /**
         * Returns the current local time formatted with {@link DateTimeFormatter#ISO_LOCAL_TIME}.
         */
        private static String getTimeFmtString() {
            return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
        }
    }
    

    CyclicBarrier

    栅栏(屏障),功能与 CountDownLatch 大体相同,其特色是支持计数器重置,可以循环使用:

    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Random;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demonstrates {@link CyclicBarrier}: each worker thread calls {@code await()}
     * after its simulated work, and the barrier action prints the closing banner
     * once all parties have arrived.
     *
     * @author caojiantao
     */
    public class Test {

        /**
         * Creates the barrier and starts one worker thread per party.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            System.out.println("========================= 开始 =========================");
            // Number of worker threads (also the number of barrier parties)
            int workCount = 5;
            // Exclusive upper bound, in milliseconds, of the random sleep that simulates work
            int cost = 3000;
            // The barrier action runs once, when the last party reaches the barrier.
            CyclicBarrier barrier = new CyclicBarrier(workCount, () -> {
                System.out.println("========================= 结束 =========================");
            });
            for (int i = 0; i < workCount; i++) {
                new Thread(() -> {
                    try {
                        System.out.println(getTimeFmtString() + " " + Thread.currentThread().getName() + " 开始执行...");
                        TimeUnit.MILLISECONDS.sleep(new Random().nextInt(cost));
                        System.out.println(getTimeFmtString() + " " + Thread.currentThread().getName() + " 执行完...");
                        barrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Restore the flag so the interruption is not lost.
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        // Another party was interrupted or timed out while waiting.
                        e.printStackTrace();
                    }
                }).start();
            }
        }

        /**
         * Returns the current local time formatted with {@link DateTimeFormatter#ISO_LOCAL_TIME}.
         */
        private static String getTimeFmtString() {
            return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
        }
    }
    

    注:相比 CountDownLatch,CyclicBarrier 能够通过 reset 重置计数器。同时注意:CountDownLatch 中只有 await 方法会阻塞当前线程,countDown 并不会;而 CyclicBarrier 的 await 既计数又阻塞当前线程。

    Semaphore

    信号量,限制同时持有许可(即同时执行)的线程数量最多为 permits:

    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Random;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demonstrates {@link Semaphore}: ten worker threads compete for five permits,
     * so at most five of them run the guarded section at the same time.
     *
     * @author caojiantao
     */
    public class Test {

        /**
         * Starts the worker threads; each acquires a permit, simulates work and
         * releases the permit again.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Number of worker threads
            int workCount = 10;
            // Exclusive upper bound, in milliseconds, of the random sleep that simulates work
            int cost = 3000;
            Semaphore semaphore = new Semaphore(5);
            for (int i = 0; i < workCount; i++) {
                new Thread(() -> {
                    // Acquire outside the try/finally: if acquire() is interrupted no
                    // permit was obtained, so release() must not be called.
                    try {
                        semaphore.acquire();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Restore the flag so the interruption is not lost.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        System.out.println(getTimeFmtString() + " " + Thread.currentThread().getName() + " 开始执行...");
                        TimeUnit.MILLISECONDS.sleep(new Random().nextInt(cost));
                        System.out.println(getTimeFmtString() + " " + Thread.currentThread().getName() + " 执行完...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                    } finally {
                        // Give the permit back
                        semaphore.release();
                    }
                }).start();
            }
        }

        /**
         * Returns the current local time formatted with {@link DateTimeFormatter#ISO_LOCAL_TIME}.
         */
        private static String getTimeFmtString() {
            return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
        }
    }
    

    案例

    三方平台比价接口

    假如存在 A、B、C 三个平台正在出售某商品 goods,现在需要多线程获取三个平台该商品 goods 的价格,最终输出最低价格信息。

    import java.math.BigDecimal;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Price-comparison demo: three {@link Task}s, one per platform, each publish a
     * quote into a shared map; the main thread waits on a {@link CountDownLatch}
     * and then prints the lowest quote.
     *
     * @author caojiantao
     */
    public class Test {

        /** Quotes keyed by platform name; written by the task threads, read by main. */
        private static final Map<String, Double> infoMap = new ConcurrentHashMap<>(3);

        /**
         * Starts one task per platform, waits for all of them and prints the lowest quote.
         *
         * @param args unused
         * @throws IllegalStateException if no task managed to publish a quote
         */
        public static void main(String[] args) {
            CountDownLatch latch = new CountDownLatch(3);
            Thread a = new Thread(new Task("A", latch));
            Thread b = new Thread(new Task("B", latch));
            Thread c = new Thread(new Task("C", latch));
            a.start();
            b.start();
            c.start();
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the flag so the interruption is not lost.
                Thread.currentThread().interrupt();
            }
            Map.Entry<String, Double> lowest = null;
            for (Map.Entry<String, Double> entry : infoMap.entrySet()) {
                if (lowest == null || entry.getValue() < lowest.getValue()) {
                    lowest = entry;
                }
            }
            // Explicit check instead of `assert`: assertions are disabled by default,
            // which would turn an empty result into a NullPointerException below.
            if (lowest == null) {
                throw new IllegalStateException("no price quote was collected from any platform");
            }
            System.out.println(getTimeFmtString() + " 最低价格信息为:" + lowest.getKey() + " " + lowest.getValue());
        }

        /**
         * Returns the current local time formatted with {@link DateTimeFormatter#ISO_LOCAL_TIME}.
         */
        private static String getTimeFmtString() {
            return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
        }

        /**
         * Simulates querying one platform: sleeps a random time, stores a random
         * price under the platform name and counts the latch down.
         */
        static class Task implements Runnable {

            private final String name;
            private final CountDownLatch latch;

            /**
             * @param name  platform name used as the key of the published quote
             * @param latch latch counted down once this task has finished
             */
            public Task(String name, CountDownLatch latch) {
                this.name = name;
                this.latch = latch;
            }

            @Override
            public void run() {
                // Exclusive upper bound, in milliseconds, of the simulated query time
                int cost = 5000;
                try {
                    TimeUnit.MILLISECONDS.sleep(new Random().nextInt(cost));
                    // Random price (nextDouble() * 1000) rounded half-up to two decimals.
                    double price = BigDecimal.valueOf(new Random().nextDouble() * 1000)
                            .setScale(2, java.math.RoundingMode.HALF_UP)
                            .doubleValue();
                    infoMap.put(name, price);
                    System.out.println(getTimeFmtString() + " " + name + " 报价:" + price);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the flag so the interruption is not lost.
                    Thread.currentThread().interrupt();
                } finally {
                    // Count down even on failure so main is never left waiting.
                    latch.countDown();
                }
            }
        }
    }
    

    限制接口访问次数

    存在某接口 queryData,需要将并发访问数控制在 10 个以内,超出的请求将阻塞,直至有正在执行的请求完成。

    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Random;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Service whose {@link #queryData()} is guarded by a {@link Semaphore} with
     * 10 permits, so at most 10 callers execute it concurrently; further callers
     * block until a permit is released.
     *
     * @author caojiantao
     */
    public class Service {

        private final Semaphore semaphore = new Semaphore(10);

        /**
         * Acquires a permit, simulates a request by sleeping a random time, and
         * releases the permit.
         *
         * <p>If the calling thread is interrupted, the method returns early and
         * leaves the thread's interrupt flag set so the caller can react to it.
         */
        public void queryData() {
            // Exclusive upper bound, in milliseconds, of the random sleep that simulates work
            int cost = 3000;
            // Acquire outside the try/finally: if acquire() is interrupted no permit
            // was obtained, so release() must not run (it would leak an extra permit).
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            try {
                System.out.println(getTimeFmtString() + " " + Thread.currentThread().getName() + " 请求...");
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(cost));
                System.out.println(getTimeFmtString() + " " + Thread.currentThread().getName() + " 结束");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Give the permit back
                semaphore.release();
            }
        }

        /**
         * Returns the current local time formatted with {@link DateTimeFormatter#ISO_LOCAL_TIME}.
         */
        private String getTimeFmtString() {
            return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
        }
    }
    

    相关文章

      网友评论

        本文标题:Java 并发之 CountDownLatch、CyclicBa

        本文链接:https://www.haomeiwen.com/subject/uapokctx.html