美文网首页Spring-Boot程序员分布式架构
基于redis的三级分布式缓存实现实例

基于redis的三级分布式缓存实现实例

作者: 简单是美美 | 来源:发表于2020-04-27 10:42 被阅读0次

    1. 分布式系统中的领域信息模型与缓存机制

      在领域驱动设计的方法中,确定领域的信息模型(知识模型)是系统设计的重要工作。在我们识别了领域模型中的实体,值对象和聚合对象之后,需要在面向对象的系统中将其实例化。
      在分布式系统中,不同的服务(子系统)可能关注于不同的实体对象,但也有一些公共的实体对象可能会被多个服务所关注。在分布式系统中,一些实体对象会被某些服务频繁使用,因而缓存机制在系统实现中必不可少。
      在实际应用中,我们通常使用三级缓存机制,即内存(一级)、redis(二级)、数据库(三级)三级数据缓存机制。在读取数据时依次从内存、redis、数据库中读取数据。在写入数据从依次从数据库、redis、内存中写入。
      根据spring的设计惯例,我们常常将一个实体对象的访问类用一个DAO对象来表示,在DAO对象中实现分布式的三级缓存机制。则使用DAO对象对应用系统屏蔽了缓存的实现细节。应用系统则只关注如何使用DAO与领域实体进行交互。
      分布式系统中常常使用一个共享的缓存中间件实现各服务(子系统)之间的缓存的交互,这个缓存中间件我们常常使用redis集群。对于分布式系统而言,在三级缓存体系中,各服务(子系统)拥有自己的内存缓存(一级)和独立的数据库(三级),共享一个二级缓存redis集群。
      在这样一个分布式的缓存体系中,缓存数据的同步是必须要考虑的。考虑这样一个场景:A服务修改了某个实体对象,同时更新到二级缓存,B服务再次使用该实体对象时,应该使用更新后的实体对象。如果使用redis集群来实现二级缓存,应该使用订阅发布机制来实现各服务(子系统)之间的缓存同步。
      同时,在一级缓存中,缓存的过期策略通常也需要设置。设置的目的也是为了确保数据不同步引发的错误是可自愈的,参数常用的是写过期时间和读过期时间两类。
      整个分布式应用系统中的三级缓存架构如图1所示。


    图1.png

    2 spring提供的缓存机制

      spring框架中提供对缓存的支持,支持如图2所示的缓存类型。


    图2.png

      spring框架中对缓存的支持主要有两个接口的实现来支持,一个为CacheManager接口(org.springframework.cache.CacheManager),一个为Cache接口(org.springframework.cache.Cache)。
      spring框架中对缓存的应用主要使用:@Cacheable @CachePut @CacheEvict @Caching等注解实现。
      相关的资料可从网上获取,在此不再赘诉。

    3 分布式缓存实现实例

      在我们的实际应用项目中,我们设计了一种分布式的三级缓存机制,这里使用了redis作为二级缓存,caffeine作为一级缓存,数据库作为三级存储介质。

    3.1 实现类的层次

      在实际应用中,针对redis的各数据类型设计了对不同数据类型的抽象操作子类,对于领域内各实体对象的操作DAO类则继承自这些redis的抽象操作子类。其类层次结构图如图3所示:


    图3.png

    3.2 一级缓存的实现与策略配置

      在实现中,我们使用caffeine作为一级缓存,实现了自定义的CacheManager,可实现动态与指定名称cache的生成,并可针对指定名字的cache配置不同的缓存策略。其部分实现代码如下:

    /**
         * 自定义cacheManager,实现动态生成的cache使用缺省的配置
         * 
         * @author zhang.kai
         *
         */
        public class VlineCacheManager extends SimpleCacheManager {
            @Override
            protected Cache getMissingCache(String cacheName) {
                return createCaffeineCache(cacheName, cacheProperties.getDefaultspec());
            }
        }
    
        /**
         * CacheManager对象注入
         * 
         * @return
         */
        @Bean
        public CacheManager getCacheManager() {
            if ((null == cacheProperties) || (null == cacheProperties.getDefaultspec())
                    || (null == cacheProperties.getCachespecs())) {
                log.error("cacheProperties is invalid:{}", cacheProperties);
                return null;
            }
            VlineCacheManager cm = new VlineCacheManager();
            List<Cache> caches = new ArrayList<>();
            cacheProperties.getCachespecs().keySet().forEach(cacheName -> {
                String cacheSpec = cacheProperties.getCachespecs().get(cacheName);
                if (StringUtils.isEmpty(cacheName)) {
                    log.error("XXX no cacheSpec for cacheName{}", cacheName);
                    return;
                }
                CaffeineCache cache = createCaffeineCache(cacheName, cacheSpec);
                caches.add(cache);
            });
            if (caches.size() > 0) {
                cm.setCaches(caches);
            } else {
                log.error("XXX no cache inited!");
                return null;
            }
    
            // 设置redis的发布ID
            redisPublisherId = getRedisPublishId();
            log.info("%%%%%get redisPublisherId:{}",redisPublisherId);
            return cm;
        }
    

      对应的配置文件中,对cache的读写策略配置如下:

    vline:
      cache:
        defaultspec: initialCapacity=50,maximumSize=500,expireAfterWrite=5s,expireAfterAccess=500s
        cachespecs:
          CONFIG_INFO: initialCapacity=50,maximumSize=500,expireAfterWrite=5s,expireAfterAccess=500s
          USER_LOGIN_DEVICE_INFO: initialCapacity=60,maximumSize=500,expireAfterWrite=5s,expireAfterAccess=7s
    

    3.3 缓存读取方式

      对于三级缓存依次从内存,redis,数据库中获取,其代码片段如下所示:

    /**
         * 从内存,redis,DB中依次获取实体
         * 
         * @param key
         * @return value
         */
        public T get(String key) {
            T value = null;
    
            // 1.如果在内存缓存中,则获取内存缓存
            if (enableMemoryCached) {
                value = getFromCacheManager(key);
                if (null != value) {
                    log.debug("***get from memory***{}->{}", getRedisKey(key), value);
                    return value;
                }
            }
            // 2.从redis中获取,对于键不存在的hash,redis返回empty map
            value = getFromRedis(key);
            if (null != value) {
                // 从redis中获取并回写到内存
                log.debug("***get from redis***{}->{}", getRedisKey(key), value);
                putToCacheManager(key, value);
            } else {
                if (enableDbLoad) {
                    // 3.从数据库中获取并回写到redis和内存,可能存在竞争,重试一定次数
                    RedisLock redisLock = new RedisLock(srt, getRedisKey(key) + "_sync_lock");
                    int retryCount = 0;
                    do {
                        try {
                            // 如果获取到分布式锁,则从数据库中获取,取完后返回
                            if (redisLock.lock()) {
                                value = loadValueFromDb(key);
                                log.debug("***get from db***{}->{}", getRedisKey(key), value);
                                if (null != value) {
                                    set(key, value); // 回写入redis与内存
                                }
                                threadAwait.signalAll(getRedisKey(key)); // 唤醒所有等待该key的线程
                                return value;
                            }
                            // 如果未获取到分布式锁,说明别的进程正在进行查询数据库,等待一段时间后查询redis
                            threadAwait.await(getRedisKey(key), CacheConsts.LOAD_FROM_DB_WAIT_TIME);
                            value = getFromRedis(key);
                            if (null != value) {
                                if (enableMemoryCached) {
                                    putToCacheManager(key, value);
                                }
                                return value;
                            }
                        } catch (Exception e) {
                            log.error("get" + getRedisKey(key) + "fail!", e);
                        } finally {
                            redisLock.unlock();
                        }
                        retryCount++;
                    } while (retryCount < this.retryTimes);
                }
            }
            return value;
        }  
    

      这里考虑当多个进程(线程)读取同一key时,使用分布式锁来实现。使得只有第一个获取锁的进程(线程),会去三级存储数据库中获取并存入redis中。其它读取该key的进程(线程)将在redis中获取。这里代码中使用的分布式锁和线程唤醒对象参考了来自https://github.com/wyh-spring-ecosystem-student/spring-boot-student处的代码。

    3.4 缓存的失效与同步

      在一级缓存caffeine中,我们设置了缓存的失效策略。同时在分布式系统中,不同的进程可能都会在一级缓存中缓存相同的key,因此在对应的redis key发生变化时(修改、删除),应该通知倒各进程,这个通过redis的订阅发布机制实现。
      一个进程可能是某个key发生改变的发起者,则它应该在订阅到该key发生变化时不做任何处理,这里我们通过在发布信息中增加一个发布者的唯一标识来实现,发布者的唯一标识在public CacheManager getCacheManager()函数中实现。在redis中发布的信息形式为:{redisPublishId}${keyPrefix}:{key}。
      每个进程的监听处理函数是这样的:

    @Override
        public void onMessage(Message message, byte[] pattern) {
            if (null != message.getBody()) {
                String key = new String(message.getBody(), Charset.forName("UTF8"));
                log.debug("+++reveive msg:{} which topic is:{}", key,
                        new String(message.getChannel(), Charset.forName("UTF8")));
                String publisherId = findRedisPublisherId(key);
                // 如果是本进程发布的,不处理
                if (StringUtils.isEmpty(publisherId) || publisherId.equals(CacheConfiguration.redisPublisherId)) {
                    return;
                }
                // 不是本进程发布的,在一级缓存中失效.发布的key形式为:{redisPublishId}${keyPrefix}:{key}
                String redisKey = key.substring(key.indexOf(CacheConsts.REDIS_PUBLISHER_ID_SEPARATE) + 1);
                String keyPrefix = findKeyprefixFromRedisKey(redisKey);
                Cache cache = cm.getCache(keyPrefix);
                if (null != cache) {
                    // 从一级缓存中删除
                    log.debug("---evict key:{} from cache:{}", redisKey, cache.getName());
                    cache.evict(redisKey);
                }
            } else {
                log.error("RedisMsgListener onMessage null!");
            }
        }
    

    4 小结

      本文所描述的示例代码可从:https://github.com/solarkai/distributedcache
    处获取。
      对于三级缓存方式,网上有很多不同的实现方式。在我们的实现中,针对redis的不同数据类型衍生出不同的针对领域实体的DAO对象,这些对象屏蔽了缓存使用的细节;同时,在我们的实现中,实现了动态生成与指定名称的缓存生成方式,可针对不同的缓存配置不同的策略;针对一级缓存的同步,我们使用redis的订阅发布机制实现,并在发布内容中增加发布者的标识,避免重复的同步操作。

    相关文章

      网友评论

        本文标题:基于redis的三级分布式缓存实现实例

        本文链接:https://www.haomeiwen.com/subject/ktskwhtx.html