案例需求
项目开发中,有些查询在相同条件下每次查询结果都一样,此时可以将结果缓存起来以便下次查询时直接返回。这样能大大提升查询效率,特别是耗时很长的查询,效果更明显。在这个需求中,我们除了要求将结果进行缓存外,还要确保查询任务只执行一次。
在这篇文章中,我们使用FutureTask
和ConcurrentHashMap
来实现上面的需求。
代码实现
我们定义一个UserService
的类,提供一个根据用户ID查询对应用户的方法。假设这个查询耗时很长(虽然一般情况下并不需要多长时间),而且相同ID的查询结果都一样。这时就可以考虑将结果缓存起来。
具体代码如下:
- 定义一个
User
实体
public class User {
private Integer id;
private String name;
public User(Integer id, String name) {
this.id = id;
this.name = name;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
-
UserService
服务类
public class UserService {
public User getOne(Integer id) {
// 其他耗时很长的操作
return new User(id,"user");
}
}
-
Client
场景类
public class Client {
private Map<Integer, User> cache = new HashMap<>();
private UserService service = new UserService();
public static void main(String[] args) {
Client client = new Client();
int id = 1234;
// 启动五个线程并发执行
for (int i = 0; i < 5; i++) {
new Thread(() -> System.out.println(client.getUserById(id))).start();
}
}
private synchronized User getUserById(Integer id) {
User user = cache.get(id);
if (user == null) {
user = service.getOne(id);
cache.put(id, user);
}
return user;
}
}
执行结果:
D:\javasoft\jdk1.8.0_231\bin\java.exe "...
User@6706a70b
User@6706a70b
User@6706a70b
User@6706a70b
User@6706a70b
Process finished with exit code 0
通过执行结果可以看出,上述场景类可以完成我们的需求。但是这种实现方式存在不足之处。由于HashMap
不是线程安全的,所以我们在getUserById
方法上使用synchronized
同步锁来保证线程安全。但是这样的话会造成其他线程阻塞,大大降低了查询效率。那这时候该怎么办呢,可能有人会说既然HashMap
不是线程安全的,那只要使用线程安全的ConcurrentHashMap
替换HashMap
不就行了吗。话不多说,我们上代码一试便知。
-
Client
场景类(使用ConcurrentHashMap
)
改动的地方只是使用ConcurrentHashMap
替换了HashMa
p,并去掉了getUserById
方法上的synchronized
关键字
public class Client {
private Map<Integer, User> cache = new ConcurrentHashMap<>();
private UserService service = new UserService();
public static void main(String[] args) {
Client client = new Client();
int id = 1234;
// 启动五个线程并发执行
for (int i = 0; i < 5; i++) {
new Thread(() -> System.out.println(client.getUserById(id))).start();
}
}
private User getUserById(Integer id) {
User user = cache.get(id);
if (user == null) {
user = service.getOne(id);
cache.put(id, user);
}
return user;
}
}
接下来看看执行结果:
D:\javasoft\jdk1.8.0_231\bin\java.exe "...
User@693e4988
User@6223c513
User@278e4dc4
User@346827f
User@76ee89dd
Process finished with exit code 0
结果好像跟我们预期的不一样,每一次调用都返回了新的对象,缓存并没有生效。为什么会这样,其实很简单。ConcurrentHashMap
虽然是线程安全的,但是由于我们去掉了getUserById
方法上的synchronized
关键字,并发访问时就会出现多个线程进入到if (user == null)
判断,从而重复执行查询操作。那如何解决这个问题呢。这就是我们本次的主题:实现任务唯一执行。
我们可以想一下,之所以出现重复执行的问题,是因为第一个查询任务执行的时候,后面的线程并不知道这回事,所以就自己执行查询操作。如果能做到让其他线程知道已有任务在执行中,然后让他们阻塞等待任务执行完成,再去获取结果即可,这样就能实现任务唯一执行了。JUC包中已经给我们提供了一个这样类:FutureTask
。它表示一个异步计算,并且只有计算完成才能通过get
方法获取结果,否则会一直阻塞或者抛出TimeoutException
(如果get时设置了超时时间)。下面来看看具体实现方式
-
Client
场景类(ConcurrentHashMap
配合FutureTask
使用)
public class Client {
private Map<Integer, Future<User>> cache = new ConcurrentHashMap<>(); // 1
private UserService service = new UserService();
public static void main(String[] args) {
Client client = new Client();
int id = 1234;
// 启动五个线程并发执行
for (int i = 0; i < 5; i++) {
new Thread(() -> System.out.println(client.getUserById(id))).start();
}
}
// 使用FutureTask的特性,如果已有查询任务在执行,其他线程可以
// 获取这个任务,并等待其返回结果
private User getUserById(Integer id) {
Future<User> future = cache.get(id);
if (future == null) {
FutureTask task = new FutureTask<>(() -> service.getOne(id));
future = cache.putIfAbsent(id, task); // 2
if (future == null) {
future = task;
task.run();
}
}
try {
return future.get();
} catch (Exception e) {
cache.remove(id, future); // 3
throw new RuntimeException(e.getMessage(), e);
}
}
}
执行结果:
D:\javasoft\jdk1.8.0_231\bin\java.exe "...
User@56f3ee1d
User@56f3ee1d
User@56f3ee1d
User@56f3ee1d
User@56f3ee1d
Process finished with exit code 0
结果符合我们的需求,接下来一步步分析实现原理,关键实现主要有三个地方。
首先,相比于上一个版本。ConcurrentHashMap
中的value
由User
变为Future<User>
(FutureTask
是Future
接口的一个实现类):private Map<Integer, **Future<User>**> cache = new ConcurrentHashMap<>();
这样做的目的是将任务缓存起来,告诉其他线程我正在执行任务,你乖乖地等着我执行完成获取结果就好,不需要重复执行相同的任务了。
其次,在getUserById
方法中,当缓存中还没有任务时,会创建一个FutureTask
任务,并将它放入缓存中。注意这里创建任务时可能多个线程都完成了创建任务的工作,但是放进缓存的时候只能有一个线程成功。这得益于ConcurrentHashMap
的线程安全机制,以及future = cache.putIfAbsent(id, task);
中putIfAbsent
方法的使用。这个方法只有在当前key
没有对应value
时才进行写入,并且返回null。反之,则不会进行写入操作,而是直接返回key
对应的值。这就保证了在key
相同的情况下只会有一个线程创建的任务能放进缓存中。
最后,需要考虑缓存污染的问题。假如任务执行失败了,那么其他线程通过缓存获取的FutureTask就无法获取到正确的结果。所以我们需要在get
发生异常时移除对应的缓存数据。
以上是个人对ConcurrentHashMap
和FutureTask
使用的一些理解,如果哪里写得有问题,还望各位猿友斧正。
网友评论