J.U.C包下的同步工具类
类 | 作⽤ |
---|---|
Semaphore | 信号量 - 用来控制同一时间,资源可被访问的线程数量,一般可用于流量的控制。 |
CountDownLatch | 多线程同步控制工具,它被称之为门阀、计数器或者闭锁。 是基于AQS共享模式实现的。 常见使用场景:Zookeeper分布式锁,Jmeter模拟高并发等 |
CyclicBarrier | 回环栅栏(回环屏障)- 可以让一组线程全部达到一个状态后再全部同时执行。 这里之所以叫做回环是因为所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。之所以叫做屏障是因为当线程调用await方法后会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。 基于ReentrantLock实现的,且也是基于AQS |
Phaser | 增强的CyclicBarrier。 |
Semaphore
-
Semaphore是基于AQS的共享模式进行实现的,其内部也有一个Sync类继承与AQS,且和ReentrantLock相似也分为公平锁和非公平锁。
-
Semaphore是一个计数信号量,是并发包中提供的用于控制某资源同时被访问的个数
-
Semaphore的主要作用就是限流,限制某块代码的线程访问数,当Semaphore(1),传入参数为1时,就相当于synchronized同步锁了。
Semaphore使用示例:
package com.study.thread.conllection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
//创建信号量实例,传入参数为0,即计数器起始为0
private static Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) throws InterruptedException {
int size = 5;
ExecutorService executorService = Executors.newFixedThreadPool(size*2);
for (int i = 1; i <= size; i++) {
//将任务i添加至线程池 有释放许可release()
executorService.submit(() -> {
try {
//执行操作
System.out.println(Thread.currentThread() + "执行结束-有释放许可release");
//释放一个许可,并将其返还给信号量,也就是将 permits+1
semaphore.release();
} catch (Exception e) {
e.printStackTrace();
}
});
//将任务i添加至线程池 没有释放许可release()
executorService.submit(() -> {
try {
//执行操作
System.out.println(Thread.currentThread() + "执行结束-没有释放许可release");
} catch (Exception e) {
e.printStackTrace();
}
});
}
//获取许可
//传参为size说明调用acquire方法的线程会一直阻塞,直到信号量的计数变为size才会返回
semaphore.acquire(5);
System.out.println("所有线程执行结束-有释放许可release");
//关闭线程池
executorService.shutdown();
}
}
Thread[pool-1-thread-1,5,main]执行结束-有释放许可release
Thread[pool-1-thread-2,5,main]执行结束-没有释放许可release
Thread[pool-1-thread-3,5,main]执行结束-有释放许可release
Thread[pool-1-thread-4,5,main]执行结束-没有释放许可release
Thread[pool-1-thread-5,5,main]执行结束-有释放许可release
Thread[pool-1-thread-6,5,main]执行结束-没有释放许可release
Thread[pool-1-thread-7,5,main]执行结束-有释放许可release
Thread[pool-1-thread-8,5,main]执行结束-没有释放许可release
Thread[pool-1-thread-9,5,main]执行结束-有释放许可release
所有线程执行结束-有释放许可release
Thread[pool-1-thread-10,5,main]执行结束-没有释放许可release
Semaphore模拟CyclicBarrier的回环复用功能
package com.study.thread.conllection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest1 {
//创建一个用Static和volatile关键字修饰的信号量实例,传参为0,表示初始计数为0
private static volatile Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
int size = 3;
for (int i = 1; i <= size; i++) {
//将任务i添加至线程池 有释放许可release()
executorService.submit(() -> {
try {
//执行操作
System.out.println("A任务"+Thread.currentThread().getId()+"执行结束");
//释放一个许可,并将其返还给信号量,也就是将 permits+1
semaphore.release();
} catch (Exception e) {
e.printStackTrace();
}
});
}
//(1) 等待子线程执行任务A完毕,返回,计数器又置为0了,现在相当于初始状态了,Semaphore可以复用
semaphore.acquire(size);
int count = 2;
for (int i = 1; i <= count; i++) {
//将任务i添加至线程池 有释放许可release()
executorService.submit(() -> {
try {
//执行操作
System.out.println("B任务"+Thread.currentThread().getId()+"执行结束");
//释放一个许可,并将其返还给信号量,也就是将 permits+1
semaphore.release();
} catch (Exception e) {
e.printStackTrace();
}
});
}
//(2) 等待子线程执行B的子任务完毕,返回,并初始,可复用
semaphore.acquire(count);
System.out.println("所有任务执行完毕");
//关闭线程池
executorService.shutdown();
}
}
A任务11执行结束
A任务12执行结束
A任务11执行结束
B任务12执行结束
B任务12执行结束
所有任务执行完毕
CountDownLatch使用示例:
package com.study.thread.conllection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchSecKill {
//计数器大小为的CountDownLatch实例
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5); //一个最大线程数为5的线程池
for (int i = 0; i < 5; i++) {
executorService.submit(() -> {
try {
countDownLatch.await(); //让当前线程先阻塞
//开始执行每个线程的任务
System.out.println(Thread.currentThread().getName() + "号线程开始执行了");
} catch (Exception e) {
e.printStackTrace();
}
});
}
System.out.println("主线程开始执行");
//倒计时3秒
Thread.sleep(3000);
System.out.println("主线程执行3秒");
//线程执行发起指令
countDownLatch.countDown();
//关闭线程池,拒收新任务,如果此时等待队列还有未处理完的业务,继续FIFO处理等待队列中的旧任务
executorService.shutdown();
}
}
主线程开始执行
主线程执行3秒
pool-1-thread-4号线程开始执行了
pool-1-thread-5号线程开始执行了
pool-1-thread-1号线程开始执行了
pool-1-thread-2号线程开始执行了
pool-1-thread-3号线程开始执行了
CyclicBarrier屏障特点使用示例
package com.study.thread.conllection;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import java.util.Date;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
//创建一个CyclicBarrier实例,添加一个所有子任务全部到达屏障后执行的任务 int parties 可以包含的子任务数量
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(
2, () -> System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "task1 merge result")
);
public static void main(String[] args) {
//创建一个线程数固定为2的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
//添加A任务中的子任务1
executorService.submit(() -> {
try {
System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "task1-1");
System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "enter in Barrier");
Thread.sleep(2000);
cyclicBarrier.await();
System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "enter out Barrier");
} catch (Exception e) {
e.printStackTrace();
}
});
//添加A任务中的子任务2
executorService.submit(() -> {
try {
System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "task1-2");
System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "enter in Barrier");
Thread.sleep(1000);
cyclicBarrier.await();
System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "enter out Barrier");
} catch (Exception e) {
e.printStackTrace();
}
});
//关闭线程池
executorService.shutdown();
}
}
2021-02-18 19:06:50.415 : Thread[pool-1-thread-1,5,main]task1-1
2021-02-18 19:06:50.427 : Thread[pool-1-thread-2,5,main]task1-2
2021-02-18 19:06:50.482 : Thread[pool-1-thread-1,5,main]enter in Barrier
2021-02-18 19:06:50.482 : Thread[pool-1-thread-2,5,main]enter in Barrier
2021-02-18 19:06:52.482 : Thread[pool-1-thread-1,5,main]task1 merge result
2021-02-18 19:06:52.482 : Thread[pool-1-thread-2,5,main]enter out Barrier
2021-02-18 19:06:52.482 : Thread[pool-1-thread-1,5,main]enter out Barrier
CyclicBarrier回环特点使用示例
package com.study.thread.conllection;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import java.util.Date;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierHTest {
private static int parties = 2;
//创建一个CyclicBarrier实例,添加一个所有子任务全部到达屏障后执行的任务 int parties 可以包含的子任务数量
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(
parties, () -> System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "任务拦截,子任务达到" + parties + "后执行")
);
public static void main(String[] args) {
//创建一个线程数固定为2的线程池
ExecutorService executorService = Executors.newFixedThreadPool(parties);
//子任务1
executorService.submit(() -> {
try {
System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "第一步");
cyclicBarrier.await();
System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "第二步");
cyclicBarrier.await();
System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "第三步");
} catch (Exception e) {
e.printStackTrace();
}
});
//子任务2
executorService.submit(() -> {
try {
System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "第一步");
cyclicBarrier.await();
System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "第二步");
cyclicBarrier.await();
System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + " : " + Thread.currentThread() + "第三步");
} catch (Exception e) {
e.printStackTrace();
}
});
//关闭线程池
executorService.shutdown();
}
}
2021-02-18 19:14:43.249 : Thread[pool-1-thread-1,5,main]第一步
2021-02-18 19:14:43.257 : Thread[pool-1-thread-2,5,main]第一步
2021-02-18 19:14:43.321 : Thread[pool-1-thread-2,5,main]任务拦截,子任务达到2后执行
2021-02-18 19:14:43.322 : Thread[pool-1-thread-1,5,main]第二步
2021-02-18 19:14:43.322 : Thread[pool-1-thread-2,5,main]第二步
2021-02-18 19:14:43.323 : Thread[pool-1-thread-2,5,main]任务拦截,子任务达到2后执行
2021-02-18 19:14:43.323 : Thread[pool-1-thread-1,5,main]第三步
2021-02-18 19:14:43.323 : Thread[pool-1-thread-2,5,main]第三步
Phaser
1、party
通过phaser同步的线程被称为party (参与者). 所有需要同步的party必须持有同一个phaser对象.
party需要向phaser注册,执行phaser.register()方法注册,该方法仅仅是增加phaser中的线程计数.(不常用方式)
也可以通过构造器注册,比如new Phaser(3)就会在创建phaser对象时注册3个party. (常用方式)
这3个party只要持有该phaser对象并调用该对象的api就能实现同步.
2、unarrived
party到达一个phaser(阶段)之前处于unarrived状态
3、arrived
到达时处于arrived状态.一个arrived的party也被称为arrival
4、deregister
一个线程可以在arrive某个phase后退出(deregister),与参赛者中途退赛相同,可以使用arriveAndDeregister()方法来实现. (到达并注销)
5、phase计数
Phaser类有一个phase计数,初始阶段为0.当一个阶段的所有线程arrive时,会将phase计数加1,这个动作被称为advance.
当这个计数达到Integer.MAX_VALUE时,会被重置为0,开始下一轮循环
advace这个词出现在Phaser类的很多api里
比如arriveAndAwaitAdvance()、awaitAdvance(int phase)等.
在advance时,会触发onAdvance(int phase, int registeredParties)方法的执行.
6、onAdvance(int phase, int registeredParties)
可以在这个方法中定义advance过程中需要执行何种操作。
如果需要进入下一阶段(phase)执行,返回false.如果返回true,会导致phaser结束
因此该方法也是终止phaser的关键所在。
Phaser使用示例
package com.study.thread.conllection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
public class MyPharserTest {
public static final int SIZE = 5;
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(15);
MyPharser phaser = new MyPharser();
for (int i = 0; i < 3; i++) {
MyThread t = new MyThread(phaser, "线程" + i);
phaser.register();//注册一阶段的线程数+1
es.submit(t);
}
Thread.sleep(5000);
System.out.println("MyPharser state:" + phaser.isTerminated());
es.shutdown();
}
static class MyPharser extends Phaser {
public MyPharser() {
}
@Override
protected boolean onAdvance(int phase, int registeredParties) { //每个阶段完成之后会调用该方法
// phase从0开始
System.out.println("此时phase:" + phase + ";registeredParties:" + registeredParties + ";MyPharser state:" + this.isTerminated());
//如果已经到达了最后一个阶段,或者参与者为0,则返回true,代表结束phaser
if (SIZE == phase + 1 || registeredParties == 0) {
return true; //MyPharser的整个阶段完结,返回true
} else {
return false;
}
}
}
static class MyThread extends Thread {
private Phaser phaser;
private String name;
public MyThread(Phaser phaser, String name) {
this.phaser = phaser;
this.name = name;
}
@Override
public void run() {
for (int i = 1; i <= SIZE; i++) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " : 执行完第" + i + "阶段任务。");
phaser.arriveAndAwaitAdvance();// 会调用Phaser类或者子类的onAdvance方法
}
}
}
}
线程0 : 执行完第1阶段任务。
线程2 : 执行完第1阶段任务。
线程1 : 执行完第1阶段任务。
此时phase:0;registeredParties:3;MyPharser state:false
线程0 : 执行完第2阶段任务。
线程2 : 执行完第2阶段任务。
线程1 : 执行完第2阶段任务。
此时phase:1;registeredParties:3;MyPharser state:false
线程2 : 执行完第3阶段任务。
线程0 : 执行完第3阶段任务。
线程1 : 执行完第3阶段任务。
此时phase:2;registeredParties:3;MyPharser state:false
线程0 : 执行完第4阶段任务。
线程2 : 执行完第4阶段任务。
线程1 : 执行完第4阶段任务。
此时phase:3;registeredParties:3;MyPharser state:false
线程1 : 执行完第5阶段任务。
线程0 : 执行完第5阶段任务。
线程2 : 执行完第5阶段任务。
此时phase:4;registeredParties:3;MyPharser state:false
MyPharser state:true
总结
Phaser可以在每个阶段增加新的参与者,只要register即可,也可以退出,只要deregister即可。
Phaser的灵活性比CyclicBarrier强大的多,所以称之为增强型CyclicBarrier。
phaser示例2
package com.study.thread.conllection;
import java.util.Random;
import java.util.concurrent.Phaser;
public class PharserTest {
static final Random random = new Random();
static class StaffTask {
public void step1Task() throws InterruptedException {
// 第一阶段:来公司集合
String staff = "员工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "从家出发了……");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到达公司");
}
public void step2Task() throws InterruptedException {
// 第二阶段:出发去公园
String staff = "员工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "出发去公园玩");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到达公园门口集合");
}
public void step3Task() throws InterruptedException {
// 第三阶段:去餐厅
String staff = "员工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "出发去餐厅");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到达餐厅");
}
public void step4Task() throws InterruptedException {
// 第四阶段:就餐
String staff = "员工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "开始用餐");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "用餐结束,回家");
}
}
public static void main(String[] args) {
final Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
// 参与者数量,去除主线程
int staffs = registeredParties - 1;
switch (phase) {
case 0:
System.out.println("大家都到公司了,出发去公园,人数:" + staffs);
break;
case 1:
System.out.println("大家都到公司门口了,出发去餐厅,人数:" + staffs);
break;
case 2:
System.out.println("大家都到餐厅了,开始用餐,人数:" + staffs);
break;
}
// 判断是否只剩下主线程(一个参与者),如果是,则返回true,代表终止
return registeredParties == 1;
}
};
// 注册主线程 ———— 让主线程全程参与
phaser.register();
final StaffTask staffTask = new StaffTask();
// 3个全程参与TB的员工
for (int i = 0; i < 3; i++) {
// 添加任务数
phaser.register();
new Thread(() -> {
try {
staffTask.step1Task();
phaser.arriveAndAwaitAdvance();
staffTask.step2Task();
phaser.arriveAndAwaitAdvance();
staffTask.step3Task();
phaser.arriveAndAwaitAdvance();
staffTask.step4Task();
// 完成了,注销离开
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 两个不聚餐的员工加入
for (int i = 0; i < 2; i++) {
phaser.register();
new Thread(() -> {
try {
staffTask.step1Task();
phaser.arriveAndAwaitAdvance();
staffTask.step2Task();
System.out.println("员工【" + Thread.currentThread().getName() + "】回家了");
// 完成了,注销离开
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
while (!phaser.isTerminated()) {
int phase = phaser.arriveAndAwaitAdvance();
if (phase == 2) {
// 到了去餐厅的阶段,又新增4人,参加晚上的聚餐
for (int i = 0; i < 4; i++) {
phaser.register();
new Thread(() -> {
try {
staffTask.step3Task();
phaser.arriveAndAwaitAdvance();
staffTask.step4Task();
// 完成了,注销离开
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
}
}
员工【Thread-0】从家出发了……
员工【Thread-1】从家出发了……
员工【Thread-2】从家出发了……
员工【Thread-3】从家出发了……
员工【Thread-4】从家出发了……
员工【Thread-2】到达公司
员工【Thread-4】到达公司
员工【Thread-1】到达公司
员工【Thread-3】到达公司
员工【Thread-0】到达公司
大家都到公司了,出发去公园,人数:5
员工【Thread-2】出发去公园玩
员工【Thread-0】出发去公园玩
员工【Thread-1】出发去公园玩
员工【Thread-3】出发去公园玩
员工【Thread-4】出发去公园玩
员工【Thread-3】到达公园门口集合
员工【Thread-3】回家了
员工【Thread-4】到达公园门口集合
员工【Thread-4】回家了
员工【Thread-1】到达公园门口集合
员工【Thread-0】到达公园门口集合
员工【Thread-2】到达公园门口集合
大家都到公司门口了,出发去餐厅,人数:3
员工【Thread-0】出发去餐厅
员工【Thread-2】出发去餐厅
员工【Thread-1】出发去餐厅
员工【Thread-5】出发去餐厅
员工【Thread-6】出发去餐厅
员工【Thread-7】出发去餐厅
员工【Thread-8】出发去餐厅
员工【Thread-0】到达餐厅
员工【Thread-6】到达餐厅
员工【Thread-2】到达餐厅
员工【Thread-7】到达餐厅
员工【Thread-5】到达餐厅
员工【Thread-8】到达餐厅
员工【Thread-1】到达餐厅
大家都到餐厅了,开始用餐,人数:7
员工【Thread-2】开始用餐
员工【Thread-1】开始用餐
员工【Thread-0】开始用餐
员工【Thread-6】开始用餐
员工【Thread-7】开始用餐
员工【Thread-8】开始用餐
员工【Thread-5】开始用餐
员工【Thread-1】用餐结束,回家
员工【Thread-6】用餐结束,回家
员工【Thread-0】用餐结束,回家
员工【Thread-2】用餐结束,回家
员工【Thread-5】用餐结束,回家
员工【Thread-8】用餐结束,回家
员工【Thread-7】用餐结束,回家
网友评论