1: 客户端获取配置
Nacos 服务端保存了配置信息,客户端连接到服务端之后,根据 dataID,group可以获取到具体的配置信息,当服务端的配置发生变更时,客户端会收到通知。
参考文章:
链接:https://www.jianshu.com/p/38b5452c9fec
https://blog.csdn.net/wangwei19871103/article/details/105762802?spm=1001.2014.3001.5502
客户端注册监听 服务端 的本质 就是带着配置和配置值的 MD5 值和后台对比。如果 MD5 值不一致,就立即返回不一致的配置。如果值一致,就等待住 30 秒。返回值为空。
1.1 看看Nacos是如何获取服务端的最新数据
主要是通过NacosConfigService 来实现 及时获取服务端的最新数据。构造函数里面构建了HttpAgent和
ClientWorker。
public ClientWorker(final HttpAgent agent, ConfigFilterChainManager configFilterChainManager, Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
this.init(properties);
this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
this.executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
ClientWorker.this.checkConfigInfo();
} catch (Throwable var2) {
ClientWorker.LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", var2);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
ClientWorker 除了将 HttpAgent 维持在自己内部,还创建了两个线程池:
- 第一个线程池是只拥有一个线程用来执行定时任务的 executor,executor 每隔 10ms 就会执行一次 checkConfigInfo() 方法,从方法名上可以知道是每 10 ms 检查一次配置信息。
- 第二个线程池是一个普通的线程池,从 ThreadFactory 的名称可以看到这个线程池是做长轮询的。
长轮训任务 LongPollingRunnable的run
首先会遍历所有的CacheData,找出是当前任务的加入到一个集合里,如果是本次任务的就先检查本地配置,如果有改变的话就要通知监听器。如果是有在初始化的CacheData,那服务器就会立即返回,否则会被挂起,这个原因就是为了不进行频繁的空轮询,又能实现动态配置,只要在挂起的时间段内有改变,就可以理解响应给客户端。获取完之后再检查有没改变,有的话也要通知,然后继续调度当前任务。
接下来去请求服务器获取配置信息。
@Override
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
//是否是在初始化的CacheData,会影响服务器是否挂起或者立即返回
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {//属于当前长轮询任务的
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {//用本地配置
cacheData.checkListenerMd5();//有改变的话会通知
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
//获取有变化的配置列表dataid+group,访问的url是/listener
// check server config
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {//有更新的就获取一次配置
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);//设置配置内容
if (null != ct[1]) {
cache.setType(ct[1]);//设置配置类型
}
...
} catch (NacosException ioe) {
...
}
}
for (CacheData cacheData : cacheDatas) {//不是初始化中的或者初始化集合里存在的
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();//检查是否有变化,有变化就通知
cacheData.setInitializing(false);//请求过了后就设置为不在初始化中,这样就会被挂起,如果服务器配置有更新,就会立即返回,这样就可以实现动态配置更新,又不会太多的空轮询消耗
}
}
inInitializingCacheList.clear();
executorService.execute(this);
} catch (Throwable e) {
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}
- checkUpdateDataIds获取有改变的dataId 列表,
- 通过 getServerConfig 方法,根据 dataId 到服务端获取最新的配置信息,接着将最新的配置信息保存到 CacheData 中。
CacheData cache = (CacheData)((Map)ClientWorker.this.cacheMap.get()).get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
- 当服务端配置信息发生变更时,客户端将最新的数据获取下来之后,保存在了 CacheData 中,同时更新了该 CacheData 的 md5 值,所以当下次执行 checkListenerMd5 方法时,就会发现当前 listener 所持有的 md5 值已经和 CacheData 的 md5 值不一样了,也就意味着服务端的配置信息发生改变了,这时就需要将最新的数据通知给 Listener 的持有者。
checkListenerMd5 检查MD5值
void checkListenerMd5() {
Iterator var1 = this.listeners.iterator();
while(var1.hasNext()) {
ManagerListenerWrap wrap = (ManagerListenerWrap)var1.next();
if (!this.md5.equals(wrap.lastCallMd5)) {
this.safeNotifyListener(this.dataId, this.group, this.content, this.md5, wrap);
}
}
}
该方法会检查 CacheData 当前的 md5 与 CacheData 持有的所有 Listener 中保存的 md5 的值是否一致,如果不一致,就执行一个安全的监听器的通知方法:safeNotifyListener,通知什么呢?我们可以大胆的猜一下,应该是通知 Listener 的使用者,该 Listener 所关注的配置信息已经发生改变了。
总结:
Nacos 服务端创建了相关的配置项后,客户端就可以进行监听了。
客户端是通过一个定时任务来检查自己监听的配置项的数据的,一旦服务端的数据发生变化时,客户端将会获取到最新的数据,并将最新的数据保存在一个 CacheData 对象中,然后会重新计算 CacheData 的 md5 属性的值,此时就会对该 CacheData 所绑定的 Listener 触发 receiveConfigInfo 回调。
考虑到服务端故障的问题,客户端将最新数据获取后会保存在本地的 snapshot 文件中,以后会优先从文件中获取配置信息的值。
网友评论