疫情逐渐好转,部门也有半年多没有TB团建了,并且金三银四,部门又招了一波新人;
leader让你组织一次TB:周六上午,大家先到公司集合,然后一起去朝阳公园玩,最后一起去餐厅聚餐,然后回家。
为了体现团队集体意识,在每次开启新项目时,需要所有人一起开始行动(不能早来的人都把东西吃光了吧~),并且每个阶段活动完成后,需要统计人数、向上汇报。
这个场景,如何借助JUC并发工具来实现呢?
我们先来梳理一下,任务特点:
- 很显然,
每次开启新项目时,需要所有人一起开始行动
这是个多任务相互等待,直到所有人都到达一个点时,才开始执行; - 同时,TB活动是
分为多个阶段
的,每个阶段都有具体要做的事; -
每个阶段完后
,组织者还得做点而外的事
; -
参与者的数量,是确定的
;
看到多任务相互等待,相信很多人已经想到了 CyclicBarrier。
没错,这个TB任务的特点,其实也是使用 CyclicBarrier 时的特点。
下面来看 如何使用 CyclicBarrier 实现TB成员管理的。
先来看看 CyclicBarrier 的源码注释;
A synchronizati on aid that allows a set of threads to all wait for each other to reach a common barrier point.
描述如下:多个线程相互等待,直到所有线程到达同一个同步点,再继续一起执行。
CyclicBarrier适用于多个线程有固定的多步需要执行,线程间互相等待,当都执行完了,在一起执行下一步。
CyclicBarrier 字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
- 叫做回环,是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
- 叫做栅栏,大概是描述所有线程被栅栏挡住了,当都达到时,一起跳过栅栏执行,也算形象。我们可以把这个状态就叫做barrier。
CyclicBarrier 的API
public CyclicBarrier(int parties)
public int await()
构造函数,指定参与者数量;
await()让线程阻塞在栅栏。
CyclicBarrier实现TB成员协同
我们用 parties 变量指定了参与者数量;用sleep(随机数)来模拟每个TB活动不同成员的耗时;代码实现如下:
static final Random random = new Random();
static final CyclicBarrier cyclicBarrier = new CyclicBarrier(parties);
static class StaffThread extends Thread {
@Override
public void run() {
try {
String staff = "员工【" + Thread.currentThread().getName() + "】";
// 第一阶段:来公司集合
System.out.println(staff + "从家出发了……");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到达公司");
// 协同,第一次等大家到齐
cyclicBarrier.await();
// 第二阶段:出发去公园
System.out.println(staff + "出发去公园玩");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到达公园门口集合");
// 协同:第二次等大家到齐
cyclicBarrier.await();
// 第三阶段:去餐厅
System.out.println(staff + "出发去餐厅");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到达餐厅");
// 协同:第三次等大家到齐
cyclicBarrier.await();
// 第四阶段:就餐
System.out.println(staff + "开始用餐");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "用餐结束,回家");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// 所有人,开始行动
for (int i = 0; i < parties; i++) {
new StaffThread().start();
}
}
我们用 StaffThread 代表每个员工参加TB活动要做的事;每个成员在各个阶段,花费的时间可能不同;
每个员工,在执行完成当前阶段后 cyclicBarrier.await()进行阻塞(任务协同),等待大家到齐了再进入下一阶段。
值得一提的是,CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值,自动复原。这个功能用起来实在是太方便了。
由于 CyclicBarrier 的可重用特性,当所有等待线程都被释放以后,CyclicBarrier可以被重用;
因此只要每个阶段,所有成员都完成后,CyclicBarrier就会自动重用,以此往复。
看上去,很完美。就差一点了 —— TB组织者,要在每个阶段结束后向上汇报;
这就用到了 CyclicBarrier 的回调函数功能,CyclicBarrier 的第二个构造方法:
CyclicBarrier(int parties, Runnable barrierAction);
barrierAction 可以指定一个善后处理的task,在所有人都到达屏障点时,来执行;
就好比团建时,所有人都到达公园门口了,这时组织者喊“都别走,先拍个照”,然后横幅一拉……
(呜呜呜……)
下面来看 如何实现;
由于构造函数中,只能指定一个Runnable善后任务,但我们的TB活动有多个阶段,每个阶段都需要汇报一次,因此我们实现的Runnable任务,需要判断在不同的阶段,做不同的汇报;
我们用 peroid 变量代表当前阶段,初始值为1;
CyclicBarrier的可复用功能,在所有人都达到集合点后,执行一次 milestoneRunnable 善后任务,意味着 milestoneRunnable 执行一次后,就代表进入下一阶段,因此peroid++;
//阶段
static int peroid = 1;
/**
* 里程碑
* 每阶段完成后,会执行这里;
*/
static Runnable milestoneRunnable = new Runnable() {
@Override
public void run() {
switch (peroid) {
case 1:
System.out.println("********第1阶段***************");
break;
case 2:
System.out.println("********第2阶段***************");
break;
case 3:
System.out.println("********第3阶段***************");
break;
}
peroid++;
}
};
我们换用带回调函数的构造方法,再执行
static final CyclicBarrier cyclicBarrier = new CyclicBarrier(parties, milestoneRunnable);
public static void main(String[] args) {
// 所有人,开始行动
for (int i = 0; i < parties; i++) {
new StaffThread().start();
}
}
运行结果:
员工【Thread-0】从家出发了……
员工【Thread-2】从家出发了……
员工【Thread-1】从家出发了……
员工【Thread-2】到达公司
员工【Thread-0】到达公司
员工【Thread-1】到达公司
********第1阶段***************
员工【Thread-1】出发去公园玩
员工【Thread-2】出发去公园玩
员工【Thread-0】出发去公园玩
员工【Thread-2】到达公园门口集合
员工【Thread-1】到达公园门口集合
员工【Thread-0】到达公园门口集合
********第2阶段***************
员工【Thread-0】出发去餐厅
员工【Thread-2】出发去餐厅
员工【Thread-1】出发去餐厅
员工【Thread-0】到达餐厅
员工【Thread-1】到达餐厅
员工【Thread-2】到达餐厅
********第3阶段***************
员工【Thread-2】开始用餐
员工【Thread-0】开始用餐
员工【Thread-1】开始用餐
员工【Thread-2】用餐结束,回家
员工【Thread-0】用餐结束,回家
员工【Thread-1】用餐结束,回家
通过这个例子,对 CyclicBarrier 的基本使用,是不是清晰了很多。
关于CyclicBarrier的回调函数
CyclicBarrier 的回调函数,可以指定一个线程池来运行,相当于异步完成;
如果不指定线程池,默认在最后一个执行await()的线程执行,相当于同步完成。
这有什么区别呢?
实则关注性能的场景,区别很大。
CountDownLatch和CyclicBarrier让多线程步调一致 文中的例子,对账系统每天会校验是否存在异常订单,简易实现如下:
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
文中有使用CyclicBarrier的实现:查询订单和查询派单 两个线程相互等待,都完成时执行回调:进行check对账
对账任务可以大致分为两步,第一步查询需要核对的订单、账单;第二步执行check()、记录差异。
两个查询操作可以并发完成,第二步执行check()、记录差异
可以看作是第一阶段任务完成后的“结果汇总”,可以使用CyclicBarrier的回调函数来完成;
最终达到的效果图:
也就是并发去查询,查询的结果让 回调函数异步执行,好处是查询线程直接进入下一阶段、继续查询下一组数据;
中间使用一个同步容器,保存订单数据即可。详细思路&代码,见:CountDownLatch和CyclicBarrier让多线程步调一致
试想,假设让CyclicBarrier的回调函数执行在一个回合里最后执行await()的线程上,而且同步调用回调函数check(),调用完check之后,才会开始第二回合。
所以check如果不另开一线程异步执行,就起不到性能优化的作用了。
CyclicBarrier 的回调函数究竟是哪个线程执行的呢?
如果你分析源码,你会发现执行回调函数的线程是将 CyclicBarrier 内部计数器减到 0 的那个线程。
这里强调一下:当看到回调函数的时候,一定问一问执行回调函数的线程是谁。
如果CyclicBarrier回调函数不使用隔离的线程池,则CyclicBarrier最后一个线程忙着执行回调,其他线程还在阻塞,可能适得其反。
问题思考:需求升级
需求升级了:实际TB活动中,可能有人白天有事,不能参加公园的活动、但晚上会来聚餐;有人白天能参加,晚上不能参加;
并且公园的门票,聚餐费用,因参与人数不同,又有不同。
思考:需求升级后,如何实现?CyclicBarrier 能完成吗?
~~下回揭晓
推荐阅读
-
JUC源码
本文首发于 公众号 架构道与术(ToBeArchitecturer),欢迎关注、学习更多干货~
推荐阅读
-
并发设计模式
并发设计模式 | Worker Thread模式:如何避免重复创建线程?
并发设计模式 | Thread-Per-Message每请求每线程
-
并发工具
25 | CompletionService:批量执行异步任务
24 | CompletableFuture:Java异步编程
19 | CountDownLatch和CyclicBarrier让多线程步调一致
17 | ReadWriteLock:如何快速实现一个完备的缓存?
-
并发基础
网友评论