美文网首页
Nacos源码剖析

Nacos源码剖析

作者: 王侦 | 来源:发表于2023-01-17 10:54 被阅读0次

    1.客户端向nacos服务端注册

    1.1 客户端注册的地方

    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties
    @ConditionalOnNacosDiscoveryEnabled
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
            matchIfMissing = true)
    @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
            AutoServiceRegistrationAutoConfiguration.class,
            NacosDiscoveryAutoConfiguration.class })
    public class NacosServiceRegistryAutoConfiguration {
    
        @Bean
        public NacosServiceRegistry nacosServiceRegistry(
                NacosDiscoveryProperties nacosDiscoveryProperties) {
            return new NacosServiceRegistry(nacosDiscoveryProperties);
        }
    
        @Bean
        @ConditionalOnBean(AutoServiceRegistrationProperties.class)
        public NacosRegistration nacosRegistration(
                ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
                NacosDiscoveryProperties nacosDiscoveryProperties,
                ApplicationContext context) {
            return new NacosRegistration(registrationCustomizers.getIfAvailable(),
                    nacosDiscoveryProperties, context);
        }
    
        @Bean
        @ConditionalOnBean(AutoServiceRegistrationProperties.class)
        public NacosAutoServiceRegistration nacosAutoServiceRegistration(
                NacosServiceRegistry registry,
                AutoServiceRegistrationProperties autoServiceRegistrationProperties,
                NacosRegistration registration) {
            return new NacosAutoServiceRegistration(registry,
                    autoServiceRegistrationProperties, registration);
        }
    
    }
    

    NacosServiceRegistryAutoConfiguration注册了三个Bean:

    • NacosServiceRegistry,register()方法,deregister()方法,根据Registration获取实例Instance的方法
    • NacosRegistration,加了@PostConstruct注解的init()设置port、心跳间隔、心跳超时(BeanPostProcessor#postProcessBeforeInitialization()初始化前会调用@PostConstruct注解的init()方法进行设置)。可以设置和获取各种属性:serviceId、host、port、uri等等
    • NacosAutoServiceRegistration,包含上面两个Bean,是个ApplicationListener

    NacosAutoServiceRegistration是核心,其余两个Bean是为了拆分职能,为该类服务。


    NacosAutoServiceRegistration继承自ApplicationListener<WebServerInitializedEvent>,监听的是WebServerInitializedEvent事件。

    这个调用时机点:

    • refresh()的finishRefresh()
    • DefaultLifecycleProcessor#onRefresh
    • WebServerStartStopLifecycle#start发布ServletWebServerInitializedEvent事件

    然后就调用至NacosAutoServiceRegistration中:

    • AbstractAutoServiceRegistration#onApplicationEvent
    • bind(event)
    • start()
       发布InstancePreRegisteredEvent事件;
       register();
       发布InstanceRegisteredEvent事件;
       设置running为true。
    • 重点看register
      NacosAutoServiceRegistration#register
      AbstractAutoServiceRegistration#register
    • this.serviceRegistry.register(getRegistration());这里的serviceRegistry就是NacosServiceRegistry,getRegistration()返回的就是NacosRegistration。
    • 最终来到NacosServiceRegistry#register(Registration)
       获取serviceId、group,以及将registration转换成Instance;
       namingService.registerInstance(serviceId, group, instance);注册实例;
    • NacosNamingService#registerInstance()

    现在来重点看一下NacosNamingService#registerInstance:

    • 如果是临时实例,则beatReactor.addBeatInfo(groupedServiceName, beatInfo);这里添加了一个延时任务BeatTask,心跳的默认间隔是多少?在BeatReactor#buildBeatInfo有指定,是5s。
    • serverProxy.registerService(groupedServiceName, groupName, instance);实际就是NamingProxy#registerService。请求路径:/nacos/v1/ns/instance,POST请求

    1.2 服务端服务注册的地方

    /nacos/v1/ns/instance POST


    InstanceController#register

    • 从请求中提取出Instance
    • serviceManager.registerInstance(namespaceId, serviceName, instance);
        @CanDistro
        @PostMapping
        @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
        public String register(HttpServletRequest request) throws Exception {
            
            final String namespaceId = WebUtils
                    .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
            final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
            NamingUtils.checkServiceNameFormat(serviceName);
            
            final Instance instance = parseInstance(request);
            
            serviceManager.registerInstance(namespaceId, serviceName, instance);
            return "ok";
        }
    

    ServiceManager#registerInstance

    • createEmptyService:创建空的Service,将Service放入Map<String, Map<String, Service>> serviceMap中;初始化:延时任务ClientBeatCheckTask;监听服务数据的变化consistencyService.listen()
    • addInstance()加锁操作
       addIpAddresses()将注册实例加入到对应服务Service中;
       consistencyService.put(key, instances)参见下面的DelegateConsistencyServiceImpl#put

    DelegateConsistencyServiceImpl#put

    • mapConsistencyService(key).put(key, value)
    • DistroConsistencyServiceImpl#put
       onPut(key, value);将注册实例更新到内存注册表,并发布服务变化事件,通过udp方式将服务变动通知给订阅的客户端。
       distroProtocol.sync()同步实例信息到nacos.server集群其他节点。
        public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
            
            createEmptyService(namespaceId, serviceName, instance.isEphemeral());
            
            Service service = getService(namespaceId, serviceName);
            
            if (service == null) {
                throw new NacosException(NacosException.INVALID_PARAM,
                        "service not found, namespace: " + namespaceId + ", service: " + serviceName);
            }
            
            addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
        }
    

    1.3 (服务注册表发生变化时)服务端即时主动推送(UDP)

    ServiceManager#registerInstance
    -> ServiceManager#addInstance
    -> DistroConsistencyServiceImpl#put
    -> DistroConsistencyServiceImpl#onPut
    -> notifier.addTask(key, DataOperation.CHANGE);放到阻塞队列tasks中
    -> Notifier#run从tasks中取出并进行处理handle(pair)
    -> Service#onChange
    -> Service#updateIPs
     将注册实例更新到cluster的ephemeralInstances中。
     发布服务变更事件getPushService().serviceChanged(this)。
    -> PushService#serviceChanged 事件ServiceChangeEvent
    -> PushService#onApplicationEvent处理事件ServiceChangeEvent
     clientMap.get(namespaceId, serviceName)应该就是订阅该服务的客户端,然后通过udp方式将服务变动通知给订阅的客户端。

    问题1.服务是在哪里监听的?

    • ServiceManager#registerInstance
    • -> ServiceManager#createEmptyService
    • -> ServiceManager#createServiceIfAbsent
    • -> ServiceManager#putServiceAndInit
    consistencyService.listen(
      KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), 
      service);
    
    consistencyService.listen(
      KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), 
      service);
    

    问题2.服务是在哪里进行订阅的?
    InstanceController#list
    -> InstanceController#doSrvIpxt
    -> PushService#addClient(xxx)
    -> PushService#addClient(PushService.PushClient)

    问题3.梳理一下监听订阅机制?

    2.客户端拉取服务实例

    2.1 客户端

    Ribbon
    -> ZoneAwareLoadBalancer构造方法
    -> 父类DynamicServerListLoadBalancer()构造方法
    -> DynamicServerListLoadBalancer#restOfInit
     enableAndInitLearnNewServersFeature();
     updateListOfServers();
    -> NacosServerList#getUpdatedListOfServers
    -> NacosNamingService#selectInstances()
    -> HostReactor#getServiceInfo
    -> HostReactor#updateServiceNow
    -> NamingProxy#queryList

    /nacos/v1/ns/instance/list,GET操作

    2.2 服务端

    InstanceController#list
    -> InstanceController#doSrvIpxt
    1)pushService.addClient() 通过UDP方式推送变更到订阅的客户端
    2)service.srvIPs()获取所有持久化实例和临时实例

        @GetMapping("/list")
        @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
        public ObjectNode list(HttpServletRequest request) throws Exception {
    

    3.心跳机制

    3.1 客户端BeatReactor

    NacosAutoServiceRegistration父类AbstractAutoServiceRegistration#onApplicationEvent
    -> bind()
    -> start()
    -> NacosServiceRegistry#register
    -> NacosNamingService#registerInstance
    -> 临时实例 BeatReactor#addBeatInfo
    -> executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    -> BeatReactor.BeatTask#run
    1)serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);发送心跳
    2)executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);递归调用

    /nacos/v1/ns/instance/beat,PUT方法

        @CanDistro
        @PutMapping("/beat")
        @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
        public ObjectNode beat(HttpServletRequest request) throws Exception {
    

    InstanceController#beat
    1)如果实例不存在,则重新注册(网络不通后者重启导致实例下线后);
    2)service.processClientBeat(clientBeat);调用至ClientBeatProcessor#run,找到发送心跳的实例,然后设置实例的lastBeat属性:instance.setLastBeat(System.currentTimeMillis());也有可能更改实例的healthy属性(如果原来为不健康更改为健康,也要推送服务变更事件udp)

    3.2 服务端 HealthCheckReactor

    InstanceController#register
    -> ServiceManager#registerInstance
    -> ServiceManager#createEmptyService
    -> ServiceManager#putServiceAndInit
    -> Service#init
    -> HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    -> ClientBeatCheckTask#run
    1)getDistroMapper().responsible(service.getName()) 判断本服务器是否负责service的心跳检查工作;
    2)如果某个实例超过15s没有收到心跳,则将它的healthy属性设置为false;并推送服务变更getPushService().serviceChanged(service);
    3)如果某个实例超过30s没有收到心跳,则直接剔除该实例,deleteIp(instance);会发起一个http调用。

        public boolean responsible(String serviceName) {
            final List<String> servers = healthyList;
            
            if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
                return true;
            }
            
            if (CollectionUtils.isEmpty(servers)) {
                // means distro config is not ready yet
                return false;
            }
            
            int index = servers.indexOf(EnvUtil.getLocalAddress());
            int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
            if (lastIndex < 0 || index < 0) {
                return true;
            }
            
            int target = distroHash(serviceName) % servers.size();
            return target >= index && target <= lastIndex;
        }
    

    4.AP集群架构

    4.1 数据同步机制

    服务端注册、以及下线超时实例都会走数据同步机制。
    InstanceController#register
    -> ServiceManager#registerInstance
    -> addInstance()
    -> DelegateConsistencyServiceImpl#put
    -> DistroConsistencyServiceImpl#put
    -> DistroProtocol#sync()

    ClientBeatCheckTask#run
    -> deleteIp(instance)
    -> 服务端InstanceController#deregister
    -> ServiceManager#removeInstance()
    -> DelegateConsistencyServiceImpl#put
    -> DistroConsistencyServiceImpl#put
    -> DistroProtocol#sync()

        public void sync(DistroKey distroKey, DataOperation action, long delay) {
            for (Member each : memberManager.allMembersWithoutSelf()) {
                DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                        each.getAddress());
                DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
                distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
                }
            }
        }
    

    DistroProtocol#sync()
    -> 会给除了自己以外的所有服务器同步
    -> distroTaskEngineHolder.getDelayTaskExecuteEngine(). addTask(distroKeyWithTarget, distroDelayTask);核心就是tasks.put(key, newTask); 这里任务是DistroDelayTask
    ----异步处理----
    -> NacosDelayTaskExecuteEngine.ProcessRunnable#run
    -> processTasks()从tasks中获取任务并进行处理
    -> DistroDelayTaskProcessor#process
    -> distroTaskEngineHolder.getExecuteWorkersManager(). addTask(distroKey, syncChangeTask);
    -> 放到阻塞队列中queue.put(task);
    ----异步处理----
    -> TaskExecuteWorker.InnerWorker#run从queue.take()获取任务并执行task.run(),任务类型是DistroSyncChangeTask
    -> DistroSyncChangeTask#run
    1)distroComponentHolder.findTransportAgent(type). syncData(distroData, getDistroKey().getTargetServer());最终调用NamingProxy#syncData
    2)handleFailedTask();如果同步不成功,则重试

    同步方法NamingProxy#syncData:
    -> /nacos/v1/ns/distro/datum,PUT方法
    -> 其他server处理方法DistroController#onSyncDatum
    -> DistroProtocol#onReceive
    -> DistroConsistencyServiceImpl#processData()
    -> DistroConsistencyServiceImpl#onPut 将同步过来的实例更新到内存注册表(这里怎么避免再次对同步过来的数据进行UDP推送?

    4.2 心跳只会在一台机器上检查

    HealthCheckReactor
    ClientBeatCheckTask
    如果某台机器挂了,其集群机器总数会变化,服务名hash取模就会跟着变化

    客户端会连接到集群中所有机器码?

    4.3 集群状态信息同步

    4.3.1 ServiceReporter

    健康状态通过ServiceReporter
    -> ServiceManager#init
    -> GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
    -> ServiceManager.ServiceReporter#run,计算所有namespaceId中所有serviceName的校验和,然后发送给其他server
    -> synchronizer.send(server.getAddress(), msg);
    -> ServiceStatusSynchronizer#send

    /nacos/v1/ns/service/status
    其他server处理ServiceController#serviceStatus,如果发现校验和不一样,会向源Server发起请求,更新服务实例的健康状况。

    4.3.2 ServerStatusReporter

    ServerListManager#init
    -> GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);

    4.3.3 ServerInfoUpdater

    ServerListManager#init
    -> GlobalExecutor.registerServerInfoUpdater(new ServerInfoUpdater());

    4.4 机器启动时从某一台机器同步数据

    5.CP架构

    5.1 CAP与BASE

    C代表Consistency,一致性,是指所有节点在同一时刻的数据是相同的,即更新操作执行结束并响应用户完成后,所有节点存储的数据应该是一样的(强调的是各个节点间的数据一致)。

    A代表Availability,可用性,是指系统提供的服务一直处于可用状态,对于用户的请求可即时响应。任何客户端的请求,不管访问哪个节点,都能得到响应数据,但不保证是同一份最新数据,即不保证每个节点给你的数据都是最新的(强调服务可用,但不保证数据的一致)。

    P代表Partition Tolerance,分区容错性,是指在分布式系统遇到网络分区的情况下,仍然可以响应用户的请求。网络分区是指因为网络故障导致网络不连通,不同节点分布在不同的子网中(强调不管内部出现什么样的数据同步问题,会一直提供服务)。

    只要有网络交互就一定会有延迟和数据丢失,而这种状况必须接受,还必须保证系统不能挂掉。所以P是必须要保证的。

    • 当选择一致性C时,如果因为网络分区发生了消息丢失、延迟过高,部分节点无法保证特定信息最新的,这时系统将返回写失败错误,也就是集群拒绝新数据写入。
    • 当选择可用性A时,系统始终可用(也即处理客户端的请求),如果发生了分区,一些节点将无法返回最新的特定信息。

    BASE可以看出AP的延伸,是对互联网大规模分布式系统的实践总结,强调可用性。

    • 基本可用Basically Available,当分布式系统在出现不可预知的故障时,允许损失部分功能的可用性,保障核心功能的可用性。四板斧:流量削峰(错峰处理)、延迟响应(排队等待处理)、体验降级(小图片代替原始图片)、过载保护。
    • 最终一致性Eventually consistent,系统中所有的数据副本在经过一段时间的同步后,最终能够达到一个一致的状态。也即在数据的一致性上,存在一个短暂的延迟。
    • 软状态Soft State:不同节点间,数据副本存在短暂的不一致,是一种过渡状态。

    最终一致性,以什么数据为准?

    • 以最新写入的数据为准
    • 以第一次写入的数据为准

    最终一致性具体的实现方式?

    • 读时修复:在读取数据时,检测到数据不一致,进行修复。
    • 写时修复:在写入数据时,检测到数据不一致,进行修复。比如不同节点写失败,就将数据缓存下来,然后定时重传,修复数据的不一致性。
    • 异步修复:最常用的方式,通过定时对账检测副本数据的一致性,并修复。

    在实现最终一致性的时候,推荐同时实现自定义写一致性级别(All、Quorum、One、Any),让用户可以自主选择相应的一致性级别。

    5.2 Nacos的CP架构

    raft协议:

    • Leader选举(ZAB所有节点都需要发起投票,然后PK,raft先发投票的一般不出意外都会成为leader)
      1)选举超时时间(150-300ms)
      2)超时则变成候选人,发起投票,先投给自己,然后将请求发给其他人
      3)其他人收到请求,如果还未超时,则投票给该人并且重置自己的超时时间
      4)候选人收到大部分选票,则成为Leader
      5)Leader同步Append Entries信息给followers
    • 日志复制(数据同步):两阶段提交

    DelegateConsistencyServiceImpl#put
    RaftConsistencyServiceImpl#put
    注册信息写文件和内存
    配置信息写数据库
    这里raft协议并没有两阶段提交,写完日志直接提交了。会发生问题,比如Leader成功提交了但是其他节点抛异常失败了。

    心跳同步的数据
    从节点:0表示本地有,1表示主节点有

    6.Nacos2.X源码剖析

    6.1 注册

    6.1.1 客户端

    跟1.4源码一样,NacosServiceRegistryAutoConfiguration注册了三个Bean:

    • NacosServiceRegistry,register()方法,deregister()方法,根据Registration获取实例Instance的方法
    • NacosRegistration,加了@PostConstruct注解的init()设置port、心跳间隔、心跳超时(BeanPostProcessor#postProcessBeforeInitialization()初始化前会调用@PostConstruct注解的init()方法进行设置)。可以设置和获取各种属性:serviceId、host、port、uri等等
    • NacosAutoServiceRegistration,包含上面两个Bean,是个ApplicationListener
      NacosAutoServiceRegistration是核心,其余两个Bean是为了拆分职能,为该类服务。

    实例注册的地方:

    • WebServerStartStopLifecycle#start发布ServletWebServerInitializedEvent事件
    • NacosAutoServiceRegistration监听WebServerInitializedEvent事件
    • AbstractAutoServiceRegistration#onApplicationEvent
    • bind(event);
    • start()
       发布InstancePreRegisteredEvent事件;
       register();
       发布InstanceRegisteredEvent事件;
       设置running为true。
    • 重点看register
      NacosAutoServiceRegistration#register
      AbstractAutoServiceRegistration#register
    • this.serviceRegistry.register(getRegistration());这里的serviceRegistry就是NacosServiceRegistry,getRegistration()返回的就是NacosRegistration。
    • 最终来到NacosServiceRegistry#register(Registration)
       获取serviceId、group,以及将registration转换成Instance;
       namingService.registerInstance(serviceId, group, instance);注册实例;
    • NacosNamingService#registerInstance()

    现在来重点看一下NacosNamingService#registerInstance:

    • clientProxy.registerService(serviceName, groupName, instance); 这里clientProxy是NamingClientProxyDelegate
    • getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);如果实例是临时的,则使用grpcClientProxy,否则使用httpClientProxy
    • NamingGrpcClientProxy#registerService
      1)redoService.cacheInstanceForRedo(serviceName, groupName, instance);
      2)NamingGrpcClientProxy#doRegisterService

    重点来看一下NamingGrpcClientProxy#doRegisterService

    • 创建InstanceRequest,REGISTER_INSTANCE
    • NamingGrpcClientProxy#requestToServer调用RpcClient#request()发起请求
    • NamingGrpcRedoService#instanceRegistered,将registeredInstances中对应的服务设置为已注册

    6.1.2 NamingGrpcRedoService

    实例注册流程:

    • 注册前,redoService.cacheInstanceForRedo(),先将实例放入到registeredInstances
    • 注册成功后,redoService.instanceRegistered(),将对应实例设置为已注册:redoData.setRegistered(true);

    NamingGrpcRedoService中有一个调度线程池,只有一个单独的线程,每隔3s运行一次RedoScheduledTask。

    RedoScheduledTask#run

    • 会根据实例的状态筛选出需要操作的实例进行处理。
        public void run() {
            if (!redoService.isConnected()) {
                LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task");
                return;
            }
            try {
                redoForInstances();
                redoForSubscribes();
            } catch (Exception e) {
                LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);
            }
        }
    

    6.1.3 服务端

    服务端根据请求InstanceRequest找请求处理器InstanceRequestHandler

    • InstanceRequestHandler#handle
    • 根据请求类型REGISTER_INSTANCE,InstanceRequestHandler#registerInstance
    • EphemeralClientOperationServiceImpl#registerInstance
       1)getSingleton()获取单例Service;
       2)Client client = clientManager.getClient(clientId);获取客户端在服务端对应的对象信息;
       3)instanceInfo = getPublishInfo(instance);客户端Instance转换为服务端的Instance;
       4)client.addServiceInstance(singleton, instanceInfo);
       4-1)publishers.put(service, instancePublishInfo)将注册实例放到客户端对象的publishers这个map中
       4-2)发布客户端变化事件ClientEvent.ClientChangedEvent,向Nacos其他服务器同步数据
       5) client.setLastUpdatedTime();更新客户端lastUpdatedTime
       6)发布客户端注册事件ClientOperationEvent.ClientRegisterServiceEvent
       7)发布实例元数据变化事件MetadataEvent.InstanceMetadataEvent

    InstanceRequestHandler#handle
    registerInstance() 客户端注册信息都放在了对应的Client里面
    NotifyCenter

    • 事件1 ClientChangedEvent 集群同步
    • 事件2 ClientRegisterServiceEvent -> 发布事件ServiceChangedEvent -> NamingSubscriberServiceV2Impl#onEvent
    • 事件3 InstanceMetadataEvent

    服务要么全是临时实例,要么是持久实例,不会一部分是临时一部分是持久。这个跟1.X不同。
    注册表结构也改变了,1.X是一个大Map,2.X分成了多个数据结构。
    gRPC底层类似于Netty,服务端会对每个客户端生成一个Client,也即类似于Netty的SocketChannel。

    Nacos1.X一个大Map会产生很多冲突,使用写时复制。
    Nacos2.X拆分为了很多Map,并发粒度很小。大大分散了冲突的压力。

    NamingSubscriberServiceV2Impl#onEvent:
    服务注册会推送到所有订阅该服务的客户端;
    服务订阅只会推送到新增的订阅的客户端。

    客户端在服务端有两个对象:

    • Client
    • Connection

    6.1.4 事件处理机制

    客户端变化事件ClientEvent.ClientChangedEvent处理流程
    NotifyCenter#publishEvent()
    -> NotifyCenter#publishEvent()
    -> EventPublisher publisher = INSTANCE.publisherMap.get(topic);然后publisher.publish(event);
    -> DefaultPublisher#publish放到阻塞队列queue里面
    ---异步---
    -> DefaultPublisher#run
    -> openEventHandler();
    -> 从阻塞队列queue中获取Event,并进行处理receiveEvent(event);
    -> DefaultPublisher#notifySubscriber
    -> DistroClientDataProcessor#onEvent
    -> DistroClientDataProcessor#syncToAllServer
    -> distroProtocol.sync(distroKey, DataOperation.CHANGE)
    -> DistroProtocol#syncToTarget向不包括自己的其他所有Nacos Server同步数据
    -> distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);核心就是tasks.put(key, newTask); 这里任务是DistroDelayTask
    ----异步处理----
    -> NacosDelayTaskExecuteEngine.ProcessRunnable#run
    -> processTasks()从tasks中获取任务并进行处理
    -> DistroDelayTaskProcessor#process
    -> distroTaskEngineHolder.getExecuteWorkersManager(). addTask(distroKey, syncChangeTask);
    -> TaskExecuteWorker#process放到阻塞队列中queue.put(task);
    ----异步处理----
    -> TaskExecuteWorker.InnerWorker#run从queue.take()获取任务并执行task.run(),任务类型是DistroSyncChangeTask
    -> DistroSyncChangeTask#run
    -> DistroSyncChangeTask#doExecuteWithCallback
    -> getDistroComponentHolder().findTransportAgent(type)
    .syncData(distroData, getDistroKey().getTargetServer(), callback);
    -> DistroClientTransportAgent#syncData()
    -> clusterRpcClientProxy.asyncRequest(),请求类型是DistroDataRequest

    服务端处理:
    -> DistroDataRequestHandler#handle
    -> DistroDataRequestHandler#handleSyncData
    -> DistroProtocol#onReceive
     1)dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
     2)dataProcessor.processData(distroData);
    -> DistroClientDataProcessor#processData
    -> DistroClientDataProcessor#handlerClientSyncData
     1)clientManager.syncClientConnected();同步客户端连接
     2)upgradeClient(client, clientSyncData);更新内存注册表数据,发送服务注册事件ClientOperationEvent.ClientRegisterServiceEvent

    发布客户端注册事件ClientOperationEvent.ClientRegisterServiceEvent
    -> 前面都一样NotifyCenter -> DefaultPublisher -> ClientServiceIndexesManager
    -> ClientServiceIndexesManager#onEvent
    -> ClientServiceIndexesManager#handleClientOperation
    -> ClientServiceIndexesManager#addPublisherIndexes
     1)publisherIndexes.get(service).add(clientId);新增服务对应的客户端id
     2)发布事件ServiceEvent.ServiceChangedEvent
    -> 事件ServiceEvent.ServiceChangedEvent处理过程
    -> 前面都一样NotifyCenter -> DefaultPublisher -> NamingSubscriberServiceV2Impl
    -> NamingSubscriberServiceV2Impl#onEvent
    -> delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
    ---异步处理:推送到当前服务的所有订阅者----
    -> NacosDelayTaskExecuteEngine队列tasks -> PushDelayTaskExecuteEngine.PushDelayTaskProcessor队列queue -> PushExecuteTask#run
    -> PushExecuteTask#run
    -> delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
    new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
    -> PushExecutorRpcImpl#doPushWithCallback
    -> RpcPushService#pushWithCallback
    -> connection.asyncRequest,请求是NotifySubscriberRequest
    ---客户端处理---
    -> NamingPushRequestHandler#requestReply
    -> ServiceInfoHolder#processServiceInfo 更新客户端服务实例本地缓存

    6.2 服务发现

    Ribbon
    -> ZoneAwareLoadBalancer构造方法
    -> 父类DynamicServerListLoadBalancer()构造方法
    -> DynamicServerListLoadBalancer#restOfInit
     enableAndInitLearnNewServersFeature();
     updateListOfServers();
    -> NacosServerList#getUpdatedListOfServers
    -> NacosNamingService#selectInstances()
    -> NamingClientProxyDelegate#subscribe 在下面通过grpc请求服务端获取到实例后,会更新本地缓存,然后发布InstancesChangeEvent事件,并且将ServiceInfo写入本地磁盘。
    -> NamingGrpcClientProxy#subscribe
     1)redoService.cacheSubscriberForRedo()
     2)doSubscribe()
    -> NamingGrpcClientProxy#doSubscribe发送SubscribeServiceRequest请求

    服务端
    SubscribeServiceRequestHandler
    1)查询服务相关的实例
    Map<Service, Set<clientId>> 拿到clientId,然后根据ClientId拿到Client然后再拿到其服务对应的实例。
    2)订阅 并发布事件 ClientSubscribeServiceEvent -> 处理完发布事件ServiceSubscribedEvent

    6.3 客户端定时拉取任务UpdateTask

    -> NamingClientProxyDelegate创建ServiceInfoUpdateService
    -> NamingClientProxyDelegate#subscribe
    -> ServiceInfoUpdateService.scheduleUpdateIfAbsent
    -> UpdateTask定时(每秒)拉取注册中心注册的服务namingClientProxy.queryInstancesOfService

    6.4 心跳-健康检查

    ConnectionManager#start,该方法有注解@PostConstruct。

    • RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(Runnable, 1000L, 3000L, TimeUnit.MILLISECONDS); 定时调度3s一次(每次结束后延迟3s)。
    • 超过20s没有给服务端发心跳的客户端,服务端会发起请求探活,如果失败或者超过1s未响应则剔除服务。ClientDetectionRequest 探活

    在Nacos 1.x版本中,临时实例需要客户端(服务提供者)定时向Nacos发送心跳包来维持自己的健康状态。持久化实例并不基于客户端发送心跳包,而是服务端定时探测客户端进行健康检查(TCP端口探测、HTTP返回码探测)。

    在Nacos 2.0版本之后持久化实例的监控检查并没有改变逻辑;但临时实例不再使用心跳包,而是通过判断gRPC长连接是否存活来判断临时实例是否健康。

    7.gRPC

    客户端
    grpcClientProxy -> NamingGrpcClientProxy -> start()
    心跳
    3次重试连接服务端connectToServer(),如果失败,同步重连其他Server。
    异步重连。

    服务端
    BaseRpcServer @PostConstruct
    BaseGrpcServer
    type -> RequestHandler type是类名,比如InstanceRequest
    RequestHandlerRegistry-> ApplicationListener
    注册Connection Client

    相关文章

      网友评论

          本文标题:Nacos源码剖析

          本文链接:https://www.haomeiwen.com/subject/lkwgcdtx.html