一 结构
结构.png二 ConfigFactory
- 通过ConfigFactory方法,创建NacosConfigService,作为配置管理的入口对象。
- 一种是通过自定义配置kv方式指定属性,另一种指定配置服务端地址方式。
public class ConfigFactory {
public static ConfigService createConfigService(Properties properties) throws NacosException {
...
}
public static ConfigService createConfigService(String serverAddr) throws NacosException {
...
}
}
三 NacosConfigService
3.1 配置管理
3.1.1 配置发布,删除
按接口文档配置请求参数,调用ServerHttpAgent接口
3.1.2 配置获取
- 优先级最高使用failvoer配置
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
- 次之从配置服务端获取配置
clientWorker.getServerConfig(dataId, group, tenant, timeoutMs)
- 最低优先级获取本地snapshot配置
content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant)
3.2 配置变更监听管理
调用ClientWork的配置变更监听添加删除接口
四 ClientWork
4.1 获取配置接口
调用ServerHttpAgent接口获取配置,获取成功则保存或删除本地snapshot配置。
HttpResult result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
switch (result.code) {
case HttpURLConnection.HTTP_OK:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
return result.content;
case HttpURLConnection.HTTP_NOT_FOUND:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
return null;
...
}
4.2 配置变更监听存储
- 存储监听变更的配置集
AtomicReference<Map<String/*group key*/, CacheData>> cacheMap
- group key生成函数
static public String getKeyTenant(String dataId, String group, String tenant) {
StringBuilder sb = new StringBuilder();
urlEncode(dataId, sb);
sb.append('+');
urlEncode(group, sb);
if (StringUtils.isNotEmpty(tenant)) {
sb.append('+');
urlEncode(tenant, sb);
}
return sb.toString();
}
4.3 配置变更监听处理
4.3.1 定时任务
- 初始化时,创建10ms定时任务,启动LongPullingRunnable任务
- 每个LongPullingRunnable任务默认处理3000个监听配置集
- 根据总监听数量计算需要启动的LongPullingRunnable任务数
- currentLongingTaskCount保存已启动的LongPullingRunnable任务数
- 根据需要启动新的LongPullingRunnable任务数。
executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
log.error(agent.getName(), "NACOS-XXXX", "[sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
public void checkConfigInfo() {
// 分任务
int listenerSize = cacheMap.get().size();
// 向上取整为批数
int longingTaskCount = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
executorService.execute(new LongPullingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
4.3.2 LongPullingRunnable
4.3.2.1 failover配置变更监听
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
log.error("NACOS-CLIENT", "get local config info error", e);
}
}
}
- 没有->有,有->有且存在变更,更新cachData配置信息
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
- 有->没有
cacheData.setUseLocalConfigInfo(false);
从服务端获取配置。
4.3.2.2 服务端配置变更监听
-
checkUpdateDataIds()
遍历本线程处理的cachData,对其中无本地failover配置的项,组装配置集串,格式为dataId2Group2contentMD52tenant1或者dataId2Group2contentMD5^1。设置Listening-Configs请求参数。 -
checkUpdateConfigStr()
执行longpulling
Long-Pulling-Timeout头指定请求超时时间,默认30s
Long-Pulling-Timeout-No-Hangup首次监听时配置,告知服务端无论是否有变更都立即返回 -
parseUpdateDataIdResponse()
解析服务端返回有变更的配置集。格式与请求参数Listening-Configs格式相同
4.3.2.3 配置变更监听回调
- 遍历便跟的配置集,从服务端获取对应配置。更新cachdata
String content = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
- 遍历cachData列表,通过配置数据的md5检查是否发生变更,触发变更回调。
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, md5, wrap);
}
}
}
- 监听回调
如果listener提供了线程执行器,则异步线程调用变更回调,否则在当前LongPullingRunnable线程调用。
Runnable job = new Runnable() {
public void run() {
...
listener.receiveConfigInfo(contentTmp);
listenerWrap.lastCallMd5 = md5;
...
}
};
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
job.run();
}
五 ServerHttpAgent
5.1 ServerListManager配置服务端管理
5.1.1 配置服务器地址
-
List<String> serverUrls
保存服务端地址。 -
ServerAddressIterator
对服务端地址随机排序。 - currentServerAddr存储上次访问成功的服务端地址。尽量保证longpulling配置变更监听的地址和获取配置的是同一个服务器。
- 访问失败则调用
refreshCurrentServerAddr
更新下次使用的服务端地址。
public void refreshCurrentServerAddr() {
currentServerAddr = iterator().next();
}
public String getCurrentServerAddr() {
if (StringUtils.isBlank(currentServerAddr)) {
currentServerAddr = iterator().next();
}
return currentServerAddr;
}
5.1.2 配置地址服务器
使用定时线程,从地址服务器获取最新配置服务器地址列表,更新本地缓存List<String> serverUrls
5.2 Limiter调用限速
- 初始化最多对1000个url请求限速。
- 每次限速区间1分钟
private static int CAPACITY_SIZE = 1000;
private static Cache<String, RateLimiter> cache = CacheBuilder.newBuilder()
.initialCapacity(CAPACITY_SIZE).expireAfterAccess(1, TimeUnit.MINUTES)
.build();
- 每个限速区间限制5次调用
rateLimiter = cache.get(accessKeyID, new Callable<RateLimiter>() {
@Override
public RateLimiter call() throws Exception {
return RateLimiter.create(5);
}
});
- 每次请求获取令牌时间最多等待1s
rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)
网友评论