注册中心
1.数据结构 Map<namespace, Map<group::serviceName, Service>>
内部数据结构
namespace:区分开发环境、测试环境还是生产环境。
group:那个分组的(交易、账单、会员)
serviceName:服务名称,比如交易组下面有订单、支付、转账等服务
cluster:实例的集群,异地部署
instance:实例
服务注册
- 第一步:springcloud的通用类AbstractAutoServiceRegistration会监听WebServerInitializedEvent,这样,应用初始化后会发送事件,AbstractAutoServiceRegistration调用onApplicationEvent处理事件(bind->start->registerManagement),这样就会调用到NacosServiceRegistry的register方法。
- 第二步:NacosServiceRegistry#getNacosInstanceFromRegistration组装实例信息,NacosNamingService#registerInstance方法开始注册实例,默认ephemeral为true(AP架构,不会持久化),则会构建心跳信息,并启动周期任务,每个五秒发送一次心跳,调用的是/nacos/v1/ns/instance/beat接口,详细的在服务心跳里面解释,ephemeral为false则不会(CP架构)。
- 第三步:接着调用registerService,POST调用/nacos/v1/ns/instance。
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
serverProxy.registerService(groupedServiceName, groupName, instance);
}
第四步:服务端/instance接口,调用到InstanceController#register接口,ServiceManager#registerInstance注册实例,createEmptyService方法做了两件事,保留到注册表以及启动定时任务检测心跳。
private void putServiceAndInit(Service service) throws NacosException {
// 将实例放入Map(namespace, Map(group::serviceName, Service))
putService(service);
// 将service封装成一个任务,并且每五秒执行一次,用于心跳检测,服务上次心跳时间>15s,则不存活,>30s则删除
service.init();
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
第五步:addInstance方法里面的addIpAddresses会把实列信息放入service的clusterMap<cluster,cluster>。
第六步:consistencyService.put(key, instances)在集群情况下默认使用DistroConsistencyServiceImpl,如果为false,则使用RaftConsistencyServiceImpl,用来同步数据。后面在AP与CP中介绍。注意,这里同步节点信息与最新的实例更新到内存实例表用的异步解耦来保证客户端响应性
服务心跳
客户端心跳
1.Distro协议下,客户端才会发送心跳。nocas客户端首次注册的时候会每隔5秒调用/nacos/v1/ns/instance/beat接口,发送心跳。
2.InstanceController#beat接口接收到请求,转换心跳信息(RsInfo),如果收到心跳,但是没有注册信息,则会再次注册这个实例。
3.service#processClientBeat异步处理心跳,这个时候已经告诉客户端我这边接收到心跳了。修改instance上次的心跳时间,如果该实例上次不是健康的,则会变成成功的,并发布ServiceChangeEvent事件。
4.PushService#onApplicationEvent又开启线程,用UDP告诉客户端这个实例又活过来了,然后更新客户端的可轮询实例。
服务端心跳
1.实例注册的时候,service会开启一个五秒一次的定时检查任务,service.init();
2.ClientBeatCheckTask#run方法会将超过15s的标志为不健康,30s的删除。不健康的会发送ServiceChangeEvent(通知消费者该实例不可用)和InstanceHeartbeatTimeoutEvent事件(没看到实现)。
超过30s则会执行deleteIp(instance),调用/v1/ns/instance删除。同样的会调用到consistencyService.put(key, instances)更新内存表以及节点信息同步。
服务健康检查
参见服务心跳
服务发现
1.客户端第一次发起远程调用的时候,ribbon会调用到NacosNamingService#getAllInstances
2.HostReactor#getServiceInfo先从缓存中获取,如果缓存中没有,则会调用/instance/list,并上传UDP端口(订阅)。
3.启动定时任务,间隔1S,定时去更新本地服务表。
注意:心跳检测会利用hash选择一台健康机器上进行。
总结:nacos通过客户端定时拉取以及服务端UDP推送保证服务的可见性。
服务同步
见AP与CP架构
AP与CP,raft与Distro
AP/Distro
####### DistroController
1.(PUT)/datum 同步数据
2./checksum 校验数据 如果当前节点含有另外一个节点发过来的数据没有的数据,则会删除。如果当前节点没有另外一个节点的数据,则会同步另外一个节点的数据。
3.(GET)/datum 获取数据
4./datums 获取所有数据
####### Distro启动数据同步
1.DistroProtocol构造方法启动定时任务startVerifyTask()(间隔5s),检查存储数据,调用/v1/ns/distro/checksum接口校验数据,其实就是把当前启动的节点数据同步给其他人,第一次启动肯定是没有数据的。后续5s执行一次数据同步。
2.启动startLoadTask()任务,调用/distro/datums获取其他节点的所有数据,更新到当前节点。
####### 插入数据同步
1.插入数据是会更具是否持久化选择协议mapConsistencyService(key).put(key, value),默认走的是DistroConsistencyServiceImpl的put方法。
2.本地插入利用队列解耦,DistroConsistencyServiceImpl内部类执行Notifier的run方法,一直执行,从队列中取数,执行onChange-->updateIPs把数据写道注册表,这里为了提供并发,利用了写时复制。还会利用UDP通知客户端。
3.另外执行distroProtocol#sync方法通知其他节点数据变动,这里有用一个map封装任务,NacosDelayTaskExecuteEngine在构造方法里面启动一个100毫秒周期的定时任务,执行ProcessRunnable,里面就会从map中取数据,最终会调用/datum接口向其他节点推送数据,其他节点又会调用到第2步。
CP/RAFT
####### 核心接口
/raft/vote 接受选票并pk选票,返回pk后的选票
/raft/beat 心跳
/raft/datum(DEL) 删除数据
/raft/datum(GET) 获取数据
/raft/datum/commit 提交数据,两阶段提交
/raft/peer 选票pk的时候回去拉去其他节点的peer状态。
####### Raft启动选举与数据同步
######## 选举
1.RaftCore利用spring注解PostConstruct执行到选举任务MasterElection#run,没500ms进行一次,peer的时间都是一个随机数。
2.调用/raft/vote进行选择,如果周期小于另外一个节点,则会返回选择另外一个节点,否则,另外一个节点就要重置时间就要变成FOLLOWER
3.收到选票,统计选票,半数以上则把自己变成leader
######## 心跳
1.RaftCore利用spring注解PostConstruct执行到心跳任务HeartBeat#run,必须是leader才会发送心跳给从节点。心跳包中包含datum.key值。
2.调用从节点的raft/beat接口,同步从节点的节点信息(选举周期,主节点ip等)。另外比对从节点的key和主节点的key是否有变化,如果变化了,则会调用/raft/datum(GET)获取主节点的数据,达到同步的效果。
####### 插入数据同步
1.插入数据是会更具是否持久化选择协议mapConsistencyService(key).put(key, value),选择到了RaftConsistencyServiceImpl#put方法
2.调用signalPublish,如果不是leader,则路由到leader的/raft/datum接口,如果是主机,则先提交数据到内存和磁盘,然后分别调用其他节点的/raft/datum/commit,把数据同步到其他节点。这里有点问题,如果半数以上的节点没有成功,没有看到回滚。
3.RaftController#/raft/datum同样会调用到signalPublish方法
配置中心
1.数据结构
Nacos 数据模型 Key 由三元组唯一确定, Namespace默认是空串,公共命名空间(public),分组默认是
DEFAULT_GROUP
2.启动数据落盘
- 第一步:spring启动会加载DumpService的@PostConstruct的init方法,集群模式使用ExternalDumpService
- 第二步:执行clearConfigHistory任务,保留30天的数据
- 第三步:全量dump配置信息,根据心跳时间,心跳时间在六小时内走增量更新,否则走全量。增量查找最近六小时的,比对更新,全量就是查出来全部更新磁盘
- 第四步:如果发现数据改动,则发送时间,异步通知监听器,变更数据。
3.数据发布
- 第一步:使用post调用/v1/cs/configs,这样就会到ConfigController#publishConfig的方法
- 第二步:调用persistService.insertOrUpdate方法将数据保存到数据库(单机就会落本地盘)
- 第三步:调用ConfigChangePublisher#notifyConfigChange发送ConfigDataChangeEvent改变事件,选择一个合适的事件发布器
四个发布器.png
- 第四步:使用ArrayBlockingQueue异步解耦,接收事件,如果队列满了,则改为同步通知。
- 第五步:调用DeafaultPublisher#receiveEvent接收事件,订阅的是AsyncNotifyService
- 第六步:AsyncNotifyService#onEvent,为每个NotifySingleTask,放入queue中
- 第七步:AsyncTask#executeAsyncInvoke异步执行,调用/nacos/v1/cs/communication/dataChange通知,并且AsyncNotifyCallBack#onReceive回调处理结果(这里有个问题,本机也会远程调用本机)
- 第八步:CommunicationController#notifyConfigInfo执行dumpService.dump,将数据更新到本机磁盘,这一步同样是异步的,会把通知封装放入ConcurrentHashMap。
- 第九步:ProcessRunnable异步线程调用processTasks方法处理map中的任务,使用DumpProcessor处理任务
- 第十步:通过传过来的dataId/group/tenant查找数据库,获取到配置,然后比对本地磁盘上的配置(md5比对),如果对不上,则写入到磁盘,并且发布LocalDataChangeEvent事件,该事件会通知监听了该文件的客户端,文件发生变化了,见配置监听部分。
4.数据获取
- 第一步:使用get调用/v1/cs/configs,这样就会到ConfigController#getConfig的方法
- 第二步:调用doGetConfig,默认访问磁盘而不去调用数据库。然后返回
if (PropertyUtil.isDirectRead()) {
configInfoBase = persistService.findConfigInfo4Tag(dataId, group, tenant, autoTag);
} else {
file = DiskUtil.targetTagFile(dataId, group, tenant, autoTag);
}
5.数据监听
- 第一步:文件发生变动,会触发LocalDataChangeEvent事件,这个时间用LongPollingService处理,调用到DataChangeTask的run方法。
- 第二步:遍历所有的订阅者,如果匹配上了则发送通知,告知文件变动了。
- 还有另外一种方式,就是有一个29.5s的定时任务,如果配置没有更改,则告诉客户端没有变动(客户端超时30s)
springboot整合nacos配置中心
配置拉取
- 第一步:springboot启动是会调用prepareContext方法,这个方法的applyInitializers会调用nacos,而在调用这个方法之前调用prepareEnvironment已经加载了我们的配置文件,所以,nacos的配置会覆盖掉配置文件的配置。
- 第二步:调用到PropertySourceBootstrapConfiguration的initialize方法,里面会调用locator.locateCollection方法,nocas实现了locator-NacosPropertySourceLocator,就调用到nocas的locate方法
- 第三步:loadSharedConfiguration --> loadExtConfiguration --> loadApplicationConfiguration,所以应用本身的配置大于Ext配置大于Shared配置,其中当前应用的配置先执行微服务的名称文件,再执行名称.扩展名的文件,再执行名称-profile.扩展名的文件,层层覆盖合并。
配置监听
1.ClientWorker客户端工作类启动线程,使用长轮循获取服务端变化的配置,超时时间是30s,服务端是29.5s
2.NacosContextRefresher实现ApplicationListener,处理ApplicationReadyEvent时间,当容器启动完毕的时候,执行onApplicationEvent方法,调用configService.addListener给该配置添加监听器
3.第一步收到配置修改了,回调监听器的方法innerReceive,里面会发布一个RefreshEvent事件
4.RefreshEventListener收到该事件,更新所有新的配置,并销毁RefreshScope的bean,下次加载到这个bean的时候就会使用最新的配置了。
网友评论