Java趣谈——如何构建一个高效且可伸缩的缓存

作者: 柳树之 | 来源:发表于2018-02-27 07:45 被阅读656次

    本集概要:

    • 怎样构建一个线程安全而又高效、可伸缩的缓存?
    • 怎样利用设计模式,把缓存做成通用的工具?
    • 除了synchronize和volatile,我们还能使用哪些工具来开发线程安全的代码?

    前情回顾: Volatile趣谈——我是怎么把贝克汉姆的进球弄丢的


    大雄开发的门线传感器在曼联和阿森纳的比赛中一鸣惊人,越来越多的客户向公司订购这款软件......

    糙版缓存

    一天,哆啦对大雄说,“大雄,你看我们后台这个统计用户消费信息的报表,每次统计都要去查数据库,既消耗资源,查询起来也慢,你看看能不能给做一个缓存?”
    “缓存?这个简单呀,缓存的原理其实就是一个键值对,也就是Map,只要把用户的id作为key,把对应的统计结果作为value,放到Map里头缓存起来,下次再来查询时,先到Map去搜,搜到了就直接返回,搜不到,再去查数据库计算,这样就ok了”
    “可以呀,思路很清晰嘛,一下子就讲出了一份设计文档”
    “那是,那我开始写代码啦!”,大雄泡了杯茶,关掉QQ和Outlook,开始写起了代码。

    很快,第一版代码出炉了,UserCostStatComputer(本文的示例代码,可到Github下载):

    public class UserCostStatComputer {
        private final Map<String, BigInteger> cache = new HashMap();
    
        public synchronized BigInteger compute(String userId) throws InterruptedException {
            BigInteger result = cache.get(userId);
            if (result == null) {
                result = doCompute(userId);
                cache.put(userId, result);
            }
            return result;
        }
    
        private BigInteger doCompute(String userId) throws InterruptedException {
            // assume it cost a long time...
            TimeUnit.SECONDS.sleep(3);
            return new BigInteger(userId);
        }
    }
    

    “为了线程安全,我给整个compute方法加上了synchronized修饰符,保证每次只有一个线程可以进入compute方法”,大雄向哆啦介绍他的代码。
    哆啦看了看,说,“大雄,你这代码写的快是快,但是有点糙啊”
    “这。。。”
    “咱先不说性能,就说你这设计,你有没有考虑过,要是还有其他地方也需要缓存,你难道也把这份缓存的逻辑拷贝过去?
    “啊,对呀,总不能每次要用到缓存,就在原先代码里加个Map,然后再写上if判断吧......”
    "小伙子觉悟不错,好好想想,怎样给原先没有缓存的统计函数,加上缓存的功能?"
    “给原先没有某某功能的函数,加上某某功能,这话听着很熟悉啊...”

    通用缓存

    大雄在大脑中检索着,“对了!装饰模式!哎,之前学过的,怎么到实际运用中就忘了呢!”
    利用设计模式,大雄很快把代码进行了重构。
    首先,定义一个Computable接口:

    public interface Computable<A, V> {
        V compute(A arg) throws Exception;
    }
    

    接着,每个要使用缓存功能的计算器,都要去实现这个接口,比如UserCostStatComputer:

    public class UserCostStatComputer implements Computable<String, BigInteger> {
        @Override
        public BigInteger compute(String userId) throws Exception {
            // assume there is a long time compute...
            TimeUnit.SECONDS.sleep(3);
            return new BigInteger(userId);
        }
    }
    

    现在这个UserCostStatComputer是没有缓存功能的,没关系,来一个装饰器就OK了,Memoizer1:

    public class Memoizer1<A, V> implements Computable<A, V> {
        private final Map<A, V> cache = new HashMap<A, V>();
        private final Computable<A, V> c;
    
        public Memoizer1(Computable<A, V> c) {
            this.c = c;
        }
    
        public synchronized V compute(A arg) throws Exception {
            V result = cache.get(arg);
            if (result == null) {
                result = c.compute(arg);
                cache.put(arg, result);
            }
            return result;
        }
    }
    

    Memoizer1即实现了Computable接口,又持有一个Computable对象,同时在自己的compute方法中,调用了Computable对的compute方法,在这个compute方法的外圈,可以看到就是之前的缓存逻辑。
    那么要怎样使用Memoizer1给UserCostStatComputer装饰上缓存功能呢?很简单:

    Computable computer = new Memoizer1(new UserCostStatComputer());
    computer.compute("1");
    

    利用Memoizer1这个装饰器,我们成功的给原先不具有缓存功能的UserCostStatComputer加上了缓存。
    “后面要是有其他的计算器,比如用户地区分布统计计算器、用户年龄分布统计计算器,我们都可以像UserCostStatComputer一样,实现Computable,就可以使用装饰器,装饰上缓存功能了“,大雄得意洋洋地说。
    “不错,小伙子悟性很强啊”,哆啦拍拍大雄的肩膀,“好了,现在通过设计模式解决了代码通用性问题,是时候看看你这个性能问题了”

    雇个保姆

    “性能问题?我这个代码性能有问题么”,大雄翻着白眼。
    “你看看,你把synchronize加在了什么地方?你加在了函数签名的位置,意味着一次只能有一个线程能够进入compute方法,这就导致了一个用户的id在计算的时候,其他用户无法进行计算,只能等到正在计算的用户计算完了,才能进入compute方法”
    “啊,这对性能影响挺大的。。。”
    “没错,换句话说,你这个加锁的粒度太大了,我们先把synchronize去掉,看看有什么线程安全问题,再来逐一解决”
    去掉synchronize的compute方法:

        public V compute(A arg) throws Exception {
            V result = cache.get(arg);
            if (result == null) {
                result = c.compute(arg);
                cache.put(arg, result);
            }
            return result;
        }
    

    “很明显,由于你用的是线程不安全的HashMap,cache.put(arg, result)这一行代码就有线程安全问题”,哆啦说。
    “那给这行代码加锁?像这样”,大雄说着在纸上写出草稿:

    synchronized(this) {
      cache.put(arg, result);
    }
    

    “哈哈,你怎么那么喜欢用synchronized,这样可以是可以,但是粒度还是太大了,Java早就给你提供了一个叫ConcurrentHashMap的保姆,我们只要把线程安全委托给它去处理就好了”
    “啊,我怎么又忘了...”
    说完,大雄修改了代码:

      private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
      // private final Map<A, V> cache = new HashMap<A, V>();
    

    未来任务

    "现在再来看其他地方,由于把synchronize去掉了,现在不同的用户id可以同时计算,但是也导致了相同用户id也会同时被计算,也就是说同样的数据会被计算多次,这样缓存就没意义了"
    “啊,假设用户id为1的数据正在计算,这个时候又来了一个线程,也是计算用户id为1的,如果这个线程可以知道,当前有没有线程正在计算id为1的数据,如果有,就等待这个线程返回计算结果,如果没有,就自己计算,那问题不就解决了?可是这个逻辑挺复杂的,好难实现啊”
    “哈哈,很简单,Java早就提供了FutureFutureTask,来实现你说的这种功能,来,把键盘给我,好久没写代码了”,哆啦说完,马上敲起了键盘。
    终极版Memoizer:

    public class Memoizer <A, V> implements Computable<A, V> {
        private final ConcurrentMap<A, Future<V>> cache
                = new ConcurrentHashMap<A, Future<V>>();
        private final Computable<A, V> c;
    
        public Memoizer(Computable<A, V> c) {
            this.c = c;
        }
    
        public V compute(final A arg) throws Exception {
            // use loop for retry when CancellationException
            while (true) {
                Future<V> f = cache.get(arg);
                if (f == null) {
                    Callable<V> callable = new Callable<V>() {
                        public V call() throws Exception {
                            return c.compute(arg);
                        }
                    };
                    FutureTask<V> ft = new FutureTask<V>(callable);
    
                    // use putIfAbsent to avoid multi thread compute the same value
                    f = cache.putIfAbsent(arg, ft);
    
                    if (f == null) {
                        f = ft;
                        ft.run();
                    }
                }
    
                try {
                    return f.get();
                } catch (CancellationException e) {
                    // remove cache and go into next loop to retry
                    cache.remove(arg, f);
                } catch (ExecutionException e) {
                    // throw it and then end
                    e.printStackTrace();
                    throw e;
                }
            }
        }
    }
    

    简单解释一下代码逻辑,线程进了compute方法后:

    1. 首先去存放future的Map里,搜索有没有已经其他线程已经创建好的future
    2. 如果有(不等于null),那就调用future的get方法
    3. 如果已经计算完成,get方法会立刻返回计算结果
    4. 否则,get方法会阻塞,直到结果计算出来再将其返回。
    5. 如果没有已经创建的future,那么就自己创建future,进行计算

    有几个小点要单独解释的:
    1、为什么要使用putIfAbsent
    这里之所以用putIfAbsent而不用put,原因很简单。
    如果有两个都是计算userID=1的线程,同时调用put方法,那么返回的结果都不会为null,后面还是会创建两个任务去计算相同的值。
    而putIfAbsent,当map里面已经有对应的值了,则会返回已有的值,否则,会返回null,这样就可以解决相同的值计算两次的问题。
    2、为什么要while (true)
    这是因为future的get方法会由于线程被中断而抛出CancellationException。
    我们对于CancellationException的处理是cache.remove(arg, f);,也就是把缓存清理掉,然后进入下一个循环,重新计算,直到计算成功,或者抛出ExecutionException。

    总结

    这篇文章中,我们先是用装饰模式,改良了代码的设计,接着通过使用并发容器ConcurrentHashMap以及同步工具Future,取代了原先的synchronize,实现了线程安全性的委托。

    《Java并发编程实践》中,将委托称为实现线程安全的一个最有效策略。委托其实就是把原来通过使用synchronize和volatile来实现的线程安全,委托给Java提供的一些基础构建模块(当然你也可以写自己的基础构建模块)去实现,这些基础构建模块包括:

    • 同步容器:比如Vector,同步容器的原理大多就是synchronize,所以用的不多;
    • 并发容器:比如本文用到的ConcurrentHashMap,当然还有CopyonWriteArrayList、LinkedBlockingQueue等,根据你的需求使用不同数据结构的并发容器;
    • 同步工具类:比如本文用到的Future和FutureTask,还有闭锁(Latches)、信号量(Semaphores)、栅栏(Barriers)等

    这些基础构建模块,在加上之前所讲的synchronize和volatile,就形成了Java并发的基础知识

    其实这也就是《Java并发编程实践》第一部分的迷你版,总结成一句话就是:
    要想写出线程安全的代码,我们需要用到使用synchronize、volatile,或者将线程安全委托给基础构建模块。

    当然,在实际应用中,只有这些基础知识是不够的,最简单的例子,你不可能每次请求过来都创建线程,这会吃光内存的。所以,你需要一个线程池,来限制线程的数量,这也就引出了《Java并发编程实践》的第二部分,结构化并发应用程序(Structuring Concurrent Applications),听名字确实不知道想讲什么,所以我也将把它啃下来,然后像第一部分一样,用尽可能通俗易懂的语言和大家分享学习心得。

    参考

    相关文章

      网友评论

      • 西门海明:楼主,有写过有关Future和Future Task类功能描述的文章吗?,有的话,发个链接看下,发现你的文章都很浅显易懂。
      • 梦惜残阳:单看好抽象,一会儿上机实践下。之前了解过23种设计模式,但是没有机会使用很快就忘了,至于多线程方面接触的更是微乎其微……
      • acf10d8114e7:学习了。

      本文标题:Java趣谈——如何构建一个高效且可伸缩的缓存

      本文链接:https://www.haomeiwen.com/subject/dztuxftx.html