前言
前几天看面试题的时候,偶然在知乎看到了一道某家大型互联网公司出的面试题 “一主多从 多线程协作”,题目是这样的
问:客户请求下单服务(OrderService),服务端会验证用户的身份(RemotePassportService),用户的银行信用(RemoteBankService),用户的贷款记录(RemoteLoanService)。为提高并发效率,要求三项服务验证工作同时进行,如其中任意一项验证失败,则立即返回失败,否则等待所有验证结束,成功返回。要求Java实现
由于 CountDownLatch 内部是使用 AQS 同步器实现的,我就想到了能不能用 synchronized 内置锁,来模拟实现一下闭锁的功能,经过了一天的思考,使用 wait 和 notify 做了一个简单的例子.......
什么是 CountDownLatch ?
CountDownLatch 是一个非常实用的多线程控制工具类。在 JDK1.5 之后被引入,这个工具通常用来控制线程等待,它可以让一个线程或多个线程等待其它线程完成各自的操作以后,再继续执行。
在这里有关于 CountDownLatch 的使用方式和应用场景就不再描述了,话不多说,下面是具体的实现代码。
【我只是做了一个简单的实现, 至于性能和安全性方面, 还希望大神们多多指导】
模拟实现 CountDownLatch
首先定义一个基本的功能接口:
interface CountDownLatchSimulate {
int getCount();
void countDown();
void await() throws InterruptedException;
int threadCount();
}
使用 Synchronized 实现的工具类如下:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
/**
* @author Lin Shu
* Please contact me if you have any questions.
* My e-mail is syt0438@gmail.com
* Date: 2018/3/22
* Time: 11:37
*/
public class CountDownLatchSynchronized implements CountDownLatchSimulate {
private static final Logger LOGGER = Logger.getGlobal();
private final Object lock = new Object();
/**
* 通过 state 实现类似 CountDownLatch 的state,
* 多个任务等待 state 为 0 时,恢复执行
*/
private final AtomicInteger state;
/**
* 记录当前闭锁是否已经关闭,防止出现所有子线程 notify 已经发生,而 await 还没有发生的现象.
*/
private final AtomicBoolean lockClosed = new AtomicBoolean(false);
/**
* 通过 Map 保证 await 的线程,一定是在其它线程执行完之后唤醒
*/
private volatile ConcurrentHashMap<String, Thread> threads = new ConcurrentHashMap<>();
public CountDownLatchSynchronized(int state) {
this.state = new AtomicInteger(state);
}
@Override
public int getCount() {
return state.get();
}
/**
* 1. 通过 state 记录闭锁状态
* 2. 通过一个 Map 来保证 await 线程始终在其它countDown线程执行完成之后恢复执行
* 3. 当所有其它线程都已经执行完成并且调用了 notify 方法之后,由于 await 线程
* 还未执行到 await 方法未阻塞,从而错过了 notify 通知,因此会一直阻塞下去
* 通过 lockClosed 来保存闭锁当前的状态,来解决丢失通知的问题
*/
@Override
public void countDown() {
synchronized (lock) {
final int val = state.decrementAndGet();
if (val == 0) {
/*
* 当闭锁状态等于 0 时,闭锁达到临界值,通知所有线程恢复执行
* */
try {
LOGGER.warning("[" + threads.size() + "|" + getCount() + "]Notify All : " + Thread.currentThread());
lock.notifyAll();
LOGGER.info("val: " + val + " : threadCount: " + threadCount());
} finally {
/*
* 保存闭锁的关闭状态
* */
lockClosed.compareAndSet(false, true);
LOGGER.warning("[" + threads.size() + "|" + getCount() + "]Count Down Closed: " + Thread.currentThread());
}
} else {
/*
* 当闭锁状态大于 0 时,当前线程进入等待状态,
* 并将当前线程保存到 Map 当中,以保证 await 线程的执行顺序
* */
try {
threads.put(Thread.currentThread().getName(), Thread.currentThread());
LOGGER.warning("[" + threads.size() + "|" + getCount() + "]Put Thread: " + Thread.currentThread());
lock.wait();
} catch (InterruptedException e) {
Logger.getGlobal().warning("[" + threadCount() + "|" + getCount() + "]Count Down Thread Interupted: " + Thread.currentThread());
} finally {
threads.remove(Thread.currentThread().getName(), Thread.currentThread());
LOGGER.warning("[" + threads.size() + "|" + getCount() + "]Remove Thread: " + Thread.currentThread());
if (threads.isEmpty()) {
lock.notifyAll();
}
LOGGER.info("val: " + val + " : threadCount: " + threadCount() + "|" + getCount());
}
}
}
}
@Override
public void await() throws InterruptedException {
synchronized (lock) {
if (!lockClosed.get()) {
/*
* 如果闭锁状态 > 0 并且 lockClosed 为假,未到达关闭状态,则当前线程进入等待状态
* */
lock.wait();
} else {
/*
* 如果闭锁状态 == 0 并且 lockClosed 为假,到达关闭状态,当前线程不等待,继续执行
* */
LOGGER.info("[" + System.currentTimeMillis() + "|" + getCount() + "] Count Down Latch is Closed become to Skip wait [" + Thread.currentThread().getName() + "]让出CPU执行权!");
}
while (!threads.isEmpty()) {
/*
* 如果其它线程还没有执行完成,则当前线程进入等待状态,让出 CUP 的执行权
* */
LOGGER.info("[" + System.currentTimeMillis() + "|" + getCount() + "]Await线程[" + Thread.currentThread().getName() + "]让出CPU执行权!");
if (!lockClosed.get()) {
lock.wait();
} else {
break;
}
}
LOGGER.info("[" + System.currentTimeMillis() + "|" + getCount() + "]Await线程[" + Thread.currentThread().getName() + "]退出等待状态!");
}
}
@Override
public int threadCount() {
return threads.size();
}
}
具体的代码之后,会上传到我的 github 上去,【我是传送门,点击我】
总结
通过本文的介绍,希望大家能够了解 CountDownLatch 的应用场景和工作机制。
第一次写技术文章,文笔和写作逻辑方面难免有一些问题的存在,还望各位同仁多多指教。
网友评论