Nacos 如何扛住高并发读写?
最近经常阅读源码,发现大部分框架在解决并发读写的时候,都会使用COW的思想来解决;
nacos也不例外。
解决方案
假设我们创建一个map来存储并发数据,我们先看下在并发场景下,从这个map中进行读写会出现什么问题:
在这里插入图片描述针对超大的map进行写操作会很耗时,导致其他线程对这个map的读写操作会等待很久;
那么在nacos中,是如何进行解决的呢?
其实nacos处理的思路很简单,我简要概括下,然后跟踪下源码,带大家看看大佬是如何写代码的:
- 首先naocs 将内存中的注册列表map 复制一份当到map1
- 然后将客户端同步过来的注册key添加到map1中
- 处理完所有的key之后,将map1重新复制给内存中的注册列表map中
源码跟踪
通过阅读源码,我找到了nacos进行更新注册列表的方法:
com.alibaba.nacos.naming.core.Cluster.updateIPs()
public void updateIPs(List<Instance> ips, boolean ephemeral) {
// 首先判断是需要更新临时注册列表还是持久化的注册列表(这个会在后面讲解ap/cp提到)
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
// 创建一个map,来保存内存中的注册列表
HashMap<String, Instance> oldIPMap = new HashMap<>(toUpdateInstances.size());
// 遍历注册列表,依次添加到副本中
for (Instance ip : toUpdateInstances) {
oldIPMap.put(ip.getDatumKey(), ip);
}
// 省略处理key的过程
toUpdateInstances = new HashSet<>(ips);
// 将更新后的注册列表 重新复制到内存注册列表中
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
作为注册中心的Eureka是怎么实现高并发读写?
在eureka中,使用多级缓存结构来解决高并发读写的问题。
eureka会创建一个只读注册列表和一个读写注册列表:
如果客户端发起注册或退出的时候,eureka会先把最新的注册列表内容更新到读写注册列表中,同时在eureka启动时会创建一个定时任务,定时把读写的注册列表的内容同步到只读注册列表中,客户端在进行服务发现的时候,是从只读注册列表中获取可用的服务列表。
Nacos的ap和cp又是怎么回事
在学习分布式相关框架的时候,我们都离不开CAP理论,这里就不过多介绍CAP理论了;
令开发者疑惑的是为什么nacos既能支持ap又能支持cp呢,这在面试过程中经常会被问到。相信大家在看完这篇文章后,应该就可以手撕面试官了。
前言
在nacos中,ap和cp主要体现在集群中如何同步注册信息到其它集群节点的实现方式上;
nacos通过ephemeral 字段值来决定是使用ap方式同步还是cp方式同步,默认使用的的ap方式同步注册信息。
通过阅读源码,我们可以找到这段代码,关于如何找到这段代码,后面会在nacos源码解读的文章中讲解:
com.alibaba.nacos.naming.core.ServiceManager.addInstance()
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
// 生成服务的key
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 获取服务
Service service = getService(namespaceId, serviceName);
// 使用同步锁处理
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 调用consistencyService.put 处理同步过来的服务
consistencyService.put(key, instances);
}
}
我们在进入到consistencyService.put方法中
在这里插入图片描述点击put方法时,会看到有三个实现类,根据上下文(或者debug方式),可以推断出这里引用的是DelegateConsistencyServiceImpl实现类
@Override
public void put(String key, Record value) throws NacosException {
// 进入到这个put方法后,就可以知道应该使用ap方式同步还是cp方式同步
mapConsistencyService(key).put(key, value);
}
从下面的方法中 可以判断通过key来判断使用ap还是cp来同步注册信息,其中key是由ephemeral字段组成;
private ConsistencyService mapConsistencyService(String key) {
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
AP 方式同步的流程(ephemeralConsistencyService)
本地服务器处理注册信息&将注册信息同步到其它节点
@Override
public void put(String key, Record value) throws NacosException {
// 处理本地注册列表
onPut(key, value);
// 添加阻塞任务,同步信息到其他集群节点
taskDispatcher.addTask(key);
}
处理本地注册节点
nacos将key做为一个task,添加到notifer中阻塞队列tasks中,并且使用单线程执行,其中notifer是初始化的时候,作为一个线程被放到线程池中(线程池只设置了一个核心线程);
这里有一个点需要告诉大家:在大多数分布式框架,都会采用单线程的阻塞队列来处理耗时的任务,一方面解决并发问题,另一方面能够解决并发带来的写写冲突问题。
线程中的主要处理逻辑就是,循环读取阻塞队列中的内容,然后处理注册信息,更新到内存注册列表中。
同步注册信息到其他集群节点
nacos同样也是把注册key作为一个task存放到 TaskDispatcher 中的taskShedule阻塞队列中,然后开启线程循环读取阻塞队列:
@Override
public void run() {
List<String> keys = new ArrayList<>();
while (true) {
String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
TimeUnit.MILLISECONDS);
// 省略判断代码
// 添加同步的key
keys.add(key);
// 计数
dataSize++;
// 判断同步的key大小是否等于 批量同步设置的限量 或者 判断据上次同步时间 是否大于 配置的间隔周期,如果满足任意一个,则开始同步
if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
(System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
// 遍历所有集群节点,直接调用http进行同步
for (Server member : dataSyncer.getServers()) {
if (NetUtils.localServer().equals(member.getKey())) {
continue;
}
SyncTask syncTask = new SyncTask();
syncTask.setKeys(keys);
syncTask.setTargetServer(member.getKey());
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
}
dataSyncer.submit(syncTask, 0);
}
// 记录本次同步时间
lastDispatchTime = System.currentTimeMillis();
// 计数清零
dataSize = 0;
}
}
}
}
使用ap方式作同步的过程很简单,但是这里面有两种设计思路来解决单个key同步的问题:
如果有新的key推送上来,nacos就发起一次同步,这会造成网络资源浪费,因为每次同步的就只有一个key或者几个key;
同步少量的key解决方案:
- 只有积累到指定数量的key,才发起批量同步
- 距离上次同步时间超过配置的限制时间,则忽略key数量,直接发起同步
CP 方式同步的流程(RaftConsistencyServiceImpl)
cp模式追求的是数据一致性,为了数据一致性,那么肯定得选出一个leader,由leader首先同步,然后再由leader通知follower前来获取最新的注册节点(或者主动推送给follower)
nacos使用raft协议来进行选举leader,来实现cp模式。
同样进入到 RaftConsistencyServiceImpl的put方法
@Override
public void put(String key, Record value) throws NacosException {
try {
raftCore.signalPublish(key, value);
} catch (Exception e) {
Loggers.RAFT.error("Raft put failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
}
}
进入到raftCore.signalPublish方法中,我提取几个关键的代码
// 首先判断当前nacos节点是否是leader,如果不是leader,则获取leader节点的ip,然后将请求转发到leader处理,否则往下走
if (!isLeader()) {
JSONObject params = new JSONObject();
params.put("key", key);
params.put("value", value);
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
return;
}
// 同样采用同样队列的方式,去处理本地注册列表
onPublish(datum, peers.local());
public void onPublish(Datum datum, RaftPeer source) throws Exception {
// 添加同步key任务到阻塞队列中
notifier.addTask(datum.key, ApplyAction.CHANGE);
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
遍历所有集群节点,发送http同步请求
for (final String server : peers.allServersIncludeMyself()) {
// 如果是leader,则不进行同步
if (isLeader(server)) {
latch.countDown();
continue;
}
// 组装url 发送同步请求到其它集群节点
final String url = buildURL(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, response.getStatusCode());
return 1;
}
latch.countDown();
return 0;
}
@Override
public STATE onContentWriteCompleted() {
return STATE.CONTINUE;
}
});
}
各个集群节点处理同步请求这里就不过多介绍了,大家可以自行去看哈
微信搜一搜【乐哉开讲】关注帅气的我,回复【干货领取】,将会有大量面试资料和架构师必看书籍等你挑选,包括java基础、java并发、微服务、中间件等更多资料等你来取哦。
网友评论