本文是Java线程安全和并发编程知识总结的一部分。
2.3 闭锁CountDownLatch
闭锁实际上不是锁,而是是一种特殊的同步工具。它将一个或多个线程被阻塞,直到另外一组线程中的某个操作都发生了为止。
CountDownLatch 提供了如下方法:
- 初始化器:以指定计数来创建一个闭锁。
- await(): 让线程运行到该方法被调用的地方时挂起,等候闭锁计数器倒数到0后被唤醒。
- countDown(): 让闭锁计数器倒数。
闭锁适用的是这样一种场景:所有线程都在运行到 await 被调用的点时休眠,等到闭锁计数器倒数到0时,一起被唤醒继续执行。是不是非常像“一群人约定一起做一件事情,在约定的地方集合,等人数到期后一起出发”?
下面给出一个闭锁的使用例子:
/**
* @author xx
* 2020年2月5日 下午9:48:23
*/
public class Sample14 {
/**
* 2020年2月5日 下午9:48:58 xx添加此方法
* @param args
*/
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(10);
Sample14 sample = new Sample14();
// 模拟10个业务任务
for ( int i = 0; i <10; i++) {
final String taskName = "任务" + i;
new Thread(() -> {
// 集合
sample.rendezvous(taskName);
// 阻塞,等待其他人到达集合地
latch.countDown();
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException("线程中断异常", e);
}
// 出发去干活
sample.doJob(taskName);
}).start();
}
}
/**
* 集合
* 2020年2月5日 下午10:00:25 xx添加此方法
*/
public void rendezvous(String name) {
Instant now = Instant.now();
System.out.println(name + " 出发去集合地:" + now.getEpochSecond() + ", " + now.getNano());
// 模拟集合
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(300));
} catch (InterruptedException e) {
e.printStackTrace();
}
now = Instant.now();
System.out.println(name + " 到达集合地:" + now.getEpochSecond() + ", " + now.getNano());
}
/**
* 集合后的工作
* 2020年2月5日 下午10:08:04 xx添加此方法
*/
public void doJob(String name) {
Instant now = Instant.now();
System.out.println(name + "开始执行自己的任务:" + now.getEpochSecond() + ", " + now.getNano());
// 模拟做一些业务
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
now = Instant.now();
System.out.println(name + " 已完成自己的任务:" + now.getEpochSecond() + ", " + now.getNano());
}
}
从日志可以看到,每个线程出发去集合地的时间都一样,到达集合地的时间不一样,但每个线程开始执行自己任务的时间都一样,都是最晚一个线程到达集合地的时间。
疑似栅锁的Future、FutureTask
Future接口的语义,是当计算结果尚未出来是,阻塞调用get()
方法的线程,直到结果计算出来可用,或被中断为止。
FutureTask类则实现了该接口和Runnable和Callable接口,因此可以用于多线程计算,并阻塞调用get()
的线程直到得到结果。
这和栅锁的效果有点类似,有点像专用于计算结果的栅锁:
/**
* @author xx
* 2020年2月6日 上午9:15:01
*/
public class SampleCache<T> {
/**
* 放置缓存数据的Map
*/
private final Map<String, T> cache;
/**
* 标记是否已完成初始化的标记
*/
private volatile boolean inited = false;
public SampleCache() {
this.cache = new HashMap<>();
}
/**
* 初始化缓存数据。将阻塞直到结果查出为止。
* 2020年2月6日 上午9:43:33 xx添加此方法
*/
public synchronized void init() {
if (!this.inited) {
final FutureTask<Map<String, T>> future = new FutureTask<Map<String, T>>(new Callable<Map<String, T>>() {
@Override
public Map<String, T> call() throws Exception {
return SampleCache.this.loadFromDb();
}
});
// 在独立线程中初始化,避免影响构造函数
new Thread(future).start();
try {
this.cache.putAll(future.get());
this.inited = true;
} catch (InterruptedException e) {
throw new RuntimeException("从数据库加载初始化数据时线程中断异常", e);
} catch (ExecutionException e) {
throw new RuntimeException("从数据库加载初始化数据时异常", e);
}
}
}
/**
* 从数据库加载所有数据
* 2020年2月6日 上午9:21:24 xx添加此方法
* @return
*/
private Map<String, T> loadFromDb() {
// 模拟数据加载逻辑
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException("从数据库加载初始化数据时线程中断异常", e);
}
return new HashMap<String, T>();
}
public T fetch(String key) {
// 初次获取缓存时初始化;实际上采用了DDC。在inited 属性有 volatile 修饰符的情况下,并无性能问题。
// 真实操作中,不要这么做;在生产环境中,一般都有其他机制,可以在缓存服务类被构建后,就启动初始化进程。
if (!this.inited) {
this.init();
}
return this.cache.get(key);
}
}
网友评论