美文网首页
CountDownLatch|CyclicBarrier|Pha

CountDownLatch|CyclicBarrier|Pha

作者: kele2018 | 来源:发表于2024-07-04 10:30 被阅读0次
public class ForSynchronizer {

  public static void main(String[] args) throws InterruptedException {
      /**
       * Thread-1结束了(1)!
       * main结束了(ALL)!
       * Thread-0结束了(0)!
       * Thread-2结束了(2)!
       * Thread-3结束了(3)!
       */
      testCountDownLatch();
      /**
       * 0:Thread-0已准备就绪
       * 1:Thread-1已准备就绪
       * 3:Thread-3已准备就绪
       * 2:Thread-2已准备就绪
       * 2:Thread-2开始下一阶段的工作
       * 0:Thread-0开始下一阶段的工作
       * 3:Thread-3开始下一阶段的工作
       * 1:Thread-1开始下一阶段的工作
       */
      testCyclicBarrier();
      testPhaser();
      /**
       * 生产者线程交换后得到的数据: World
       * 消费者线程交换后得到的数据: Hello
       */
      testExchanger();
      /**
       * 线程 Thread-0 获取到许可,开始执行任务
       * 线程 Thread-2 获取到许可,开始执行任务
       * 线程 Thread-1 获取到许可,开始执行任务
       * 线程 Thread-2 任务执行完毕,释放许可
       * 线程 Thread-0 任务执行完毕,释放许可
       * 线程 Thread-1 任务执行完毕,释放许可
       * 线程 Thread-3 获取到许可,开始执行任务
       * 线程 Thread-4 获取到许可,开始执行任务
       * 线程 Thread-4 任务执行完毕,释放许可
       * 线程 Thread-3 任务执行完毕,释放许可
       */
      testSemaphore();

  }

  /**
   *
   * 0  1  2  3 四个同学跑步,2 3等其他两个人跑完了自己再跑
   * */
  public static void testCountDownLatch()  {
    CountDownLatch countDownLatch = new CountDownLatch(2);
    new Thread(
            () -> {
              ForSleep.sleep(TimeUnit.MICROSECONDS.toNanos(1000));
              System.out.println(Thread.currentThread().getName() + "结束了(0)!");
              countDownLatch.countDown();
            })
        .start();
    new Thread(
            () -> {
              ForSleep.sleep(TimeUnit.MICROSECONDS.toNanos(1000));
              System.out.println(Thread.currentThread().getName() + "结束了(1)!");
              countDownLatch.countDown();
            })
        .start();

    new Thread(
            () -> {
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "结束了(2)!");
            })
        .start();
      new Thread(
              () -> {
                  try {
                      countDownLatch.await();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println(Thread.currentThread().getName() + "结束了(3)!");
              })
              .start();
      System.out.println(Thread.currentThread().getName() + "结束了(ALL)!");

  }

  /**
   *
   * 0  1  2  3 四个同学跑步, 大家在中途某个位置集合,然后再接着继续跑
   *
   * */
  public static void testCyclicBarrier(){
      final int totalWorker = 4;
      CyclicBarrier cyclicBarrier = new CyclicBarrier(totalWorker);
      for (int i = 0; i < totalWorker; i++) {
          int finalI = i;
          new Thread(() -> {
              System.out.println(finalI +":" + Thread.currentThread().getName() + "已准备就绪");
              try {
                  cyclicBarrier.await();
              } catch (InterruptedException | BrokenBarrierException e) {
                  e.printStackTrace();
              }
              System.out.println(finalI +":" + Thread.currentThread().getName() + "开始下一阶段的工作");
          }).start();
      }
  }


  Random rd = new Random();
  int bound = 5000;
  public void step1Task() throws InterruptedException {
      // 经过一段时间后,到达公司
      Thread.sleep(rd.nextInt(bound));
      System.out.println(
              "员工【" + Thread.currentThread().getName() + "】" + "到达公司!");
  }
  public void step2Task() throws InterruptedException {
      System.out.println(
              "员工【" + Thread.currentThread().getName() + "】" + "出发去公园玩...");
      // 玩了一段时间后,到公园门口集合
      Thread.sleep(rd.nextInt(bound));
      System.out.println(
              "员工【" + Thread.currentThread().getName() + "】" + "完成公园游玩!");
  }
  public void step3Task() throws InterruptedException {
      System.out.println(
              "员工【" + Thread.currentThread().getName() + "】" + "出发去餐厅...");
      // 玩了一段时间后,到公园门口集合
      Thread.sleep(rd.nextInt(bound));
      System.out.println(
              "员工【" + Thread.currentThread().getName() + "】" + "到达餐厅!");
  }
  public void step4Task() throws InterruptedException {
      System.out.println(
              "员工【" + Thread.currentThread().getName() + "】" + "开始用餐...");
      // 玩了一段时间后,到公园门口集合
      Thread.sleep(rd.nextInt(bound));
      System.out.println(
              "员工【" + Thread.currentThread().getName() + "】" + "回家了!");
  }
  /**
   *
   * 0  1  2  3 四个同学跑步, 大家在中途某个位置集合,然后 0 1 2接着跑,3退出;接着又在一个位置集合,0 1接着跑,2退出
   *
   * */
  public static void testPhaser(){
// 创建阶段协同器对象,重写了onAdvance方法,增加阶段到达处理逻辑
      final Phaser ph = new Phaser() {
          protected boolean onAdvance(int phase, int registeredParties) {
              int staffs = registeredParties - 1;
              switch (phase) {
                  case 0:
                      System.out.println("大家都到公司了,出发去公园!人数:" + staffs);
                      break;
                  case 1:
                      System.out.println("大家都到公园大门,出发去餐厅!人数:" + staffs);
                      break;
                  case 2:
                      System.out.println("大家都到餐厅了,开始用餐!人数:" + staffs);
                      break;
              }
              // 判断是否只剩主线程一个参与者,是,则返回true,阶段协同器终止。
              return registeredParties == 1;
          }
      };

      // 增加一个任务数,用来让主线程全程参与
      ph.register();

      final ForSynchronizer job = new ForSynchronizer();

      // 让3个全程参与的线程加入
      for (int i = 0; i < 3; i++) {
          // 增加参与任务数
          ph.register();
          new Thread(new Runnable() {
              @Override
              public void run() {
                  try {
                      job.step1Task();
                      ph.arriveAndAwaitAdvance();
                      job.step2Task();
                      System.out.println(
                              "员工【" + Thread.currentThread().getName() + "】"
                                      + "到达公园大门集合");
                      ph.arriveAndAwaitAdvance();
                      job.step3Task();
                      ph.arriveAndAwaitAdvance();
                      job.step4Task();
                      // 完成了,注销离开
                      ph.arriveAndDeregister();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }).start();
      }

      // 让两个不参加聚餐的员工加入
      for (int i = 0; i < 2; i++) {
          // 增加参与任务数
          ph.register();
          new Thread(new Runnable() {
              @Override
              public void run() {
                  try {
                      job.step1Task();
                      ph.arriveAndAwaitAdvance();
                      job.step2Task();
                      System.out.println(
                              "员工【" + Thread.currentThread().getName() + "】"
                                      + "回家了!");
                      // 完成了,注销离开
                      ph.arriveAndDeregister();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }).start();
      }

      while (!ph.isTerminated()) {
          int phaser = ph.arriveAndAwaitAdvance();

          if (phaser == 2) { // 到了去餐厅的阶段,让只参加晚上聚餐的人加入
              for (int i = 0; i < 4; i++) {
                  // 增加参与任务数
                  ph.register();
                  new Thread(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              job.step3Task();
                              ph.arriveAndAwaitAdvance();
                              job.step4Task();
                              // 完成了,注销离开
                              ph.arriveAndDeregister();
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  }).start();
              }
          }

      }
  }
  /**
   *
   * 0  1   两个同学跑步, 大家在中途某个位置集合,然后再接着继续跑
   *
   * */
  public static void testExchanger(){
      // 创建一个Exchanger对象
      Exchanger<String> exchanger = new Exchanger<>();
      // 创建一个线程,它将使用"Hello"与另一个线程交换数据
      Thread producer = new Thread(() -> {
          try {
              String producedData = "Hello";
              String consumerData = exchanger.exchange(producedData);
              System.out.println("生产者线程交换后得到的数据: " + consumerData);
          } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
          }
      }, "生产者线程");

      // 创建一个线程,它将使用"World"与另一个线程交换数据
      Thread consumer = new Thread(() -> {
          try {
              String consumerData = "World";
              String producedData = exchanger.exchange(consumerData);
              System.out.println("消费者线程交换后得到的数据: " + producedData);
          } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
          }
      }, "消费者线程");

      // 启动线程
      producer.start();
      consumer.start();
  }

  /**
   *
   * 0  1  2  3  4四个同学跑步, 但是操场只允许同时三个人跑步   则一人在外等待   有人退出时  再进入
   *
   * */
  public static void testSemaphore(){
      Semaphore semaphore = new Semaphore(3);
      // 创建10个线程
      for (int i = 0; i < 5; i++) {
          new Thread(()->{
              try {
                  // 获取许可
                  semaphore.acquire();
                  System.out.println("线程 " + Thread.currentThread().getName() + " 获取到许可,开始执行任务");
                  // 模拟耗时操作
                  Thread.sleep(2000);
                  System.out.println("线程 " + Thread.currentThread().getName() + " 任务执行完毕,释放许可");
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } finally {
                  // 释放许可
                  semaphore.release();
              }
          }).start();
      }


  }
}

相关文章

网友评论

      本文标题:CountDownLatch|CyclicBarrier|Pha

      本文链接:https://www.haomeiwen.com/subject/wpvicjtx.html