美文网首页
Nacos Config源码剖析

Nacos Config源码剖析

作者: 王侦 | 来源:发表于2023-03-06 23:18 被阅读0次

1.Nacos启动从数据库加载配置文件并存储到本地磁盘上

ExternalDumpService#init

  • Spring构建Bean的过程中会执行带有@PostConstruct的初始化方法
  • DumpService#dumpOperate
  • DumpService#dumpConfigInfo
  • DumpAllProcessor#process
  • long currentMaxId = persistService.findConfigMaxId();
    Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);
    从数据库查询配置。
  • ConfigCacheService.dump(),这里会将配置保存到磁盘文件,如果配置发生变化,会更新md5并且发布LocalDataChangeEvent,告诉客户端配置发生了变更,让客户的立即来拉取新的配置。
  • DiskUtil.saveToDisk(dataId, group, tenant, content);写入磁盘文件
  • updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey);更新CacheItem中的md5
  • 如果md5不同,就要发布事件NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
  • 订阅者是RpcConfigChangeNotifier
  • RpcConfigChangeNotifier#onEvent
  • RpcConfigChangeNotifier#configDataChanged
  • RpcConfigChangeNotifier#push(RpcPushTask),first time :delay 0s; sencond time:delay 2s ;third time :delay 4s、
  • RpcConfigChangeNotifier.RpcPushTask#run
  • RpcPushService.pushWithCallback,向客户端推送的是ConfigChangeNotifyRequest

客户端的处理

  • 见下面1.1推拉模型

1.1 推拉模型

ConfigChangeNotifyRequest客户端处理的handler:

  • 客户端ClientWorker.ConfigRpcTransportClient#initRpcClientHandler中有相应的handler,处理ConfigChangeNotifyRequest。
  • ClientWorker.ConfigRpcTransportClient#notifyListenConfig
  • listenExecutebell.offer(bellItem);

listenExecutebell阻塞队列的处理:

  • ClientWorker.ConfigRpcTransportClient#startInternal从队列listenExecutebell.poll()取任务,这个任务取完直接丢弃不处理。然后执行executeConfigListen()。
  • 推拉模型:默认是5秒拉取一次executeConfigListen(),如果有任务(这个任务唯一的作用就是让客户端解除阻塞,任务不用处理),立即从服务端拉取executeConfigListen()。
  • executeConfigListen()在下面1.2详细介绍

流程是:
NacosConfigService构造方法
-> ClientWorker构造方法
-> agent.start()
-> ClientWorker.ConfigRpcTransportClient#startInternal

1.2 客户端定时拉取配置信息

ClientWorker.ConfigRpcTransportClient#executeConfigListen

  • ClientWorker#refreshContentAndCheck
  • ClientWorker#getServerConfig
  • ClientWorker.ConfigRpcTransportClient#queryConfig,发送的是ConfigQueryRequest
  • 服务端返回响应后
  • LocalConfigInfoProcessor.saveSnapshot保存结果到本地磁盘文件
  • 回到ClientWorker#refreshContentAndCheck,拿到配置后,对有变化的配置调用对应的监听器去处理
  • CacheData.checkListenerMd5();
  • CacheData#safeNotifyListener
  • listener.receiveConfigInfo(contentTmp);
  • NacosContextRefresher中匿名内部类AbstractSharedListener#receiveConfigInfo
  • 匿名内部类的innerReceive
  • 发布RefreshEvent事件
  • RefreshEventListener#onApplicationEvent
  • RefreshEventListener#handle
  • ContextRefresher#refresh
    1)ContextRefresher#refreshEnvironment,刷新环境,更新内存中的配置信息;
    2)RefreshScope#refreshAll,销毁@RefreshScope注解的bean实例,下次调用会重新获取一个新的实例(该实例使用新的配置)

服务端的处理,注意服务端不是去查mysql,而是去查nacos本地磁盘文件,所以直接修改mysql配置是不会触发客户端配置变更的。

  • ConfigQueryRequestHandler#handle
  • ConfigQueryRequestHandler#getContext
  • file = DiskUtil.targetFile()
  • content = readFileContent(file);

1.3 服务启动添加对配置的监听器

SpringBootApplication#run

  • refreshContext(context)
  • listeners.running(context)发布ApplicationReadyEvent事件
  • NacosConfigAutoConfiguration注入了NacosContextRefresher
  • NacosContextRefresher#onApplicationEvent处理ApplicationReadyEvent
  • registerNacosListener(propertySource.getGroup(), dataId);添加的是匿名的AbstractSharedListener

NacosContextRefresher#registerNacosListener

    private void registerNacosListener(final String groupKey, final String dataKey) {
        String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
        Listener listener = listenerMap.computeIfAbsent(key,
                lst -> new AbstractSharedListener() {
                    @Override
                    public void innerReceive(String dataId, String group,
                            String configInfo) {
                        refreshCountIncrement();
                        nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                        applicationContext.publishEvent(
                                new RefreshEvent(this, null, "Refresh Nacos config"));
                        if (log.isDebugEnabled()) {
                            log.debug(String.format(
                                    "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                                    group, dataId, configInfo));
                        }
                    }
                });
        try {
            configService.addListener(dataKey, groupKey, listener);
        }

1.4 事件驱动模型NotifyCenter

NotifyCenter#publishEvent

  • EventPublisher publisher = INSTANCE.publisherMap.get(topic);
  • publisher.publish(event);
  • DefaultPublisher#publish
  • 核心是将事件放到阻塞队列queue中,如果放不下,则线程自己处理该事件receiveEvent(),主要是通知订阅者处理事件

事件处理:

  • DefaultPublisher#run
  • DefaultPublisher#openEventHandler
  • final Event event = queue.take();从队列拿事件
  • receiveEvent(event);处理事件
  • subscriber.onEvent(event)订阅者处理事件

事件驱动模型和事件监听(ApplicationListener遵循JDK规范)有啥区别?

  • NotifyCenter有阻塞队列
  • 添加订阅者(ApplicationListener事件监听不是绑定在发布者上的,是由Spring容器发布的,是个multicaster;事件驱动NotifyCenter事件监听是绑定在发布者上面的)

2.用户通过nacos-console控制台修改配置,并发布

ConfigController#publishConfig

  • persistService.insertOrUpdate()
  • ExternalStoragePersistServiceImpl#insertOrUpdate
  • ExternalStoragePersistServiceImpl#addConfigInfo 持久化信息到mysql
  • ConfigChangePublisher.notifyConfigChange(ConfigDataChangeEvent)
  • 订阅者在AsyncNotifyService构造方法里面定义了
  • AsyncNotifyService里面定义的匿名Subscriber#onEvent
  • rpcQueue.add(NotifySingleRpcTask)
  • ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
  • AsyncNotifyService.AsyncRpcTask#run
  • 1)如果是当前结点,dumpService.dump()
    dumpTaskMgr.addTask(DumpTask);
    NacosDelayTaskExecuteEngine#processTasks();
    DumpProcessor#process;
    ConfigCacheService.dump();同Nacos启动时一样,这里会将配置保存到磁盘文件,如果配置发生变化,会更新md5并且发布LocalDataChangeEvent,告诉客户端配置发生了变更,让客户端立即来拉取新的配置。
    LocalDataChangeEvent订阅者是RpcConfigChangeNotifier。
  • 2)如果是其他节点,configClusterRpcClientProxy.syncConfigChange(),如果有其他节点,通知其他节点配置发生变更

问题:
1)变更配置,只会通知nacos集群一个服务器?
2)拉取配置,也只会找nacos集群任意一台吗?

2.1 任务执行引擎

NacosTask
NacosTaskProcessor
NacosTaskExecuteEngine(管理processor和task)

3.应用启动期间从Nacos Config拉取配置

在创建子容器SpringBoot容器prepareContext()阶段,调用PropertySourceBootstrapConfiguration#Initialize()会加载配置并合并到CompositePropertySource。

NacosPropertySourceLocator#locate会从Nacos Config配置中心拉取配置。

具体的流程:

  • SpringApplication#run
  • prepareContext
  • PropertySourceBootstrapConfiguration#initialize
  • PropertySourceLocator#locateCollection
  • NacosPropertySourceLocator#locate
  • ConfigService configService = nacosConfigManager.getConfigService();如果没有就会创建ConfigService
  • loadSharedConfiguration(composite);加载共享的配置文件,对应配置spring.cloud.nacos.config.sharedConfigs
  • loadExtConfiguration(composite);加载扩展的配置文件,对应配置spring.cloud.nacos.config.extensionConfigs
  • loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);加载当前应用配置
    nacos-config(微服务名)
    nacos-config.yaml(微服务名.扩展名)
    nacos-config-dev.yaml(微服务名-profile.扩展名)
  • 走的都是NacosPropertySourceLocator#loadNacosPropertySource
  • NacosPropertySourceBuilder#loadNacosData
  • configService.getConfig(dataId, group, timeout);
  • NacosConfigService#getConfigInner
  • LocalConfigInfoProcessor.getFailover()优先使用本地配置
  • ClientWorker#getServerConfig如果没有获取到本地文件配置,则调用远程配置中心获取配置信息。配置中心也是从配置文件获取,客户端拿到配置信息,会将配置信息保存到本地文件。

4.动态配置@RefreshScope

4.1 @RefreshScope相关Bean创建

启动流程:

  • SpringApplication#run()
  • SpringApplication#refreshContext
  • SpringApplication#refresh
  • ServletWebServerApplicationContext#refresh
  • AbstractApplicationContext#refresh
  • AbstractApplicationContext#finishRefresh
  • publishEvent(new ContextRefreshedEvent(this));
  • RefreshScope监听器处理,是个ApplicationListener<ContextRefreshedEvent>
  • RefreshScope#onApplicationEvent
  • RefreshScope#start
  • 遍历容器找出BeanDefinition的scope为refresh的BD(在实例化单例非懒加载的bean时不会处理),然后调用Object bean = this.context.getBean(name);这里context是AnnotationConfigServletWebServerApplicationContext
  • AbstractApplicationContext#getBean()
  • AbstractBeanFactory#getBean
  • AbstractBeanFactory#doGetBean
  • 走的是最后一个分支逻辑,scope为RefreshScope,也就是上面的监听器
  • GenericScope#get
  • GenericScope.BeanLifecycleWrapper#getBean
  • this.bean = this.objectFactory.getObject(); 这里对RefreshScope的cache里面的bean赋值了
  • AbstractAutowireCapableBeanFactory#createBean,这里是核心

AbstractBeanFactory#doGetBean

                if (mbd.isSingleton()) {
                    sharedInstance = getSingleton(beanName, () -> {
                        try {
                            return createBean(beanName, mbd, args);
                        }
                        catch (BeansException ex) {
                            // Explicitly remove instance from singleton cache: It might have been put there
                            // eagerly by the creation process, to allow for circular reference resolution.
                            // Also remove any beans that received a temporary reference to the bean.
                            destroySingleton(beanName);
                            throw ex;
                        }
                    });
                    bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd);
                }

                else if (mbd.isPrototype()) {
                    // It's a prototype -> create a new instance.
                    Object prototypeInstance = null;
                    try {
                        beforePrototypeCreation(beanName);
                        prototypeInstance = createBean(beanName, mbd, args);
                    }
                    finally {
                        afterPrototypeCreation(beanName);
                    }
                    bean = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd);
                }

                else {
                    String scopeName = mbd.getScope();
                    if (!StringUtils.hasLength(scopeName)) {
                        throw new IllegalStateException("No scope name defined for bean ´" + beanName + "'");
                    }
                    Scope scope = this.scopes.get(scopeName);
                    if (scope == null) {
                        throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'");
                    }
                    try {
                        Object scopedInstance = scope.get(beanName, () -> {
                            beforePrototypeCreation(beanName);
                            try {
                                return createBean(beanName, mbd, args);
                            }
                            finally {
                                afterPrototypeCreation(beanName);
                            }
                        });
                        bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
                    }

@RefreshScope注解的bean存在哪里?是个什么形式?

存在RefreshScope的cache中。

@RestController
@RefreshScope  //动态感知修改后的值
public class TestController implements ApplicationListener<RefreshScopeRefreshedEvent>{

调用@RefreshScope注解的bean,是从哪里获取?

  • SpringMVC调用过来的bean在Spring容器的单例池里面也有testController,是GenericScope.LockedScopedProxyFactoryBean。
  • CglibAopProxy.DynamicAdvisedInterceptor#intercept
    target = targetSource.getTarget();
    SimpleBeanTargetSource#getTarget;
    getBeanFactory().getBean(getTargetBeanName());
    GenericScope#get
    GenericScope.BeanLifecycleWrapper#getBean这里会获取到cache中的bean;
  • new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
  • 这里拦截器有两个

@RefreshScope的bean销毁后,重新获取,是在哪里触发的?

  • 跟上面一样,SpringMVC调过来的时候,会首先获取target
  • getBeanFactory().getBean(getTargetBeanName());从容器中获取时,就会重新生成bean,然后放到cache中。

在哪里创建RefreshScope动态代理对象?

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RefreshScope.class)
@ConditionalOnProperty(name = RefreshAutoConfiguration.REFRESH_SCOPE_ENABLED,
        matchIfMissing = true)
@AutoConfigureBefore(HibernateJpaAutoConfiguration.class)
public class RefreshAutoConfiguration {

ConfigurationClassPostProcessor#postProcessBeanDefinitionRegistry

  • ClassPathBeanDefinitionScanner#doScan
  • AnnotationConfigUtils#applyScopedProxyMode处理scopedProxyMode不为ScopedProxyMode.NO的BeanDefinition,@RefreshScope的scopedProxyMode是ScopedProxyMode.TARGET_CLASS
  • ScopedProxyUtils#createScopedProxy
    1)注册了scopedTarget.testController,代表真正的TestController信息;
    2)创建了proxyDefinition,名字为testController,特别注意代理对象的类是ScopedProxyFactoryBean。
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Scope("refresh")
@Documented
public @interface RefreshScope {

    /**
     * @see Scope#proxyMode()
     * @return proxy mode
     */
    ScopedProxyMode proxyMode() default ScopedProxyMode.TARGET_CLASS;

}

ScopedProxyUtils#createScopedProxy

    public static BeanDefinitionHolder createScopedProxy(BeanDefinitionHolder definition,
            BeanDefinitionRegistry registry, boolean proxyTargetClass) {

        String originalBeanName = definition.getBeanName();
        BeanDefinition targetDefinition = definition.getBeanDefinition();
        String targetBeanName = getTargetBeanName(originalBeanName);

        // Create a scoped proxy definition for the original bean name,
        // "hiding" the target bean in an internal target definition.
        RootBeanDefinition proxyDefinition = new RootBeanDefinition(ScopedProxyFactoryBean.class);
        proxyDefinition.setDecoratedDefinition(new BeanDefinitionHolder(targetDefinition, targetBeanName));
        proxyDefinition.setOriginatingBeanDefinition(targetDefinition);
        proxyDefinition.setSource(definition.getSource());
        proxyDefinition.setRole(targetDefinition.getRole());

        proxyDefinition.getPropertyValues().add("targetBeanName", targetBeanName);
        if (proxyTargetClass) {
            targetDefinition.setAttribute(AutoProxyUtils.PRESERVE_TARGET_CLASS_ATTRIBUTE, Boolean.TRUE);
            // ScopedProxyFactoryBean's "proxyTargetClass" default is TRUE, so we don't need to set it explicitly here.
        }
        else {
            proxyDefinition.getPropertyValues().add("proxyTargetClass", Boolean.FALSE);
        }

        // Copy autowire settings from original bean definition.
        proxyDefinition.setAutowireCandidate(targetDefinition.isAutowireCandidate());
        proxyDefinition.setPrimary(targetDefinition.isPrimary());
        if (targetDefinition instanceof AbstractBeanDefinition) {
            proxyDefinition.copyQualifiersFrom((AbstractBeanDefinition) targetDefinition);
        }

        // The target bean should be ignored in favor of the scoped proxy.
        targetDefinition.setAutowireCandidate(false);
        targetDefinition.setPrimary(false);

        // Register the target bean as separate bean in the factory.
        registry.registerBeanDefinition(targetBeanName, targetDefinition);

        // Return the scoped proxy definition as primary bean definition
        // (potentially an inner bean).
        return new BeanDefinitionHolder(proxyDefinition, originalBeanName, definition.getAliases());
    }

4.2 @RefreshScope刷新配置和销毁动态配置Bean

在1.2中ContextRefresher#refresh 刷新配置

  • ContextRefresher#refreshEnvironment
  • extract(this.context.getEnvironment().getPropertySources());抽取除了SYSTEM、SERVLET等之外的配置信息
  • ContextRefresher#addConfigFilesToEnvironment,把原来的environment里面的参数放到一个新建的Spring容器里重新加载,最后会关闭该容器,获取到参数的新值
  • changes()找到改变的参数值
  • 发布EnvironmentChangeEvent事件,带上改变的参数值

在1.2中ContextRefresher#refresh 销毁@RefreshScope注解的bean

  • RefreshScope#refreshAll
  • GenericScope#destroy(),清除Scope里面的缓存this.cache.clear();下次就会重新获取一个新的实例(该实例使用新的配置)
  • 发布事件RefreshScopeRefreshedEvent

4.3 定时器失效的问题

详细分析参考:https://www.jianshu.com/p/9760ff3bb311?v=1678273650499

ScheduledAnnotationBeanPostProcessor

@RefreshScope,如果配置更新后,销毁@RefreshScope注解的bean,此时定时器会失效。必须要重新请求一下定时器所在类的路径,定时器才能重新生效。

怎样解决?
加一个监听器,监听容器启动完成事件,然后在监听器里面调用监听器。

@RestController
@RefreshScope  //动态感知修改后的值
public class TestController implements ApplicationListener<RefreshScopeRefreshedEvent>{

    @Value("${common.age}")
     String age;
    @Value("${common.name}")
     String name;

    @GetMapping("/common")
    public String hello() {
        return name+","+age;
    }

    //触发@RefreshScope执行逻辑会导致@Scheduled定时任务失效
    @Scheduled(cron = "*/3 * * * * ?")  //定时任务每隔3s执行一次
    public void execute() {
        System.out.println("定时任务正常执行。。。。。。");
    }

    @Override
    public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
        this.execute();
    }

相关文章

网友评论

      本文标题:Nacos Config源码剖析

      本文链接:https://www.haomeiwen.com/subject/frppldtx.html