美文网首页
并发工具类

并发工具类

作者: shen33 | 来源:发表于2019-03-22 11:36 被阅读0次

    CountDownLatch

    专业术语:在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
    大白话:就是有N个人投票(线程),统计员等待着所有人投完票的过程。
    注意:线程执行完毕后,调用countDown(),该线程是执行完毕的。
    使用:

    int size = 10; //定义十个人投票
    CountDownLatch latch = new CountDownLatch(size); //定义10个容量计数器
    for (int i = 0; i < size; i++) {
        // 模拟投票
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "  投票...");
            latch.countDown(); //投完票计数器减一
        }).start();
    }
    latch.await();//等待计数器为0 才进行下面的任务
    System.out.println("所有人投票完毕,统计票数...");
    

    CountDownLatch的api:

    1. CountDownLatch.await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
    2. CountDownLatch.countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
    3. CountDownLatch.getCount() 返回当前计数。
    4. CountDownLatch.toString() 返回标识此锁存器及其状态的字符串。状态用括号括起来,包括字符串 "Count =",后跟当前计数。

    CyclicBarrier

    专业术语:它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。
    大白话:就是队员集合(多个线程),已经集合的人等待剩下的人集合完毕后,队长开始统计人数的过程。
    注意:这个过程中的线程是出于等待状态的。

    import java.util.Random;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 部队集合-> 统计人数 -> 解散
     */
    public class Demo {
        Random random = new Random();
        public void meet(CyclicBarrier cyclicBarrier){
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(4)); // 随机休息4s的时间
                System.out.println(Thread.currentThread().getName() + " 集合...."); //模拟队员集合
                cyclicBarrier.await(); //等待其他队员集合
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 解散");
        }
        public static void main(String[] args) throws Exception {
            Demo d = new Demo();
            int size = 10; //定义队员数量
            //定义屏障数量,并达到数量之后完成对应的操作
            CyclicBarrier barrier = new CyclicBarrier(size, () -> System.out.println("开始报数...."));
            for (int i = 0; i < size; i++) {
                new Thread(() -> d.meet(barrier)).start();
            }
        }
    }
    

    CyclicBarrier的api:

    1. CyclicBarrier.await() 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
    2. CyclicBarrier.getNumberWaiting() 返回当前在屏障处等待的参与者数目。
    3. CyclicBarrier.getParties() 返回要求启动此 barrier 的参与者数目。
    4. CyclicBarrier.isBroken() 查询此屏障是否处于损坏状态。
    5. CyclicBarrier.reset() 将屏障重置为其初始状态。

    Semaphore

    专业术语:从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
    通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
    通常,应该将用于控制资源访问的信号量初始化为公平的,以确保所有线程都可访问资源。为其他的种类的同步控制使用信号量时,非公平排序的吞吐量优势通常要比公平考虑更为重要。
    大白话:类似世博会,中国会场里面限定100个人同时参观,出来一个人,在放进去一个。

    /**
     * 模拟参观世博会,并限制人流
     */
    public class Demo {
        public void visit(Semaphore semaphore) {
            try {
                semaphore.acquire(); // 获取信号量
                System.out.println(Thread.currentThread().getName() + " 参观会场");
                semaphore.release(); // 释放信号量
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args)  {
            // 创建一个容量5的公平的信号量
            Semaphore semaphore = new Semaphore(5, true);
            Demo demo = new Demo();
            for (int i = 0; i < 10; i++) {
                new Thread(() -> demo.visit(semaphore));
            }
        }
    }
    

    Semaphore的api:

    1. Semaphore.acquire() 获取信号量
    2. Semaphore.release() 释放信号量

    Exchanger

    专业术语:对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。

    用途:可以用于两个线程某个对象的比对、或者对象的传递

    public class Demo {
        public void a(Exchanger<String> exch) {
            System.out.println("a 方法执行...");
            try {
                System.out.println("a 线程正在抓取数据...");
                Thread.sleep(2000);
                System.out.println("a 线程抓取到数据...");
                String res = "12345";
                System.out.println("a 等待对比结果...");
                exch.exchange(res);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
        public void b(Exchanger<String> exch) {
            System.out.println("b 方法开始执行...");
            try {
                System.out.println("b 方法开始抓取数据...");
                Thread.sleep(4000);
                System.out.println("b 方法抓取数据结束...");
                String res = "12345";
                String value = exch.exchange(res);
                System.out.println("开始进行比对...");
                System.out.println("比对结果为:" + value.equals(res));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        public static void main(String[] args) {
            Demo d = new Demo();
            Exchanger<String> exch = new Exchanger<>();
            new Thread(() -> d.a(exch)).start();
            new Thread(() -> d.b(exch)).start();
        }
    }
    

    Exchanger的api:

    1. Exchanger.exchange() 等待另一个线程到达此交换点(除非当前线程被 中断),然后将给定的对象传送给该线程,并接收该线程的对象。
      如果另一个线程已经在交换点等待,则出于线程调度目的,继续执行此线程,并接收当前线程传入的对象。当前线程立即返回,接收其他线程传递的交换对象。

    相关文章

      网友评论

          本文标题:并发工具类

          本文链接:https://www.haomeiwen.com/subject/ookqvqtx.html