1. 分布式系统中的领域信息模型与缓存机制
在领域驱动设计的方法中,确定领域的信息模型(知识模型)是系统设计的重要工作。在我们识别了领域模型中的实体,值对象和聚合对象之后,需要在面向对象的系统中将其实例化。
在分布式系统中,不同的服务(子系统)可能关注于不同的实体对象,但也有一些公共的实体对象可能会被多个服务所关注。在分布式系统中,一些实体对象会被某些服务频繁使用,因而缓存机制在系统实现中必不可少。
在实际应用中,我们通常使用三级缓存机制,即内存(一级)、redis(二级)、数据库(三级)三级数据缓存机制。在读取数据时依次从内存、redis、数据库中读取数据。在写入数据从依次从数据库、redis、内存中写入。
根据spring的设计惯例,我们常常将一个实体对象的访问类用一个DAO对象来表示,在DAO对象中实现分布式的三级缓存机制。则使用DAO对象对应用系统屏蔽了缓存的实现细节。应用系统则只关注如何使用DAO与领域实体进行交互。
分布式系统中常常使用一个共享的缓存中间件实现各服务(子系统)之间的缓存的交互,这个缓存中间件我们常常使用redis集群。对于分布式系统而言,在三级缓存体系中,各服务(子系统)拥有自己的内存缓存(一级)和独立的数据库(三级),共享一个二级缓存redis集群。
在这样一个分布式的缓存体系中,缓存数据的同步是必须要考虑的。考虑这样一个场景:A服务修改了某个实体对象,同时更新到二级缓存,B服务再次使用该实体对象时,应该使用更新后的实体对象。如果使用redis集群来实现二级缓存,应该使用订阅发布机制来实现各服务(子系统)之间的缓存同步。
同时,在一级缓存中,缓存的过期策略通常也需要设置。设置的目的也是为了确保数据不同步引发的错误是可自愈的,参数常用的是写过期时间和读过期时间两类。
整个分布式应用系统中的三级缓存架构如图1所示。
![](https://img.haomeiwen.com/i13196023/dccc69c54cb8be99.png)
2 spring提供的缓存机制
spring框架中提供对缓存的支持,支持如图2所示的缓存类型。
![](https://img.haomeiwen.com/i13196023/b2c9c7dc9518b08a.png)
spring框架中对缓存的支持主要有两个接口的实现来支持,一个为CacheManager接口(org.springframework.cache.CacheManager),一个为Cache接口(org.springframework.cache.Cache)。
spring框架中对缓存的应用主要使用:@Cacheable @CachePut @CacheEvict @Caching等注解实现。
相关的资料可从网上获取,在此不再赘诉。
3 分布式缓存实现实例
在我们的实际应用项目中,我们设计了一种分布式的三级缓存机制,这里使用了redis作为二级缓存,caffeine作为一级缓存,数据库作为三级存储介质。
3.1 实现类的层次
在实际应用中,针对redis的各数据类型设计了对不同数据类型的抽象操作子类,对于领域内各实体对象的操作DAO类则继承自这些redis的抽象操作子类。其类层次结构图如图3所示:
![](https://img.haomeiwen.com/i13196023/cd1d078ca4053a2f.png)
3.2 一级缓存的实现与策略配置
在实现中,我们使用caffeine作为一级缓存,实现了自定义的CacheManager,可实现动态与指定名称cache的生成,并可针对指定名字的cache配置不同的缓存策略。其部分实现代码如下:
/**
* 自定义cacheManager,实现动态生成的cache使用缺省的配置
*
* @author zhang.kai
*
*/
public class VlineCacheManager extends SimpleCacheManager {
@Override
protected Cache getMissingCache(String cacheName) {
return createCaffeineCache(cacheName, cacheProperties.getDefaultspec());
}
}
/**
* CacheManager对象注入
*
* @return
*/
@Bean
public CacheManager getCacheManager() {
if ((null == cacheProperties) || (null == cacheProperties.getDefaultspec())
|| (null == cacheProperties.getCachespecs())) {
log.error("cacheProperties is invalid:{}", cacheProperties);
return null;
}
VlineCacheManager cm = new VlineCacheManager();
List<Cache> caches = new ArrayList<>();
cacheProperties.getCachespecs().keySet().forEach(cacheName -> {
String cacheSpec = cacheProperties.getCachespecs().get(cacheName);
if (StringUtils.isEmpty(cacheName)) {
log.error("XXX no cacheSpec for cacheName{}", cacheName);
return;
}
CaffeineCache cache = createCaffeineCache(cacheName, cacheSpec);
caches.add(cache);
});
if (caches.size() > 0) {
cm.setCaches(caches);
} else {
log.error("XXX no cache inited!");
return null;
}
// 设置redis的发布ID
redisPublisherId = getRedisPublishId();
log.info("%%%%%get redisPublisherId:{}",redisPublisherId);
return cm;
}
对应的配置文件中,对cache的读写策略配置如下:
vline:
cache:
defaultspec: initialCapacity=50,maximumSize=500,expireAfterWrite=5s,expireAfterAccess=500s
cachespecs:
CONFIG_INFO: initialCapacity=50,maximumSize=500,expireAfterWrite=5s,expireAfterAccess=500s
USER_LOGIN_DEVICE_INFO: initialCapacity=60,maximumSize=500,expireAfterWrite=5s,expireAfterAccess=7s
3.3 缓存读取方式
对于三级缓存依次从内存,redis,数据库中获取,其代码片段如下所示:
/**
* 从内存,redis,DB中依次获取实体
*
* @param key
* @return value
*/
public T get(String key) {
T value = null;
// 1.如果在内存缓存中,则获取内存缓存
if (enableMemoryCached) {
value = getFromCacheManager(key);
if (null != value) {
log.debug("***get from memory***{}->{}", getRedisKey(key), value);
return value;
}
}
// 2.从redis中获取,对于键不存在的hash,redis返回empty map
value = getFromRedis(key);
if (null != value) {
// 从redis中获取并回写到内存
log.debug("***get from redis***{}->{}", getRedisKey(key), value);
putToCacheManager(key, value);
} else {
if (enableDbLoad) {
// 3.从数据库中获取并回写到redis和内存,可能存在竞争,重试一定次数
RedisLock redisLock = new RedisLock(srt, getRedisKey(key) + "_sync_lock");
int retryCount = 0;
do {
try {
// 如果获取到分布式锁,则从数据库中获取,取完后返回
if (redisLock.lock()) {
value = loadValueFromDb(key);
log.debug("***get from db***{}->{}", getRedisKey(key), value);
if (null != value) {
set(key, value); // 回写入redis与内存
}
threadAwait.signalAll(getRedisKey(key)); // 唤醒所有等待该key的线程
return value;
}
// 如果未获取到分布式锁,说明别的进程正在进行查询数据库,等待一段时间后查询redis
threadAwait.await(getRedisKey(key), CacheConsts.LOAD_FROM_DB_WAIT_TIME);
value = getFromRedis(key);
if (null != value) {
if (enableMemoryCached) {
putToCacheManager(key, value);
}
return value;
}
} catch (Exception e) {
log.error("get" + getRedisKey(key) + "fail!", e);
} finally {
redisLock.unlock();
}
retryCount++;
} while (retryCount < this.retryTimes);
}
}
return value;
}
这里考虑当多个进程(线程)读取同一key时,使用分布式锁来实现。使得只有第一个获取锁的进程(线程),会去三级存储数据库中获取并存入redis中。其它读取该key的进程(线程)将在redis中获取。这里代码中使用的分布式锁和线程唤醒对象参考了来自https://github.com/wyh-spring-ecosystem-student/spring-boot-student处的代码。
3.4 缓存的失效与同步
在一级缓存caffeine中,我们设置了缓存的失效策略。同时在分布式系统中,不同的进程可能都会在一级缓存中缓存相同的key,因此在对应的redis key发生变化时(修改、删除),应该通知倒各进程,这个通过redis的订阅发布机制实现。
一个进程可能是某个key发生改变的发起者,则它应该在订阅到该key发生变化时不做任何处理,这里我们通过在发布信息中增加一个发布者的唯一标识来实现,发布者的唯一标识在public CacheManager getCacheManager()函数中实现。在redis中发布的信息形式为:{redisPublishId}${keyPrefix}:{key}。
每个进程的监听处理函数是这样的:
@Override
public void onMessage(Message message, byte[] pattern) {
if (null != message.getBody()) {
String key = new String(message.getBody(), Charset.forName("UTF8"));
log.debug("+++reveive msg:{} which topic is:{}", key,
new String(message.getChannel(), Charset.forName("UTF8")));
String publisherId = findRedisPublisherId(key);
// 如果是本进程发布的,不处理
if (StringUtils.isEmpty(publisherId) || publisherId.equals(CacheConfiguration.redisPublisherId)) {
return;
}
// 不是本进程发布的,在一级缓存中失效.发布的key形式为:{redisPublishId}${keyPrefix}:{key}
String redisKey = key.substring(key.indexOf(CacheConsts.REDIS_PUBLISHER_ID_SEPARATE) + 1);
String keyPrefix = findKeyprefixFromRedisKey(redisKey);
Cache cache = cm.getCache(keyPrefix);
if (null != cache) {
// 从一级缓存中删除
log.debug("---evict key:{} from cache:{}", redisKey, cache.getName());
cache.evict(redisKey);
}
} else {
log.error("RedisMsgListener onMessage null!");
}
}
4 小结
本文所描述的示例代码可从:https://github.com/solarkai/distributedcache
处获取。
对于三级缓存方式,网上有很多不同的实现方式。在我们的实现中,针对redis的不同数据类型衍生出不同的针对领域实体的DAO对象,这些对象屏蔽了缓存使用的细节;同时,在我们的实现中,实现了动态生成与指定名称的缓存生成方式,可针对不同的缓存配置不同的策略;针对一级缓存的同步,我们使用redis的订阅发布机制实现,并在发布内容中增加发布者的标识,避免重复的同步操作。
网友评论