nacos动态配置
上一篇我们分析了nacos获取配置中心配置的源码,https://www.jianshu.com/p/b1e5155ed00a
本篇将来分析nacos如何动态更新我们在配置中心修改后的配置
- 动态更新服务地址
- 动态更新配置
nacos版本
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
bootstrap.yaml
spring:
application:
name: service1
profiles:
active: dev
cloud:
nacos:
config:
namespace: 14d29622-bf23-4d4a-b86d-cdedc18ff83b
file-extension: yaml
server-addr: 127.0.0.1:8848
ext-config[0]:
data-id: common-${spring.profiles.active}.yaml
refresh: true
一、动态更新服务地址(线程池轮询)
1、还是来到NacosConfigService这个类,这是nacos的一个重要的类,首先看一下构造方法
public NacosConfigService(Properties properties) throws NacosException {
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
encode = Constants.ENCODE;
} else {
encode = encodeTmp.trim();
}
// 初始化命名空间
initNamespace(properties);
// 初始化http请求代理
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
// 启动一个线程定时更新服务地址列表
agent.start();
// 构造客户端worker
worker = new ClientWorker(agent, configFilterChainManager, properties);
}
2、agent.start()方法,负责启动定时线程获取服务地址列表
public synchronized void start() throws NacosException {
if (isStarted || isFixed) {
return;
}
// 获取服务列表任务
GetServerListTask getServersTask = new GetServerListTask(addressServerUrl);
for (int i = 0; i < initServerlistRetryTimes && serverUrls.isEmpty(); ++i) {
getServersTask.run();
try {
this.wait((i + 1) * 100L);
} catch (Exception e) {
LOGGER.warn("get serverlist fail,url: {}", addressServerUrl);
}
}
if (serverUrls.isEmpty()) {
LOGGER.error("[init-serverlist] fail to get NACOS-server serverlist! env: {}, url: {}", name,
addressServerUrl);
throw new NacosException(NacosException.SERVER_ERROR,
"fail to get NACOS-server serverlist! env:" + name + ", not connnect url:" + addressServerUrl);
}
// 每隔30s更新 底层是一个单线程的线程池
TimerService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);
isStarted = true;
}
GetServerListTask内部类
class GetServerListTask implements Runnable {
final String url;
GetServerListTask(String url) {
this.url = url;
}
@Override
public void run() {
/**
* get serverlist from nameserver
*/
try {
// 更新服务地址
updateIfChanged(getApacheServerList(url, name));
} catch (Exception e) {
LOGGER.error("[" + name + "][update-serverlist] failed to update serverlist from address server!",
e);
}
}
}
通过http访问服务端获取服务列表地址,然后再跟当前的服务地址比较,不一致时更新
private List<String> getApacheServerList(String url, String name) {
try {
HttpResult httpResult = HttpSimpleClient.httpGet(url, null, null, null, 3000);
if (HttpURLConnection.HTTP_OK == httpResult.code) {
if (DEFAULT_NAME.equals(name)) {
EnvUtil.setSelfEnv(httpResult.headers);
}
List<String> lines = IOUtils.readLines(new StringReader(httpResult.content));
List<String> result = new ArrayList<String>(lines.size());
for (String serverAddr : lines) {
if (org.apache.commons.lang3.StringUtils.isNotBlank(serverAddr)) {
String[] ipPort = serverAddr.trim().split(":");
String ip = ipPort[0].trim();
if (ipPort.length == 1) {
result.add(ip + ":" + ParamUtil.getDefaultServerPort());
} else {
result.add(serverAddr);
}
}
}
return result;
} else {
LOGGER.error("[check-serverlist] error. addressServerUrl: {}, code: {}", addressServerUrl,
httpResult.code);
return null;
}
} catch (IOException e) {
LOGGER.error("[check-serverlist] exception. url: " + url, e);
return null;
}
}
二、动态更新配置
动态更新配置的重点在两个类:NacosContextRefresher、ClientWorker
// 实现监听事件,Application.run()准备ok时调用onApplicationEvent注册nacos的监听器
public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware
ClientWorker则负责具体的动态配置处理逻辑,主要流程如下
- 新建了两个线程池,一个用于定时触发检查配置更新,间隔10ms。另一个用于检查配置更新的分组多线程处理。
- 查询服务端存在更新的配置文件
- 根据更新的文件,获取配置文件内容
- 通过监听器发布RefreshEvent交给spring处理
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties);
// 单线程的线程池
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
// 多线程线程池
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
// 定时执行checkConfigInfo()操作,间隔10ms
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
启动轮询任务
// 启动轮询任务
public void checkConfigInfo() {
// 分任务
int listenerSize = cacheMap.get().size();
// 向上取整为批数
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
LongPollingRunnable的run方法,
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// 检查服务端是否存在更新的配置文件
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
// 获取存在更新的配置文件内容
String content = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(content));
} catch (NacosException ioe) {
String message = String.format(
"[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
// 存在变化的文件,通过监听器发布
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}
重点看checkListenerMd5,回调NacosContextRefresher#registerNacosListener方法,
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, md5, wrap);
}
}
}
private void safeNotifyListener(final String dataId, final String group, final String content,
final String md5, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;
Runnable job = new Runnable() {
@Override
public void run() {
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
if (listener instanceof AbstractSharedListener) {
AbstractSharedListener adapter = (AbstractSharedListener) listener;
adapter.fillContext(dataId, group);
LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
}
// 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。
Thread.currentThread().setContextClassLoader(appClassLoader);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent();
// 此处回调发布新的配置
listener.receiveConfigInfo(contentTmp);
listenerWrap.lastCallMd5 = md5;
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
listener);
} catch (NacosException de) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
dataId, group, md5, listener, de.getErrCode(), de.getErrMsg());
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
md5, listener, t.getCause());
} finally {
Thread.currentThread().setContextClassLoader(myClassLoader);
}
}
};
final long startNotify = System.currentTimeMillis();
try {
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
job.run();
}
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group,
md5, listener, t.getCause());
}
final long finishNotify = System.currentTimeMillis();
LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
name, (finishNotify - startNotify), dataId, group, md5, listener);
}
发布更新事件,完成配置的更新
private void registerNacosListener(final String group, final String dataId) {
Listener listener = (Listener)this.listenerMap.computeIfAbsent(dataId, (i) -> {
return new Listener() {
public void receiveConfigInfo(String configInfo) {
NacosContextRefresher.refreshCountIncrement();
String md5 = "";
if (!StringUtils.isEmpty(configInfo)) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md5 = (new BigInteger(1, md.digest(configInfo.getBytes("UTF-8")))).toString(16);
} catch (UnsupportedEncodingException | NoSuchAlgorithmException var4) {
NacosContextRefresher.log.warn("[Nacos] unable to get md5 for dataId: " + dataId, var4);
}
}
NacosContextRefresher.this.refreshHistory.add(dataId, md5);
// 发布更新事件
NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));
if (NacosContextRefresher.log.isDebugEnabled()) {
NacosContextRefresher.log.debug("Refresh Nacos config group " + group + ",dataId" + dataId);
}
}
public Executor getExecutor() {
return null;
}
};
});
try {
this.configService.addListener(dataId, group, listener);
} catch (NacosException var5) {
var5.printStackTrace();
}
}
网友评论