目前使用的是consul作为存储中心,但是如何“无痛”切换到mongodb、nacos、redis、mysql等存储中心呢?本文带你去了解。
1. 使用“组合”+“接口”将不同存储中心“统一化”
不同的物理存储中心,例如consul、nacos、redis、mongo都是no-sql存储,即存储k-v形式的数据。甚至mysql中也可以存储json格式的数据。虽然有各种各样形式的client实现。但若是想嵌入到咱sentinel作为存储中心,必须满足咱定义的“接口”规范。通过“组合”的形式将存储中心实际client嵌入到接口子类中,对外提供服务。
1.1 “规范”的存储中心操作类
接口如下:
/**
* 组合的方式来设置类
*/
public interface RulesOperation<T> {
void put(String key, String val);
String get(String key);
void removeMap(String key);
/**
* 通过"appRules"获取到项目名。
* 当然,若使用redis等存储中心,返回结果可以读取某个key里面的定值,无需像consul一样实时计算~
*
* @param name 为appRules。
* @return 持久化的配置名
*/
Set<String> getAppNames(String name);
default void putKeyStrMap(String key, ConcurrentMap<String, T> map) {
String val;
try {
val = JacksonUtil.objectMapper().writeValueAsString(map);
} catch (IOException e) {
throw new RuntimeException("序列化失败", e);
}
put(key, val);
}
default ConcurrentMap<String, T> getKeyStrMap(String key) {
ConcurrentMap<String, T> map = new ConcurrentHashMap<>();
String value = get(key);
if (value != null) {
try {
map = JacksonUtil.objectMapper().
readValue(value, new TypeReference<ConcurrentHashMap<String, T>>() {
});
} catch (IOException e) {
throw new RuntimeException("反序列化失败", e);
}
}
return map;
}
default void putMap(String key, ConcurrentMap<Long, T> map) {
String val;
try {
val = JacksonUtil.objectMapper().writeValueAsString(map);
} catch (IOException e) {
throw new RuntimeException("序列化失败", e);
}
put(key, val);
}
default ConcurrentMap<Long, T> getMap(String key) {
ConcurrentMap<Long, T> map = new ConcurrentHashMap<>();
String value = get(key);
if (value != null) {
try {
map = JacksonUtil.objectMapper().
readValue(value, new TypeReference<ConcurrentHashMap<Long, T>>() {
});
} catch (IOException e) {
throw new RuntimeException("反序列化失败", e);
}
}
return map;
}
}
没错,只需要简单的实现put、get、remove等方法,就可以无痛的替换存储中心。
1.2 consul分布式存储中心
/**
* 实际的操作类——将实际的操作类进行解耦
*
*/
public class ConsulRulesOperation<T> implements RulesOperation<T> {
private ConsulClient consulClient;
private final Logger logger = LoggerFactory.getLogger(ForgeSimpleMachineDiscovery.class);
public ConsulRulesOperation(ConsulClient consulClient) {
this.consulClient = consulClient;
}
@Override
public void put(String key, String val) {
consulClient.setKVValue(genKey(key), val);
}
@Override
public String get(String key) {
GetValue res = consulClient.getKVValue(genKey(key)).getValue();
String ans = null;
if (res != null) {
ans = res.getDecodedValue();
}
return ans;
}
public void removeMap(String key) {
consulClient.deleteKVValue(genKey(key));
}
/**
* 通过前缀获取项目名
*/
@Override
public Set<String> getAppNames(String prefix) {
Response<List<String>> appRules = consulClient.getKVKeysOnly(genKey(prefix));
List<String> results = appRules.getValue();
//规则的截取-appRules/项目名/规则名
Set<String> consulSet = new HashSet<>();
if (results != null) {
for (String result : results) {
try {
String[] arr = result.split("/");
consulSet.add(arr[2]);
} catch (Exception e) {
logger.error("", e);
}
}
}
return consulSet;
}
/**
* 生产存储的key
* @param name
* @return
*/
public String genKey(String name) {
return String.join("/", "sentinel", name);
}
}
1.3 动态切换存储中心的配置
1.3.1 config配置类
配置文件中修改:sentinel.dashboard.store
属性完成动态切换。
@EnableConfigurationProperties(RepositoryProperties.class)
@Configuration
public class RepositoryConfig {
private final Logger logger = LoggerFactory.getLogger(RepositoryConfig.class);
/**
* 伪造心跳的bean
*/
@Bean
@ConditionalOnMissingBean
public ForgeSimpleMachineDiscovery forgeSimpleMachineDiscovery(RulesOperation RulesOperation) {
logger.info("启动分布式机器注册[{}]", RulesOperation.getClass().getSimpleName());
return new ForgeSimpleMachineDiscovery(RulesOperation);
}
/**
* 分布式规则存储中心
*/
@Bean
@ConditionalOnMissingBean
public <T> RulesStoreManager<T> appRulesStoreManager(RulesOperation<T> rulesOperation, Lock lock) {
logger.info("服务维度规则的存储中心注册[{}]", rulesOperation.getClass().getSimpleName());
return new RulesStoreManager<>(rulesOperation, lock);
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "sentinel.dashboard.lock", havingValue = "memory", matchIfMissing = true)
public InMemoryLock inMemoryLock() {
logger.info("内存锁启动...");
return new InMemoryLock();
}
/**
* 初始化consul的配置信息
*/
@Configuration
@ConditionalOnClass(ConsulClient.class)
@ConditionalOnProperty(name = "sentinel.dashboard.store", havingValue = "consul")
public static class ConsulClientConfig {
private final Logger logger = LoggerFactory.getLogger(ConsulClientConfig.class);
/**
* 真实的consul客户端
*/
@Bean
@ConditionalOnMissingBean
public ConsulClient consulClient(RepositoryProperties repositoryProperties) {
logger.info("consul的客户端启动...");
return new ConsulClient(repositoryProperties.getUrl(), repositoryProperties.getPort());
}
/**
* 分布式无论存储的操作客户端
*/
@Bean
@ConditionalOnMissingBean
public <T> RulesOperation<T> rulesOperation(ConsulClient consulClient) {
logger.info("consul的规则操作启动...");
return new ConsulRulesOperation<>(consulClient);
}
/**
* 加载分布式锁
*/
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "sentinel.dashboard.lock", havingValue = "consul")
public ConsulLock consulLock(ConsulClient consulClient) {
logger.info("consul分布式锁启动...");
return new ConsulLock(consulClient);
}
}
}
1.3.2 properties配置类
@ConfigurationProperties("sentinel.dashboard")
public class RepositoryProperties {
/**
* 存储url
*/
private String url;
/**
* 端口号
*/
private int port;
/**
* 存储中心类型
*/
private StoreEnum store;
/**
* 锁类型
*/
private LockEnum lock;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public StoreEnum getStore() {
return store;
}
public void setStore(StoreEnum store) {
this.store = store;
}
}
1.3.3 配置文件
# 存储中心
sentinel.dashboard.url=127.0.0.1
sentinel.dashboard.port=8500
# 分布式存储中心的存储介质
sentinel.dashboard.store=consul
# 分布式存储中心读取数据锁的类型
sentinel.dashboard.lock=consul
2. 分布式存储如何在使用泛型的情况下操作map?
为什么操作map,因为sentinel单机版就大量使用map作为存储中心,为了少修改代码,存储中心的value应该是map类型。
当value是map类型时,就出现了一个问题:
-
操作不扁平:可以将consul等分布式存储中心看着一个大map,那么单机版中的map实际是consul的value值,consul操作value必须:
- 将JSON在consul取到内存,并反序列为map;
- 修改内存中的map;
- 将内存中的map序列化为JSON,并存到consul;
- 泛型操作:操作的value的值实际上是泛型类型,根据子类得到不同的泛型对象
-
线程不安全:单机版使用
ConcurrentHashMap.computeIfAbsent
在线程安全的情况下获取到value,并通过值传递的形式来修改。
2.1 实现扁平化操作
接口关系如下所示:实现如下接口满足map的扁平化操作。
2.2 如何实现泛型操作
重点观察泛型和接口的关系,比较复杂。
存储实体的基本操作
/**
* 存储实体的管理器
*/
public interface StoreManager<K, V> {
void put(K key, V value);
/**
* 取出数据
*/
V get(K key);
V remove(K key);
}
map扁平化接口:
public interface ExtendMapOperation<K, F, V> {
void putField(K key, F field, V value);
V getField(K key, F field);
V removeField(K key, F field);
}
规则的分布式存储实现类——可以存储allRules规则以及appRules规则:
public class RulesStoreManager<T> implements StoreManager<String, ConcurrentMap<Long, T>>, ExtendMapOperation<String, Long, T> {
private RulesOperation<T> rulesOperation;
private Lock lock;
public RulesStoreManager(RulesOperation<T> rulesOperation, Lock lock) {
this.rulesOperation = rulesOperation;
this.lock = lock;
}
/**
* @param key appName
* @param value Map<Long,T>对象
*/
@Override
public void put(String key, ConcurrentMap<Long, T> value) {
//生成组装的key: 一个服务下的所有配置
rulesOperation.putMap(key, value);
}
@Override
public ConcurrentMap<Long, T> get(String appName) {
return rulesOperation.getMap(appName);
}
@Override
public ConcurrentMap<Long, T> remove(String key) {
ConcurrentMap<Long, T> map = get(key);
rulesOperation.removeMap(key);
return map;
}
@Override
public void putField(String appName, Long field, T value) {
lock.threadSafetyFunction(appName, 10, () -> {
ConcurrentMap<Long, T> map = get(appName);
map.put(field, value);
put(appName, map);
return null;
});
}
@Override
public T getField(String appName, Long field) {
Map<Long, T> map = get(appName);
return map.get(field);
}
@Override
public T removeField(String appName, Long field) {
return lock.threadSafetyFunction(appName, 10, () -> {
ConcurrentMap<Long, T> map = get(appName);
T remove = map.remove(field);
put(appName, map);
return remove;
});
}
}
2.2.1 泛型操作遇到的坑—反序列化无法得到泛型对象
对应FastJson来说,在反序列中传入type是可以得到泛型对象的。
但实际上得到的却是JsonObject对象。无法转换为RuleEntity
对象。
使用Jackson也遇见了此种情况:
map = JacksonUtil.objectMapper().
readValue(value, new TypeReference<ConcurrentHashMap<String, T>>() {
});
2.2.2 解决方案
自定义Jackson的ObjectMapper对象,在序列化的时候将属性的类型也序列化即可:
public class JacksonUtil {
private static ObjectMapper objectMapper = new ObjectMapper();
static {
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
public static ObjectMapper objectMapper() {
return objectMapper;
}
}
2.3 如何实现线程安全
使用的锁机制,实现了consul分布式锁以及默认的内存锁。保证“扁平化”操作时的线程安全性。
/**
* 锁机制
*
*/
public interface Lock {
String lock(String lockName, int ttlSeconds);
void unLock(String sessionID);
/**
* 线程安全的操作
*
* @param lockName 锁名称
* @param ttlSeconds 最大超时时间
* @param supplier 待保护的临界资源
* @param <T> 返回值的类型
* @return 临界资源返回值
*/
default <T> T threadSafetyFunction(String lockName, int ttlSeconds, Supplier<T> supplier) {
T result;
String successId = lock(lockName, 10);
if (successId != null) {
try {
result = supplier.get();
} finally {
unLock(successId);
}
} else {
throw new RuntimeException("冲突啦,请稍等处理~");
}
return result;
}
}
2.3.1 (默认)内存锁的实现
借助ConcurrentHashMap将ReentrantLock存储起来。
——“这么骚的操作是在seata源码中借鉴的。”
/**
* 内存级别的锁,默认提供
*
*/
public class InMemoryLock implements Lock {
/**
* map维护Object的Monitor对象锁
*/
private Map<String, ReentrantLock> lockMap = new ConcurrentHashMap<>(64);
@Override
public String lock(String lockName, int ttlSeconds) {
//获取key对应的内存锁
ReentrantLock reentrantLock = lockMap.computeIfAbsent(lockName, (k) -> new ReentrantLock());
try {
if (reentrantLock.tryLock(ttlSeconds, TimeUnit.SECONDS)) {
return lockName;
}
} catch (InterruptedException e) {
}
return null;
}
@Override
public void unLock(String sessionID) {
if (sessionID == null) {
return;
}
//获取对应的锁
ReentrantLock reentrantLock = lockMap.get(sessionID);
//释放锁
reentrantLock.unlock();
}
}
2.3.2 (重点)consul实现分布式锁
/**
* consul分布式锁机制
*
*/
public class ConsulLock implements Lock {
private ConsulClient consulClient;
/**
* 构造函数
*
* @param consulHost 注册consul的client或服务端的Ip或主机名,或域名
* @param consulPort 端口号
*/
public ConsulLock(String consulHost, int consulPort) {
consulClient = new ConsulClient(consulHost, consulPort);
}
public ConsulLock(ConsulClient consulClient) {
this.consulClient = consulClient;
}
/**
* 获得锁的方法
*
* @param lockName 竞争的资源名
* @param ttlSeconds 锁的超时时间,超过该时间自动释放【超时时间,最低10秒】
* @return 若为null表示加锁失败,若非null,表示加锁成功
*/
public String lock(String lockName, int ttlSeconds) {
if (ttlSeconds < 10 || ttlSeconds > 86400) ttlSeconds = 10;
String sessionId = createSession(lockName, ttlSeconds);
//个性化处理
boolean success = lock("sentinel/lock/" + lockName, sessionId);
if (!success) {
consulClient.sessionDestroy(sessionId, null);
return null;
}
return sessionId;
}
public void unLock(String sessionID) {
consulClient.sessionDestroy(sessionID, null);
}
private String createSession(String lockName, int ttlSeconds) {
NewCheck check = new NewCheck();
check.setId("check " + lockName);
check.setName(check.getId());
check.setTtl(ttlSeconds + "s"); //该值和session ttl共同决定决定锁定时长
check.setTimeout("10s");
consulClient.agentCheckRegister(check);
consulClient.agentCheckPass(check.getId());
NewSession session = new NewSession();
session.setBehavior(Session.Behavior.RELEASE);
session.setName("session " + lockName);
session.setLockDelay(1);
session.setTtl(ttlSeconds + "s"); //和check ttl共同决定锁时长
List<String> checks = new ArrayList<>();
checks.add(check.getId());
session.setChecks(checks);
return consulClient.sessionCreate(session, null).getValue();
}
private boolean lock(String lockName, String sessionId) {
PutParams putParams = new PutParams();
putParams.setAcquireSession(sessionId);
return consulClient.setKVValue(lockName, "lock:" + LocalDateTime.now(), putParams).getValue();
}
}
2.3.3 按照配置切换锁
在1.3小节的“动态切换”中,可以根据配置文件来动态切换锁。当然以后去除consul使用redis等存储中心,可以实现定义的Lock接口,实现Redis的分布式锁。
网友评论