美文网首页
FutureTask配合ConcurrentHashMap实现任

FutureTask配合ConcurrentHashMap实现任

作者: 刘文琼 | 来源:发表于2020-04-27 00:18 被阅读0次

    案例需求

    项目开发中,有些查询在相同条件下每次查询结果都一样,此时可以将结果缓存起来以便下次查询时直接返回。这样能大大提升查询效率,特别是耗时很长的查询,效果更明显。在这个需求中,我们除了要求将结果进行缓存外,还要确保查询任务只执行一次。
    在这篇文章中,我们使用FutureTaskConcurrentHashMap来实现上面的需求。

    代码实现

    我们定义一个UserService的类,提供一个根据用户ID查询对应用户的方法。假设这个查询耗时很长(虽然一般情况下并不需要多长时间),而且相同ID的查询结果都一样。这时就可以考虑将结果缓存起来。
    具体代码如下:

    • 定义一个User实体
    public class User {
        private Integer id;
        private String name;
    
        public User(Integer id, String name) {
            this.id = id;
            this.name = name;
        }
    
        public Integer getId() {
            return id;
        }
        public void setId(Integer id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
    }
    
    • UserService服务类
    public class UserService {
    
        public User getOne(Integer id) {
            // 其他耗时很长的操作
            return new User(id,"user");
        }
    }
    
    • Client场景类
    public class Client {
    
        private Map<Integer, User> cache = new HashMap<>();
        private UserService service = new UserService();
    
        public static void main(String[] args) {
            Client client = new Client();
            int id = 1234;
            // 启动五个线程并发执行
            for (int i = 0; i < 5; i++) {
                new Thread(() -> System.out.println(client.getUserById(id))).start();
            }
        }
    
        private synchronized User getUserById(Integer id) {
            User user = cache.get(id);
            if (user == null) {
                user = service.getOne(id);
                cache.put(id, user);
            }
    
            return user;
        }
    
    }
    

    执行结果:

    D:\javasoft\jdk1.8.0_231\bin\java.exe "...
    User@6706a70b
    User@6706a70b
    User@6706a70b
    User@6706a70b
    User@6706a70b
    
    Process finished with exit code 0
    

    通过执行结果可以看出,上述场景类可以完成我们的需求。但是这种实现方式存在不足之处。由于HashMap不是线程安全的,所以我们在getUserById方法上使用synchronized同步锁来保证线程安全。但是这样的话会造成其他线程阻塞,大大降低了查询效率。那这时候该怎么办呢,可能有人会说既然HashMap不是线程安全的,那只要使用线程安全的ConcurrentHashMap替换HashMap不就行了吗。话不多说,我们上代码一试便知。

    • Client场景类(使用ConcurrentHashMap
      改动的地方只是使用ConcurrentHashMap替换了HashMap,并去掉了getUserById方法上的synchronized关键字
    public class Client {
    
        private Map<Integer, User> cache = new ConcurrentHashMap<>();
        private UserService service = new UserService();
    
        public static void main(String[] args) {
            Client client = new Client();
            int id = 1234;
            // 启动五个线程并发执行
            for (int i = 0; i < 5; i++) {
                new Thread(() -> System.out.println(client.getUserById(id))).start();
            }
        }
    
        private User getUserById(Integer id) {
            User user = cache.get(id);
            if (user == null) {
                user = service.getOne(id);
                cache.put(id, user);
            }
    
            return user;
        }
    
    }
    

    接下来看看执行结果:

    D:\javasoft\jdk1.8.0_231\bin\java.exe "...
    User@693e4988
    User@6223c513
    User@278e4dc4
    User@346827f
    User@76ee89dd
    
    Process finished with exit code 0
    

    结果好像跟我们预期的不一样,每一次调用都返回了新的对象,缓存并没有生效。为什么会这样,其实很简单。ConcurrentHashMap虽然是线程安全的,但是由于我们去掉了getUserById方法上的synchronized关键字,并发访问时就会出现多个线程进入到if (user == null)判断,从而重复执行查询操作。那如何解决这个问题呢。这就是我们本次的主题:实现任务唯一执行
    我们可以想一下,之所以出现重复执行的问题,是因为第一个查询任务执行的时候,后面的线程并不知道这回事,所以就自己执行查询操作。如果能做到让其他线程知道已有任务在执行中,然后让他们阻塞等待任务执行完成,再去获取结果即可,这样就能实现任务唯一执行了。JUC包中已经给我们提供了一个这样类:FutureTask。它表示一个异步计算,并且只有计算完成才能通过get方法获取结果,否则会一直阻塞或者抛出TimeoutException(如果get时设置了超时时间)。下面来看看具体实现方式

    • Client场景类(ConcurrentHashMap配合FutureTask使用)
    public class Client {
    
        private Map<Integer, Future<User>> cache = new ConcurrentHashMap<>(); // 1
        private UserService service = new UserService();
    
        public static void main(String[] args) {
            Client client = new Client();
            int id = 1234;
            // 启动五个线程并发执行
            for (int i = 0; i < 5; i++) {
                new Thread(() -> System.out.println(client.getUserById(id))).start();
            }
        }
    
        // 使用FutureTask的特性,如果已有查询任务在执行,其他线程可以
        // 获取这个任务,并等待其返回结果
        private User getUserById(Integer id) {
            Future<User> future = cache.get(id);
            if (future == null) {
                FutureTask task = new FutureTask<>(() -> service.getOne(id));
                future = cache.putIfAbsent(id, task); // 2
                if (future == null) {
                    future = task;
                    task.run();
                }
            }
    
            try {
                return future.get();
            } catch (Exception e) {
                cache.remove(id, future); // 3
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    
    }
    

    执行结果:

    D:\javasoft\jdk1.8.0_231\bin\java.exe "...
    User@56f3ee1d
    User@56f3ee1d
    User@56f3ee1d
    User@56f3ee1d
    User@56f3ee1d
    
    Process finished with exit code 0
    

    结果符合我们的需求,接下来一步步分析实现原理,关键实现主要有三个地方。
    首先,相比于上一个版本。ConcurrentHashMap中的valueUser变为Future<User>FutureTaskFuture接口的一个实现类):private Map<Integer, **Future<User>**> cache = new ConcurrentHashMap<>();这样做的目的是将任务缓存起来,告诉其他线程我正在执行任务,你乖乖地等着我执行完成获取结果就好,不需要重复执行相同的任务了。
    其次,在getUserById方法中,当缓存中还没有任务时,会创建一个FutureTask任务,并将它放入缓存中。注意这里创建任务时可能多个线程都完成了创建任务的工作,但是放进缓存的时候只能有一个线程成功。这得益于ConcurrentHashMap的线程安全机制,以及future = cache.putIfAbsent(id, task);putIfAbsent方法的使用。这个方法只有在当前key没有对应value时才进行写入,并且返回null。反之,则不会进行写入操作,而是直接返回key对应的值。这就保证了在key相同的情况下只会有一个线程创建的任务能放进缓存中。
    最后,需要考虑缓存污染的问题。假如任务执行失败了,那么其他线程通过缓存获取的FutureTask就无法获取到正确的结果。所以我们需要在get发生异常时移除对应的缓存数据。
    以上是个人对ConcurrentHashMapFutureTask使用的一些理解,如果哪里写得有问题,还望各位猿友斧正。

    相关文章

      网友评论

          本文标题:FutureTask配合ConcurrentHashMap实现任

          本文链接:https://www.haomeiwen.com/subject/gmttwhtx.html