public class ForSynchronizer {
public static void main(String[] args) throws InterruptedException {
/**
* Thread-1结束了(1)!
* main结束了(ALL)!
* Thread-0结束了(0)!
* Thread-2结束了(2)!
* Thread-3结束了(3)!
*/
testCountDownLatch();
/**
* 0:Thread-0已准备就绪
* 1:Thread-1已准备就绪
* 3:Thread-3已准备就绪
* 2:Thread-2已准备就绪
* 2:Thread-2开始下一阶段的工作
* 0:Thread-0开始下一阶段的工作
* 3:Thread-3开始下一阶段的工作
* 1:Thread-1开始下一阶段的工作
*/
testCyclicBarrier();
testPhaser();
/**
* 生产者线程交换后得到的数据: World
* 消费者线程交换后得到的数据: Hello
*/
testExchanger();
/**
* 线程 Thread-0 获取到许可,开始执行任务
* 线程 Thread-2 获取到许可,开始执行任务
* 线程 Thread-1 获取到许可,开始执行任务
* 线程 Thread-2 任务执行完毕,释放许可
* 线程 Thread-0 任务执行完毕,释放许可
* 线程 Thread-1 任务执行完毕,释放许可
* 线程 Thread-3 获取到许可,开始执行任务
* 线程 Thread-4 获取到许可,开始执行任务
* 线程 Thread-4 任务执行完毕,释放许可
* 线程 Thread-3 任务执行完毕,释放许可
*/
testSemaphore();
}
/**
*
* 0 1 2 3 四个同学跑步,2 3等其他两个人跑完了自己再跑
* */
public static void testCountDownLatch() {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(
() -> {
ForSleep.sleep(TimeUnit.MICROSECONDS.toNanos(1000));
System.out.println(Thread.currentThread().getName() + "结束了(0)!");
countDownLatch.countDown();
})
.start();
new Thread(
() -> {
ForSleep.sleep(TimeUnit.MICROSECONDS.toNanos(1000));
System.out.println(Thread.currentThread().getName() + "结束了(1)!");
countDownLatch.countDown();
})
.start();
new Thread(
() -> {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "结束了(2)!");
})
.start();
new Thread(
() -> {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "结束了(3)!");
})
.start();
System.out.println(Thread.currentThread().getName() + "结束了(ALL)!");
}
/**
*
* 0 1 2 3 四个同学跑步, 大家在中途某个位置集合,然后再接着继续跑
*
* */
public static void testCyclicBarrier(){
final int totalWorker = 4;
CyclicBarrier cyclicBarrier = new CyclicBarrier(totalWorker);
for (int i = 0; i < totalWorker; i++) {
int finalI = i;
new Thread(() -> {
System.out.println(finalI +":" + Thread.currentThread().getName() + "已准备就绪");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(finalI +":" + Thread.currentThread().getName() + "开始下一阶段的工作");
}).start();
}
}
Random rd = new Random();
int bound = 5000;
public void step1Task() throws InterruptedException {
// 经过一段时间后,到达公司
Thread.sleep(rd.nextInt(bound));
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "到达公司!");
}
public void step2Task() throws InterruptedException {
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "出发去公园玩...");
// 玩了一段时间后,到公园门口集合
Thread.sleep(rd.nextInt(bound));
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "完成公园游玩!");
}
public void step3Task() throws InterruptedException {
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "出发去餐厅...");
// 玩了一段时间后,到公园门口集合
Thread.sleep(rd.nextInt(bound));
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "到达餐厅!");
}
public void step4Task() throws InterruptedException {
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "开始用餐...");
// 玩了一段时间后,到公园门口集合
Thread.sleep(rd.nextInt(bound));
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "回家了!");
}
/**
*
* 0 1 2 3 四个同学跑步, 大家在中途某个位置集合,然后 0 1 2接着跑,3退出;接着又在一个位置集合,0 1接着跑,2退出
*
* */
public static void testPhaser(){
// 创建阶段协同器对象,重写了onAdvance方法,增加阶段到达处理逻辑
final Phaser ph = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
int staffs = registeredParties - 1;
switch (phase) {
case 0:
System.out.println("大家都到公司了,出发去公园!人数:" + staffs);
break;
case 1:
System.out.println("大家都到公园大门,出发去餐厅!人数:" + staffs);
break;
case 2:
System.out.println("大家都到餐厅了,开始用餐!人数:" + staffs);
break;
}
// 判断是否只剩主线程一个参与者,是,则返回true,阶段协同器终止。
return registeredParties == 1;
}
};
// 增加一个任务数,用来让主线程全程参与
ph.register();
final ForSynchronizer job = new ForSynchronizer();
// 让3个全程参与的线程加入
for (int i = 0; i < 3; i++) {
// 增加参与任务数
ph.register();
new Thread(new Runnable() {
@Override
public void run() {
try {
job.step1Task();
ph.arriveAndAwaitAdvance();
job.step2Task();
System.out.println(
"员工【" + Thread.currentThread().getName() + "】"
+ "到达公园大门集合");
ph.arriveAndAwaitAdvance();
job.step3Task();
ph.arriveAndAwaitAdvance();
job.step4Task();
// 完成了,注销离开
ph.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
// 让两个不参加聚餐的员工加入
for (int i = 0; i < 2; i++) {
// 增加参与任务数
ph.register();
new Thread(new Runnable() {
@Override
public void run() {
try {
job.step1Task();
ph.arriveAndAwaitAdvance();
job.step2Task();
System.out.println(
"员工【" + Thread.currentThread().getName() + "】"
+ "回家了!");
// 完成了,注销离开
ph.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
while (!ph.isTerminated()) {
int phaser = ph.arriveAndAwaitAdvance();
if (phaser == 2) { // 到了去餐厅的阶段,让只参加晚上聚餐的人加入
for (int i = 0; i < 4; i++) {
// 增加参与任务数
ph.register();
new Thread(new Runnable() {
@Override
public void run() {
try {
job.step3Task();
ph.arriveAndAwaitAdvance();
job.step4Task();
// 完成了,注销离开
ph.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
}
/**
*
* 0 1 两个同学跑步, 大家在中途某个位置集合,然后再接着继续跑
*
* */
public static void testExchanger(){
// 创建一个Exchanger对象
Exchanger<String> exchanger = new Exchanger<>();
// 创建一个线程,它将使用"Hello"与另一个线程交换数据
Thread producer = new Thread(() -> {
try {
String producedData = "Hello";
String consumerData = exchanger.exchange(producedData);
System.out.println("生产者线程交换后得到的数据: " + consumerData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "生产者线程");
// 创建一个线程,它将使用"World"与另一个线程交换数据
Thread consumer = new Thread(() -> {
try {
String consumerData = "World";
String producedData = exchanger.exchange(consumerData);
System.out.println("消费者线程交换后得到的数据: " + producedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "消费者线程");
// 启动线程
producer.start();
consumer.start();
}
/**
*
* 0 1 2 3 4四个同学跑步, 但是操场只允许同时三个人跑步 则一人在外等待 有人退出时 再进入
*
* */
public static void testSemaphore(){
Semaphore semaphore = new Semaphore(3);
// 创建10个线程
for (int i = 0; i < 5; i++) {
new Thread(()->{
try {
// 获取许可
semaphore.acquire();
System.out.println("线程 " + Thread.currentThread().getName() + " 获取到许可,开始执行任务");
// 模拟耗时操作
Thread.sleep(2000);
System.out.println("线程 " + Thread.currentThread().getName() + " 任务执行完毕,释放许可");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放许可
semaphore.release();
}
}).start();
}
}
}
网友评论