美文网首页
JetCache 扩展:实现二级缓存准实时刷新

JetCache 扩展:实现二级缓存准实时刷新

作者: RyanLee_ | 来源:发表于2020-05-01 12:08 被阅读0次

    项目建设过程中遇到一个问题:使用频率很高的基础数据(8MB)(可变更)。缓存到redis 存取效率不高。存在多条这样的基础数据。


    redis insight 截图

    当然,可以用ListenableFutureCallback结合CountDownLatch做并发和结果聚合。(前提是获取缓存数据的操作不存在先后关系)
    但每次接口调用仍会有零点几秒消耗在查询Redis上。

    自行实现二级缓存又存在一致性问题:
    即使有定时刷新作业,也会出现:一段时间内同一个应用的不同实例本地缓存的数据不一致的情况。
    因为更新的时候无法更新所有应用实例的本地缓存。

    举栗说明:
    应用A借助JetCache实现二级缓存。部署时创建两个实例
    实例1修改了一条数据,并更新到数据库、local cache、remote cache中。
    实例2在本地缓存刷新或过期前,local cache中的数据仍是旧的。
    数据不一致的最长时间取决于缓存刷新作业的执行周期

    后面goolge到了阿里开源的二级缓存组件JetCache,做了对比实验:
    Redisson(redis)VS JetCache 一级缓存(redis) VS JetCache 二级缓存(redis + caffeine)

    • 配置文件:


      image.png
    • 单元测试方法:
         @Test
        public void compareEfficiency() throws IOException {
            //保障数据都加载到缓存中
            airportCacheDao.getAllFromRedisson();
            long start = System.currentTimeMillis();
    
            for (int i = 0; i < 20; i++) {
                airportCacheDao.getAllFromRedisson();
            }
            long end = System.currentTimeMillis();
            System.out.println(String.format("通过Redisson查询redis;20次全量数据,耗时:%s ms", end - start));
            //保障数据都加载到缓存中
            airportCacheDao.getAllFromDbOrJetCacheRemote();
            long start1 = System.currentTimeMillis();
    
            for (int i = 0; i < 20; i++) {
                airportCacheDao.getAllFromDbOrJetCacheRemote();
            }
            long end1 = System.currentTimeMillis();
    
            System.out.println(String.format("通过JetCache查询redis;20次全量数据,耗时:%s ms", end1 - start1));
            //保障数据都加载到缓存中
            airportCacheDao.getAllFromDbOrJetCacheBoth();
            long start2 = System.currentTimeMillis();
    
            for (int i = 0; i < 20; i++) {
                airportCacheDao.getAllFromDbOrJetCacheBoth();
            }
            long end2 = System.currentTimeMillis();
    
            System.out.println(String.format("通过JetCache二级缓存查询;20次全量数据,耗时:%s ms", end2 - start2));
            System.in.read();
        }
    
    • DAO中的方法
        public List<MdBmdmAirportDO> getAllFromRedisson() {
            List<MdBmdmAirportDO> airports;
            RBucket<List<MdBmdmAirportDO>> airportRList = redissonClient.getBucket(RedisKey.MDMAirportDataList+"_Redisson");
            if (airportRList.isExists()) {
                //System.out.println("get from cache");
                airports = airportRList.get();
            } else {
                System.out.println("get from db");
                airports = iMdBmdmAirportDao.list(null);
                airportRList.set(airports);
            }
            return airports;
        }
    
        @Cached(name = RedisKey.MDMAirportDataList+"_JetCache_Remote", cacheType = CacheType.REMOTE, expire = 1000)
        public List<MdBmdmAirportDO> getAllFromDbOrJetCacheRemote() {
            System.out.println("get from db");
            List<MdBmdmAirportDO> airports;
            airports = iMdBmdmAirportDao.list(null);
            return airports;
        }
    
        @Cached(name = RedisKey.MDMAirportDataList+"_JetCache_Both", cacheType = CacheType.BOTH, expire = 1000)
        public List<MdBmdmAirportDO> getAllFromDbOrJetCacheBoth() {
            System.out.println("get from db");
            List<MdBmdmAirportDO> airports;
            airports = iMdBmdmAirportDao.list(null);
            return airports;
        }
    
    image.png

    从对比实验中可看出查询效率差别巨大。

    说明:

    1. JetCache 本地缓存组件选了Caffeine。
    2. JetCache @Cached注解:
      cacheType = CacheType.BOTH 二级缓存
      cacheType = CacheType.REMOTE 一级缓存
    3. redisson pro支持二级缓存,但要付费。不予考虑

    JetCache简介及特性

    JetCache是一个基于Java的缓存系统封装,提供统一的API和注解来简化缓存的使用。 JetCache可以原生的支持TTL、两级缓存、分布式自动刷新,还提供了Cache接口用于手工缓存操作。 当前有四个实现,RedisCache、TairCache、CaffeineCache(in memory)和一个简易的LinkedHashMapCache(in memory)。

    • 全部特性:
      通过统一的API访问Cache系统
      通过注解实现声明式的方法缓存,支持TTL和两级缓存
      通过注解创建并配置Cache实例
      针对所有Cache实例和方法缓存的自动统计
      Key的生成策略和Value的序列化策略是可以配置的
      分布式缓存自动刷新,分布式锁 (2.2+)
      异步Cache API (2.2+,使用Redis的lettuce客户端时)
      Spring Boot支持

    请参考:JetCache官方wiki

    简单用法

    JetCache提供了简单易用的注解。相较于springframework.cache提供的注解,JetCache增加了一些新的功能性注解:
    @CacheRefresh:用于定时刷新缓存。
    @CacheUpdate:用于更新缓存
    @CachePenetrationProtect:当缓存访问未命中的情况下,对并发进行的加载行为进行保护。 当前版本实现的是单JVM内的保护,即同一个JVM中同一个key只有一个线程去加载,其它线程等待结果。

    这里不一一列举。请参考:JetCache注解使用说明

    通过JetCache注解实现的方法缓存,缓存value的数据类型统一为String。这块想想也没什么问题。比从复杂数据结构存取数据的效率还高。
    (RedissonSpringCacheManager实现的spring cache方法级别缓存,是通过Hash的结构来缓存方法的返回值的)。

    JetCache本地缓存有最大元素限制,默认是100个。可配置。基于LRU淘汰。

    JetCache存在问题:

    二级缓存结构下:local cache的一致性问题。这块文章开头已举例说明,这里不做赘述。JetCache原生不支持二级缓存自动刷新。

    扩展实现

    把JetCache源码clone下来翻了翻。
    JetCache架构很巧妙。设计了缓存更新消息的发布机制。保障了架构的可扩展性。


    image.png

    如上图:如果容器中存在CacheMessagePublisher的bean。CacheMonitor会在发送缓存更新时间后,调用CacheMessagePublisher.publish方法将缓存更新消息发布出去。遗憾的是JetCache并未提供默认的实现。


    CacheMessagePublisher接口
    CacheMessage结构
    CacheMessagePublisher类型的接口需要自行实现。
    首先想到的是消息订阅发布机制(广播消费)。

    通过RocketMq 实验了一下:"缓存更新消息发布+消费"的方式可行,缓存更新消息可以通知到各个应用实例。
    后面考虑不想有过多的依赖。而且redis 原生支持简单的消息订阅发布机制。于是基于redis 订阅发布机制实现了 CacheMessagePublisher 和 RedisPubSubListener(类似消费者)。上码:

    自定义CacheMessagePublisher的实现:

    package com.xmair.core.jetcache;
    
    import com.alibaba.fastjson.JSON;
    import com.alicp.jetcache.autoconfigure.LettuceFactory;
    import com.alicp.jetcache.support.CacheMessage;
    import com.alicp.jetcache.support.CacheMessagePublisher;
    import io.lettuce.core.cluster.RedisClusterClient;
    import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
    import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Primary;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PreDestroy;
    
    /**
     * 缓存操作消息发布器
     *
     * @author ryanlee
     * @updateTime 2020-04-27 17:26
     */
    @Service
    @Primary
    public class RedisMessagePublisher implements CacheMessagePublisher {
    
        public Logger logger = LoggerFactory.getLogger(RedisMessagePublisher.class);
        @Autowired
        @Qualifier("defaultClient")
        public LettuceFactory lettuceFactory;
    
        private StatefulRedisPubSubConnection<String, String> connection;
    
    
        @PreDestroy
        public void destroy(){
            connection.close();
        }
    
    
        @Value("${jetcache.cacheMessagePublisher.topic}")
        String topicName;
    
        @Override
        public void publish(String area, String cacheName, CacheMessage cacheMessage) {
            try {
                if(null == connection || !connection.isOpen()) {
                    System.out.println("初始化connection");
                    RedisClusterClient client = (RedisClusterClient) lettuceFactory.getObject();
                    connection = client.connectPubSub();
                }
            } catch (Exception e) {
                connection.close();
                e.printStackTrace();
            }
            CacheMessageWithName cacheMessageWithName = new CacheMessageWithName();
            cacheMessageWithName.setArea(area);
            cacheMessageWithName.setCacheName(cacheName);
            cacheMessageWithName.setCacheMessage(cacheMessage);
    
            RedisPubSubAsyncCommands<String, String> async = connection.async();
    
            async.publish(topicName,JSON.toJSONString(cacheMessageWithName));
            logger.info(String.format("发送缓存更新消息:message:%s",cacheMessageWithName));
    
        }
    
    }
    

    自定义RedisPubSubListener的实现:

    package com.xmair.core.jetcache;
    
    import com.alibaba.fastjson.JSON;
    import com.alicp.jetcache.anno.support.CacheContext;
    import com.alicp.jetcache.support.CacheMessage;
    import io.lettuce.core.pubsub.RedisPubSubListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * @author ryanlee
     * @updateTime 2020-04-27 20:09
     */
    public class CusRedisPubSubListener implements RedisPubSubListener<String, String> {
        public Logger logger = LoggerFactory.getLogger(CusRedisPubSubListener.class);
        private CacheContext cacheContext;
        private ILocalCacheInvalidateStrategy localCacheInvalidateStrategy;
        public CusRedisPubSubListener(CacheContext cacheContext,ILocalCacheInvalidateStrategy localCacheInvalidateStrategy){
            this.cacheContext = cacheContext;
            this.localCacheInvalidateStrategy =localCacheInvalidateStrategy;
        }
        @Override
        public void message(String channel, String message) {
            consumeMessage(message);
        }
    
        @Override
        public void message(String pattern, String channel, String message) {
            consumeMessage(message);
        }
    
        private void consumeMessage(String message) {
            logger.info("CusRedisPubSubListener收到local cache更新消息:"+message);
            CacheMessageWithName cacheMessageWithName = JSON.parseObject(message,CacheMessageWithName.class);
            String area = cacheMessageWithName.getArea();
            String cacheName = cacheMessageWithName.getCacheName();
            CacheMessage cacheMessage = cacheMessageWithName.getCacheMessage();
            localCacheInvalidateStrategy.invalidateLocalCache(cacheContext,area,cacheName,cacheMessage);
        }
    
        @Override
        public void subscribed(String channel, long count) {
            System.out.println(String.format("订阅topic:%s",channel));
    
        }
    
        @Override
        public void psubscribed(String pattern, long count) {
    
        }
    
        @Override
        public void unsubscribed(String channel, long count) {
    
        }
    
        @Override
        public void punsubscribed(String pattern, long count) {
    
        }
    }
    

    注册监听服务:

    package com.xmair.core.jetcache;
    
    import com.alibaba.fastjson.JSON;
    import com.alicp.jetcache.anno.support.CacheContext;
    import com.alicp.jetcache.anno.support.ConfigProvider;
    import com.alicp.jetcache.autoconfigure.AutoConfigureBeans;
    import com.alicp.jetcache.autoconfigure.LettuceFactory;
    import com.alicp.jetcache.autoconfigure.RedisLettuceAutoConfiguration;
    import io.lettuce.core.RedisClient;
    import io.lettuce.core.cluster.RedisClusterClient;
    import io.lettuce.core.pubsub.RedisPubSubListener;
    import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
    import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.DependsOn;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import java.util.List;
    
    /**
     * RedisPubSubListener 监听 channel
     *
     * @author ryanlee
     * @updateTime 2020-04-27 17:43
     */
    @Service
    public class LocalCacheUpdateInitService {
    
        @Value("${jetcache.cacheMessagePublisher.topic}")
        String topicName;
    
        @Autowired
        @Qualifier("defaultClient")
        LettuceFactory lettuceFactory;
    
        @Autowired
        ConfigProvider configProvider;
    
        @PostConstruct
        public void initConsumer() throws Exception {
            RedisClusterClient client = (RedisClusterClient) lettuceFactory.getObject();
            StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
            CaffeineInvalidateStrategy caffeineInvalidateStrategy = new CaffeineInvalidateStrategy();
            RedisPubSubListener<String, String> listener = new CusRedisPubSubListener(configProvider.getCacheContext(),caffeineInvalidateStrategy);
            connection.addListener(listener);
            RedisPubSubCommands<String, String> sync = connection.sync();
            sync.subscribe(topicName);
        }
    
    
    }
    

    自定义的Caffeine本地缓存过期策略:

    package com.xmair.core.jetcache;
    
    import com.alibaba.fastjson.JSON;
    import com.alicp.jetcache.anno.support.CacheContext;
    import com.alicp.jetcache.support.CacheMessage;
    import com.alicp.jetcache.support.FastjsonKeyConvertor;
    import com.github.benmanes.caffeine.cache.Cache;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Caffeine本地缓存过期策略
     *
     * @author ryanlee
     * @updateTime 2020-04-29 08:10
     */
    public class CaffeineInvalidateStrategy implements ILocalCacheInvalidateStrategy {
        public Logger logger = LoggerFactory.getLogger(CaffeineInvalidateStrategy.class);
    
    
        @Override
        public void invalidateLocalCache(CacheContext cacheContext, String area, String cacheName, CacheMessage cacheMessage) {
            System.out.println(String.format("执行失效本地缓存策略: area:%s;cacheName:%s;cacheMessage:%s", area, cacheName, JSON.toJSONString(cacheMessage)));
    
            Cache localCache = cacheContext.getCache(area, cacheName).unwrap(Cache.class);
            //在TYPE_PUT时本想用本地缓存和远程缓存的值做一次比较。如果相同,则不用失效本地缓存。但是JetCache源码中未提供获取远程缓存的方法。
            int type = cacheMessage.getType();
    
            switch (type) {
                case CacheMessage.TYPE_PUT:
                case CacheMessage.TYPE_REMOVE: {
                    Object[] keys = cacheMessage.getKeys();
                    invalidateLocalCaches(localCache, keys);
                    break;
                }
                case CacheMessage.TYPE_REMOVE_ALL: {
                    localCache.invalidateAll();
                    break;
                }
                //TODO:putAll 目前没处理
            }
            logger.info(String.format("失效本地缓存: area:%s;cacheName:%s;cacheMessage:%s", area, cacheName, JSON.toJSONString(cacheMessage)));
    
        }
    
        private void invalidateLocalCaches(Cache localCache, Object[] keys) {
            for (Object key : keys) {
                Object fastJsonKey = FastjsonKeyConvertor.INSTANCE.apply(key);
                Object valueBefore = localCache.getIfPresent(fastJsonKey);
                if (null != valueBefore) {
                    invalidateKey(localCache, fastJsonKey);
                }
            }
        }
    
        private void invalidateKey(Cache localCache, Object fastJsonKey) {
            Object valueBefore = localCache.getIfPresent(fastJsonKey);
            System.out.println("local cache value before invalidate:" + JSON.toJSONString(valueBefore));
            localCache.invalidate(fastJsonKey);
            System.out.println("invalidate local cache key:" + fastJsonKey);
            Object valueAfter = localCache.getIfPresent(fastJsonKey);
            System.out.println("local cache value after invalidate:" + JSON.toJSONString(valueAfter));
        }
    
    }
    

    检验效果:

    1. 首先执行whileGet,模拟一个应用实例循环获取缓存值:
        @Test
        public void whileGet() {
            while (true) {
                System.out.println(airportCacheDao.testCachedOneDay("UK"));
                sleep(1000);
            }
        }
    
        @Cached(name = "STA_JET", cacheType = CacheType.BOTH, expire = 1,timeUnit = TimeUnit.DAYS)
        public String testCachedOneDay(String value) {
            String result = String.format("%s;timestamp:%s", value, Calendar.getInstance().getTime());
            System.out.println("get from testCached method:"+result);
            return result;
        }
    
    1. 然后执行testInvalidate模拟:一个应用实例作废缓存值
        @Test
        public void testInvalidate() throws IOException {
            airportCacheDao.testCacheInvalidate("UK");
        }
    
        @CacheInvalidate(name = "STA_JET")
        public String testCacheInvalidate(String value) {
            String result = String.format("%s;timestamp:%s", value, Calendar.getInstance().getTime());
            System.out.println("get from testCacheInvalidate method:"+result);
    
            return result;
        }
    

    执行结果:


    testInvalidate输出

    由于是广播消息,执行testInvalidate的进程也收到了缓存更新的消息。分析见截图。

    whileGet输出及分析
    1. 然后执行testUpdate模拟:一个应用实例发起缓存更新操作。
        @Test
        public void testUpdate() throws IOException {
            airportCacheDao.testCacheUpdate("UK");
            //为了把缓存更新的消息发送的日志打出来
            sleep(1000);
        }
    
        @CacheUpdate(name = "STA_JET" ,value = "#value")
        public String testCacheUpdate(String value) {
            System.out.println("run testCacheUpdate method:"+value);
            return String.format("%s;timestamp:%s",value,Calendar.getInstance().getTime());
        }
    
    testUpdate输出

    缓存更新消息type字段的值为1(put:新增或修改)。由于是广播消息,执行testUpdate的进程也收到了缓存更新的消息。


    whileGet输出及分析

    为了检验机制的完整性、有效性,还做了一些其它测试。不一一贴出。

    上述实现存在的问题:

    应用实例会消费自己广播的消息,导致部分逻辑重复执行。

    推演一个场景,A、B、C、D四个实例:

    1. A 将JetCache (一级、二级缓存)的一个缓存数据失效,并广播。
    2. A、B、C、D实例收到缓存更新消息(type:remove)后均将local cache中的值作废。
      其中实例A执行了2次本地缓存失效操作。其中第二次是不必要的。
    3. 实例C率先收到查询请求,调用目标方法得到返回值并放入一级、二级中。C广播缓存更新消息(type:新增或更新)。
    4. A、B、C、D实例收到C发出的缓存更新消息后再次将各自的二级缓存(local cache)的值作废掉。
      其中实例C 收到自己广播的缓存更新消息,将二级缓存(local cache)中的值又作废了。这是不必要的。
    5. D接收到查询请求,发现二级缓存(local cache)中没有值,而一级缓存中有值。将一级缓存中的值加载到二级缓存(local cache)。
      此时不触发缓存更新事件。
    6. A接收到查询请求,发现二级缓存(local cache)中没有值,而一级缓存中有值。将一级缓存中的值加载到二级缓存(local cache)。
      此时不触发缓存更新事件。
      .....
      通过上面推演的场景可以发现章节开头提到的问题:应用实例会消费自己广播的消息,导致部分逻辑重复执行。
      初步想法:通过应用实例唯一标识来过滤自己发送的广播消息。TODO

    关于JetCache缓存更新消息:

    应用实例更新、删除缓存值都会触发缓存更新事件。
    应用实例收到查询请求,若一级、二级缓存中均没有值。则执行目标方法体。并将返回值保存到一二级缓存。此时触发缓存更新操作。
    应用实例收到查询请求,若一级缓存(redis)中有值,二级缓存(local cache )没有值。则A将一级缓存中的值加载到二级缓存。此时不触发缓存更新操作。

    上述扩展实现有待完善。有待更多实际场景的检验。抛砖引玉。有兴趣可以多多沟通交流,一起来完善。

    相关文章

      网友评论

          本文标题:JetCache 扩展:实现二级缓存准实时刷新

          本文链接:https://www.haomeiwen.com/subject/ibfzwhtx.html