美文网首页
FutureTask配合ConcurrentHashMap实现任

FutureTask配合ConcurrentHashMap实现任

作者: 刘文琼 | 来源:发表于2020-04-27 00:18 被阅读0次

案例需求

项目开发中,有些查询在相同条件下每次查询结果都一样,此时可以将结果缓存起来以便下次查询时直接返回。这样能大大提升查询效率,特别是耗时很长的查询,效果更明显。在这个需求中,我们除了要求将结果进行缓存外,还要确保查询任务只执行一次。
在这篇文章中,我们使用FutureTaskConcurrentHashMap来实现上面的需求。

代码实现

我们定义一个UserService的类,提供一个根据用户ID查询对应用户的方法。假设这个查询耗时很长(虽然一般情况下并不需要多长时间),而且相同ID的查询结果都一样。这时就可以考虑将结果缓存起来。
具体代码如下:

  • 定义一个User实体
public class User {
    private Integer id;
    private String name;

    public User(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }
    public void setId(Integer id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}
  • UserService服务类
public class UserService {

    public User getOne(Integer id) {
        // 其他耗时很长的操作
        return new User(id,"user");
    }
}
  • Client场景类
public class Client {

    private Map<Integer, User> cache = new HashMap<>();
    private UserService service = new UserService();

    public static void main(String[] args) {
        Client client = new Client();
        int id = 1234;
        // 启动五个线程并发执行
        for (int i = 0; i < 5; i++) {
            new Thread(() -> System.out.println(client.getUserById(id))).start();
        }
    }

    private synchronized User getUserById(Integer id) {
        User user = cache.get(id);
        if (user == null) {
            user = service.getOne(id);
            cache.put(id, user);
        }

        return user;
    }

}

执行结果:

D:\javasoft\jdk1.8.0_231\bin\java.exe "...
User@6706a70b
User@6706a70b
User@6706a70b
User@6706a70b
User@6706a70b

Process finished with exit code 0

通过执行结果可以看出,上述场景类可以完成我们的需求。但是这种实现方式存在不足之处。由于HashMap不是线程安全的,所以我们在getUserById方法上使用synchronized同步锁来保证线程安全。但是这样的话会造成其他线程阻塞,大大降低了查询效率。那这时候该怎么办呢,可能有人会说既然HashMap不是线程安全的,那只要使用线程安全的ConcurrentHashMap替换HashMap不就行了吗。话不多说,我们上代码一试便知。

  • Client场景类(使用ConcurrentHashMap
    改动的地方只是使用ConcurrentHashMap替换了HashMap,并去掉了getUserById方法上的synchronized关键字
public class Client {

    private Map<Integer, User> cache = new ConcurrentHashMap<>();
    private UserService service = new UserService();

    public static void main(String[] args) {
        Client client = new Client();
        int id = 1234;
        // 启动五个线程并发执行
        for (int i = 0; i < 5; i++) {
            new Thread(() -> System.out.println(client.getUserById(id))).start();
        }
    }

    private User getUserById(Integer id) {
        User user = cache.get(id);
        if (user == null) {
            user = service.getOne(id);
            cache.put(id, user);
        }

        return user;
    }

}

接下来看看执行结果:

D:\javasoft\jdk1.8.0_231\bin\java.exe "...
User@693e4988
User@6223c513
User@278e4dc4
User@346827f
User@76ee89dd

Process finished with exit code 0

结果好像跟我们预期的不一样,每一次调用都返回了新的对象,缓存并没有生效。为什么会这样,其实很简单。ConcurrentHashMap虽然是线程安全的,但是由于我们去掉了getUserById方法上的synchronized关键字,并发访问时就会出现多个线程进入到if (user == null)判断,从而重复执行查询操作。那如何解决这个问题呢。这就是我们本次的主题:实现任务唯一执行
我们可以想一下,之所以出现重复执行的问题,是因为第一个查询任务执行的时候,后面的线程并不知道这回事,所以就自己执行查询操作。如果能做到让其他线程知道已有任务在执行中,然后让他们阻塞等待任务执行完成,再去获取结果即可,这样就能实现任务唯一执行了。JUC包中已经给我们提供了一个这样类:FutureTask。它表示一个异步计算,并且只有计算完成才能通过get方法获取结果,否则会一直阻塞或者抛出TimeoutException(如果get时设置了超时时间)。下面来看看具体实现方式

  • Client场景类(ConcurrentHashMap配合FutureTask使用)
public class Client {

    private Map<Integer, Future<User>> cache = new ConcurrentHashMap<>(); // 1
    private UserService service = new UserService();

    public static void main(String[] args) {
        Client client = new Client();
        int id = 1234;
        // 启动五个线程并发执行
        for (int i = 0; i < 5; i++) {
            new Thread(() -> System.out.println(client.getUserById(id))).start();
        }
    }

    // 使用FutureTask的特性,如果已有查询任务在执行,其他线程可以
    // 获取这个任务,并等待其返回结果
    private User getUserById(Integer id) {
        Future<User> future = cache.get(id);
        if (future == null) {
            FutureTask task = new FutureTask<>(() -> service.getOne(id));
            future = cache.putIfAbsent(id, task); // 2
            if (future == null) {
                future = task;
                task.run();
            }
        }

        try {
            return future.get();
        } catch (Exception e) {
            cache.remove(id, future); // 3
            throw new RuntimeException(e.getMessage(), e);
        }
    }

}

执行结果:

D:\javasoft\jdk1.8.0_231\bin\java.exe "...
User@56f3ee1d
User@56f3ee1d
User@56f3ee1d
User@56f3ee1d
User@56f3ee1d

Process finished with exit code 0

结果符合我们的需求,接下来一步步分析实现原理,关键实现主要有三个地方。
首先,相比于上一个版本。ConcurrentHashMap中的valueUser变为Future<User>FutureTaskFuture接口的一个实现类):private Map<Integer, **Future<User>**> cache = new ConcurrentHashMap<>();这样做的目的是将任务缓存起来,告诉其他线程我正在执行任务,你乖乖地等着我执行完成获取结果就好,不需要重复执行相同的任务了。
其次,在getUserById方法中,当缓存中还没有任务时,会创建一个FutureTask任务,并将它放入缓存中。注意这里创建任务时可能多个线程都完成了创建任务的工作,但是放进缓存的时候只能有一个线程成功。这得益于ConcurrentHashMap的线程安全机制,以及future = cache.putIfAbsent(id, task);putIfAbsent方法的使用。这个方法只有在当前key没有对应value时才进行写入,并且返回null。反之,则不会进行写入操作,而是直接返回key对应的值。这就保证了在key相同的情况下只会有一个线程创建的任务能放进缓存中。
最后,需要考虑缓存污染的问题。假如任务执行失败了,那么其他线程通过缓存获取的FutureTask就无法获取到正确的结果。所以我们需要在get发生异常时移除对应的缓存数据。
以上是个人对ConcurrentHashMapFutureTask使用的一些理解,如果哪里写得有问题,还望各位猿友斧正。

相关文章

网友评论

      本文标题:FutureTask配合ConcurrentHashMap实现任

      本文链接:https://www.haomeiwen.com/subject/gmttwhtx.html