1.Nacos启动从数据库加载配置文件并存储到本地磁盘上
ExternalDumpService#init
- Spring构建Bean的过程中会执行带有@PostConstruct的初始化方法
- DumpService#dumpOperate
- DumpService#dumpConfigInfo
- DumpAllProcessor#process
- long currentMaxId = persistService.findConfigMaxId();
Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);
从数据库查询配置。 - ConfigCacheService.dump(),这里会将配置保存到磁盘文件,如果配置发生变化,会更新md5并且发布LocalDataChangeEvent,告诉客户端配置发生了变更,让客户的立即来拉取新的配置。
- DiskUtil.saveToDisk(dataId, group, tenant, content);写入磁盘文件
- updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey);更新CacheItem中的md5
- 如果md5不同,就要发布事件NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
- 订阅者是RpcConfigChangeNotifier
- RpcConfigChangeNotifier#onEvent
- RpcConfigChangeNotifier#configDataChanged
- RpcConfigChangeNotifier#push(RpcPushTask),first time :delay 0s; sencond time:delay 2s ;third time :delay 4s、
- RpcConfigChangeNotifier.RpcPushTask#run
- RpcPushService.pushWithCallback,向客户端推送的是ConfigChangeNotifyRequest
客户端的处理
- 见下面1.1推拉模型
1.1 推拉模型
ConfigChangeNotifyRequest客户端处理的handler:
- 客户端ClientWorker.ConfigRpcTransportClient#initRpcClientHandler中有相应的handler,处理ConfigChangeNotifyRequest。
- ClientWorker.ConfigRpcTransportClient#notifyListenConfig
- listenExecutebell.offer(bellItem);
listenExecutebell阻塞队列的处理:
- ClientWorker.ConfigRpcTransportClient#startInternal从队列listenExecutebell.poll()取任务,这个任务取完直接丢弃不处理。然后执行executeConfigListen()。
- 推拉模型:默认是5秒拉取一次executeConfigListen(),如果有任务(这个任务唯一的作用就是让客户端解除阻塞,任务不用处理),立即从服务端拉取executeConfigListen()。
- executeConfigListen()在下面1.2详细介绍
流程是:
NacosConfigService构造方法
-> ClientWorker构造方法
-> agent.start()
-> ClientWorker.ConfigRpcTransportClient#startInternal
1.2 客户端定时拉取配置信息
ClientWorker.ConfigRpcTransportClient#executeConfigListen
- ClientWorker#refreshContentAndCheck
- ClientWorker#getServerConfig
- ClientWorker.ConfigRpcTransportClient#queryConfig,发送的是ConfigQueryRequest
- 服务端返回响应后
- LocalConfigInfoProcessor.saveSnapshot保存结果到本地磁盘文件
- 回到ClientWorker#refreshContentAndCheck,拿到配置后,对有变化的配置调用对应的监听器去处理
- CacheData.checkListenerMd5();
- CacheData#safeNotifyListener
- listener.receiveConfigInfo(contentTmp);
- NacosContextRefresher中匿名内部类AbstractSharedListener#receiveConfigInfo
- 匿名内部类的innerReceive
- 发布RefreshEvent事件
- RefreshEventListener#onApplicationEvent
- RefreshEventListener#handle
- ContextRefresher#refresh
1)ContextRefresher#refreshEnvironment,刷新环境,更新内存中的配置信息;
2)RefreshScope#refreshAll,销毁@RefreshScope注解的bean实例,下次调用会重新获取一个新的实例(该实例使用新的配置)
服务端的处理,注意服务端不是去查mysql,而是去查nacos本地磁盘文件,所以直接修改mysql配置是不会触发客户端配置变更的。
- ConfigQueryRequestHandler#handle
- ConfigQueryRequestHandler#getContext
- file = DiskUtil.targetFile()
- content = readFileContent(file);
1.3 服务启动添加对配置的监听器
SpringBootApplication#run
- refreshContext(context)
- listeners.running(context)发布ApplicationReadyEvent事件
- NacosConfigAutoConfiguration注入了NacosContextRefresher
- NacosContextRefresher#onApplicationEvent处理ApplicationReadyEvent
- registerNacosListener(propertySource.getGroup(), dataId);添加的是匿名的AbstractSharedListener
NacosContextRefresher#registerNacosListener
private void registerNacosListener(final String groupKey, final String dataKey) {
String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
Listener listener = listenerMap.computeIfAbsent(key,
lst -> new AbstractSharedListener() {
@Override
public void innerReceive(String dataId, String group,
String configInfo) {
refreshCountIncrement();
nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
applicationContext.publishEvent(
new RefreshEvent(this, null, "Refresh Nacos config"));
if (log.isDebugEnabled()) {
log.debug(String.format(
"Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
group, dataId, configInfo));
}
}
});
try {
configService.addListener(dataKey, groupKey, listener);
}
1.4 事件驱动模型NotifyCenter
NotifyCenter#publishEvent
- EventPublisher publisher = INSTANCE.publisherMap.get(topic);
- publisher.publish(event);
- DefaultPublisher#publish
- 核心是将事件放到阻塞队列queue中,如果放不下,则线程自己处理该事件receiveEvent(),主要是通知订阅者处理事件
事件处理:
- DefaultPublisher#run
- DefaultPublisher#openEventHandler
- final Event event = queue.take();从队列拿事件
- receiveEvent(event);处理事件
- subscriber.onEvent(event)订阅者处理事件
事件驱动模型和事件监听(ApplicationListener遵循JDK规范)有啥区别?
- NotifyCenter有阻塞队列
- 添加订阅者(ApplicationListener事件监听不是绑定在发布者上的,是由Spring容器发布的,是个multicaster;事件驱动NotifyCenter事件监听是绑定在发布者上面的)
2.用户通过nacos-console控制台修改配置,并发布
ConfigController#publishConfig
- persistService.insertOrUpdate()
- ExternalStoragePersistServiceImpl#insertOrUpdate
- ExternalStoragePersistServiceImpl#addConfigInfo 持久化信息到mysql
- ConfigChangePublisher.notifyConfigChange(ConfigDataChangeEvent)
- 订阅者在AsyncNotifyService构造方法里面定义了
- AsyncNotifyService里面定义的匿名Subscriber#onEvent
- rpcQueue.add(NotifySingleRpcTask)
- ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
- AsyncNotifyService.AsyncRpcTask#run
- 1)如果是当前结点,dumpService.dump()
dumpTaskMgr.addTask(DumpTask);
NacosDelayTaskExecuteEngine#processTasks();
DumpProcessor#process;
ConfigCacheService.dump();同Nacos启动时一样,这里会将配置保存到磁盘文件,如果配置发生变化,会更新md5并且发布LocalDataChangeEvent,告诉客户端配置发生了变更,让客户端立即来拉取新的配置。
LocalDataChangeEvent订阅者是RpcConfigChangeNotifier。 - 2)如果是其他节点,configClusterRpcClientProxy.syncConfigChange(),如果有其他节点,通知其他节点配置发生变更
问题:
1)变更配置,只会通知nacos集群一个服务器?
2)拉取配置,也只会找nacos集群任意一台吗?
2.1 任务执行引擎
NacosTask
NacosTaskProcessor
NacosTaskExecuteEngine(管理processor和task)
3.应用启动期间从Nacos Config拉取配置
在创建子容器SpringBoot容器prepareContext()阶段,调用PropertySourceBootstrapConfiguration#Initialize()会加载配置并合并到CompositePropertySource。
NacosPropertySourceLocator#locate会从Nacos Config配置中心拉取配置。
具体的流程:
- SpringApplication#run
- prepareContext
- PropertySourceBootstrapConfiguration#initialize
- PropertySourceLocator#locateCollection
- NacosPropertySourceLocator#locate
- ConfigService configService = nacosConfigManager.getConfigService();如果没有就会创建ConfigService
- loadSharedConfiguration(composite);加载共享的配置文件,对应配置spring.cloud.nacos.config.sharedConfigs
- loadExtConfiguration(composite);加载扩展的配置文件,对应配置spring.cloud.nacos.config.extensionConfigs
- loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);加载当前应用配置
nacos-config(微服务名)
nacos-config.yaml(微服务名.扩展名)
nacos-config-dev.yaml(微服务名-profile.扩展名) - 走的都是NacosPropertySourceLocator#loadNacosPropertySource
- NacosPropertySourceBuilder#loadNacosData
- configService.getConfig(dataId, group, timeout);
- NacosConfigService#getConfigInner
- LocalConfigInfoProcessor.getFailover()优先使用本地配置
- ClientWorker#getServerConfig如果没有获取到本地文件配置,则调用远程配置中心获取配置信息。配置中心也是从配置文件获取,客户端拿到配置信息,会将配置信息保存到本地文件。
4.动态配置@RefreshScope
4.1 @RefreshScope相关Bean创建
启动流程:
- SpringApplication#run()
- SpringApplication#refreshContext
- SpringApplication#refresh
- ServletWebServerApplicationContext#refresh
- AbstractApplicationContext#refresh
- AbstractApplicationContext#finishRefresh
- publishEvent(new ContextRefreshedEvent(this));
- RefreshScope监听器处理,是个ApplicationListener<ContextRefreshedEvent>
- RefreshScope#onApplicationEvent
- RefreshScope#start
- 遍历容器找出BeanDefinition的scope为refresh的BD(在实例化单例非懒加载的bean时不会处理),然后调用Object bean = this.context.getBean(name);这里context是AnnotationConfigServletWebServerApplicationContext
- AbstractApplicationContext#getBean()
- AbstractBeanFactory#getBean
- AbstractBeanFactory#doGetBean
- 走的是最后一个分支逻辑,scope为RefreshScope,也就是上面的监听器
- GenericScope#get
- GenericScope.BeanLifecycleWrapper#getBean
- this.bean = this.objectFactory.getObject(); 这里对RefreshScope的cache里面的bean赋值了
- AbstractAutowireCapableBeanFactory#createBean,这里是核心
AbstractBeanFactory#doGetBean
if (mbd.isSingleton()) {
sharedInstance = getSingleton(beanName, () -> {
try {
return createBean(beanName, mbd, args);
}
catch (BeansException ex) {
// Explicitly remove instance from singleton cache: It might have been put there
// eagerly by the creation process, to allow for circular reference resolution.
// Also remove any beans that received a temporary reference to the bean.
destroySingleton(beanName);
throw ex;
}
});
bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd);
}
else if (mbd.isPrototype()) {
// It's a prototype -> create a new instance.
Object prototypeInstance = null;
try {
beforePrototypeCreation(beanName);
prototypeInstance = createBean(beanName, mbd, args);
}
finally {
afterPrototypeCreation(beanName);
}
bean = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd);
}
else {
String scopeName = mbd.getScope();
if (!StringUtils.hasLength(scopeName)) {
throw new IllegalStateException("No scope name defined for bean ´" + beanName + "'");
}
Scope scope = this.scopes.get(scopeName);
if (scope == null) {
throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'");
}
try {
Object scopedInstance = scope.get(beanName, () -> {
beforePrototypeCreation(beanName);
try {
return createBean(beanName, mbd, args);
}
finally {
afterPrototypeCreation(beanName);
}
});
bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
}
@RefreshScope注解的bean存在哪里?是个什么形式?
存在RefreshScope的cache中。
@RestController
@RefreshScope //动态感知修改后的值
public class TestController implements ApplicationListener<RefreshScopeRefreshedEvent>{
![](https://img.haomeiwen.com/i4222138/ccc5bbf7d0f75440.png)
![](https://img.haomeiwen.com/i4222138/924252c06e5bf5ff.png)
![](https://img.haomeiwen.com/i4222138/8d9f6600b6d94770.png)
![](https://img.haomeiwen.com/i4222138/307d756a9da80f03.png)
调用@RefreshScope注解的bean,是从哪里获取?
- SpringMVC调用过来的bean在Spring容器的单例池里面也有testController,是GenericScope.LockedScopedProxyFactoryBean。
- CglibAopProxy.DynamicAdvisedInterceptor#intercept
target = targetSource.getTarget();
SimpleBeanTargetSource#getTarget;
getBeanFactory().getBean(getTargetBeanName());
GenericScope#get
GenericScope.BeanLifecycleWrapper#getBean这里会获取到cache中的bean; - new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
- 这里拦截器有两个
![](https://img.haomeiwen.com/i4222138/fe2945641735bca8.png)
![](https://img.haomeiwen.com/i4222138/2d62e0baf58080e3.png)
@RefreshScope的bean销毁后,重新获取,是在哪里触发的?
- 跟上面一样,SpringMVC调过来的时候,会首先获取target
- getBeanFactory().getBean(getTargetBeanName());从容器中获取时,就会重新生成bean,然后放到cache中。
在哪里创建RefreshScope动态代理对象?
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RefreshScope.class)
@ConditionalOnProperty(name = RefreshAutoConfiguration.REFRESH_SCOPE_ENABLED,
matchIfMissing = true)
@AutoConfigureBefore(HibernateJpaAutoConfiguration.class)
public class RefreshAutoConfiguration {
ConfigurationClassPostProcessor#postProcessBeanDefinitionRegistry
- ClassPathBeanDefinitionScanner#doScan
- AnnotationConfigUtils#applyScopedProxyMode处理scopedProxyMode不为ScopedProxyMode.NO的BeanDefinition,@RefreshScope的scopedProxyMode是ScopedProxyMode.TARGET_CLASS
- ScopedProxyUtils#createScopedProxy
1)注册了scopedTarget.testController,代表真正的TestController信息;
2)创建了proxyDefinition,名字为testController,特别注意代理对象的类是ScopedProxyFactoryBean。
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Scope("refresh")
@Documented
public @interface RefreshScope {
/**
* @see Scope#proxyMode()
* @return proxy mode
*/
ScopedProxyMode proxyMode() default ScopedProxyMode.TARGET_CLASS;
}
ScopedProxyUtils#createScopedProxy
public static BeanDefinitionHolder createScopedProxy(BeanDefinitionHolder definition,
BeanDefinitionRegistry registry, boolean proxyTargetClass) {
String originalBeanName = definition.getBeanName();
BeanDefinition targetDefinition = definition.getBeanDefinition();
String targetBeanName = getTargetBeanName(originalBeanName);
// Create a scoped proxy definition for the original bean name,
// "hiding" the target bean in an internal target definition.
RootBeanDefinition proxyDefinition = new RootBeanDefinition(ScopedProxyFactoryBean.class);
proxyDefinition.setDecoratedDefinition(new BeanDefinitionHolder(targetDefinition, targetBeanName));
proxyDefinition.setOriginatingBeanDefinition(targetDefinition);
proxyDefinition.setSource(definition.getSource());
proxyDefinition.setRole(targetDefinition.getRole());
proxyDefinition.getPropertyValues().add("targetBeanName", targetBeanName);
if (proxyTargetClass) {
targetDefinition.setAttribute(AutoProxyUtils.PRESERVE_TARGET_CLASS_ATTRIBUTE, Boolean.TRUE);
// ScopedProxyFactoryBean's "proxyTargetClass" default is TRUE, so we don't need to set it explicitly here.
}
else {
proxyDefinition.getPropertyValues().add("proxyTargetClass", Boolean.FALSE);
}
// Copy autowire settings from original bean definition.
proxyDefinition.setAutowireCandidate(targetDefinition.isAutowireCandidate());
proxyDefinition.setPrimary(targetDefinition.isPrimary());
if (targetDefinition instanceof AbstractBeanDefinition) {
proxyDefinition.copyQualifiersFrom((AbstractBeanDefinition) targetDefinition);
}
// The target bean should be ignored in favor of the scoped proxy.
targetDefinition.setAutowireCandidate(false);
targetDefinition.setPrimary(false);
// Register the target bean as separate bean in the factory.
registry.registerBeanDefinition(targetBeanName, targetDefinition);
// Return the scoped proxy definition as primary bean definition
// (potentially an inner bean).
return new BeanDefinitionHolder(proxyDefinition, originalBeanName, definition.getAliases());
}
4.2 @RefreshScope刷新配置和销毁动态配置Bean
在1.2中ContextRefresher#refresh 刷新配置
- ContextRefresher#refreshEnvironment
- extract(this.context.getEnvironment().getPropertySources());抽取除了SYSTEM、SERVLET等之外的配置信息
- ContextRefresher#addConfigFilesToEnvironment,把原来的environment里面的参数放到一个新建的Spring容器里重新加载,最后会关闭该容器,获取到参数的新值
- changes()找到改变的参数值
- 发布EnvironmentChangeEvent事件,带上改变的参数值
在1.2中ContextRefresher#refresh 销毁@RefreshScope注解的bean
- RefreshScope#refreshAll
- GenericScope#destroy(),清除Scope里面的缓存this.cache.clear();下次就会重新获取一个新的实例(该实例使用新的配置)
- 发布事件RefreshScopeRefreshedEvent
4.3 定时器失效的问题
详细分析参考:https://www.jianshu.com/p/9760ff3bb311?v=1678273650499
ScheduledAnnotationBeanPostProcessor
@RefreshScope,如果配置更新后,销毁@RefreshScope注解的bean,此时定时器会失效。必须要重新请求一下定时器所在类的路径,定时器才能重新生效。
怎样解决?
加一个监听器,监听容器启动完成事件,然后在监听器里面调用监听器。
@RestController
@RefreshScope //动态感知修改后的值
public class TestController implements ApplicationListener<RefreshScopeRefreshedEvent>{
@Value("${common.age}")
String age;
@Value("${common.name}")
String name;
@GetMapping("/common")
public String hello() {
return name+","+age;
}
//触发@RefreshScope执行逻辑会导致@Scheduled定时任务失效
@Scheduled(cron = "*/3 * * * * ?") //定时任务每隔3s执行一次
public void execute() {
System.out.println("定时任务正常执行。。。。。。");
}
@Override
public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
this.execute();
}
网友评论