美文网首页
微服务专题|Naocs 源码设计的精髓就在这了,给你一个手撕面试

微服务专题|Naocs 源码设计的精髓就在这了,给你一个手撕面试

作者: AI码师 | 来源:发表于2021-01-17 19:00 被阅读0次

Nacos 如何扛住高并发读写?

最近经常阅读源码,发现大部分框架在解决并发读写的时候,都会使用COW的思想来解决;
nacos也不例外。

解决方案

假设我们创建一个map来存储并发数据,我们先看下在并发场景下,从这个map中进行读写会出现什么问题:

在这里插入图片描述

针对超大的map进行写操作会很耗时,导致其他线程对这个map的读写操作会等待很久;

那么在nacos中,是如何进行解决的呢?

其实nacos处理的思路很简单,我简要概括下,然后跟踪下源码,带大家看看大佬是如何写代码的:

  1. 首先naocs 将内存中的注册列表map 复制一份当到map1
  2. 然后将客户端同步过来的注册key添加到map1中
  3. 处理完所有的key之后,将map1重新复制给内存中的注册列表map中
在这里插入图片描述

源码跟踪

通过阅读源码,我找到了nacos进行更新注册列表的方法:
com.alibaba.nacos.naming.core.Cluster.updateIPs()

  public void updateIPs(List<Instance> ips, boolean ephemeral) {
// 首先判断是需要更新临时注册列表还是持久化的注册列表(这个会在后面讲解ap/cp提到)
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
// 创建一个map,来保存内存中的注册列表
        HashMap<String, Instance> oldIPMap = new HashMap<>(toUpdateInstances.size());
// 遍历注册列表,依次添加到副本中
        for (Instance ip : toUpdateInstances) {
            oldIPMap.put(ip.getDatumKey(), ip);
        }

// 省略处理key的过程
        toUpdateInstances = new HashSet<>(ips);
// 将更新后的注册列表 重新复制到内存注册列表中
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }

作为注册中心的Eureka是怎么实现高并发读写?

在eureka中,使用多级缓存结构来解决高并发读写的问题。
eureka会创建一个只读注册列表和一个读写注册列表:
如果客户端发起注册或退出的时候,eureka会先把最新的注册列表内容更新到读写注册列表中,同时在eureka启动时会创建一个定时任务,定时把读写的注册列表的内容同步到只读注册列表中,客户端在进行服务发现的时候,是从只读注册列表中获取可用的服务列表。

在这里插入图片描述

Nacos的ap和cp又是怎么回事

在学习分布式相关框架的时候,我们都离不开CAP理论,这里就不过多介绍CAP理论了;
令开发者疑惑的是为什么nacos既能支持ap又能支持cp呢,这在面试过程中经常会被问到。相信大家在看完这篇文章后,应该就可以手撕面试官了。

前言

在nacos中,ap和cp主要体现在集群中如何同步注册信息到其它集群节点的实现方式上;
nacos通过ephemeral 字段值来决定是使用ap方式同步还是cp方式同步,默认使用的的ap方式同步注册信息。
通过阅读源码,我们可以找到这段代码,关于如何找到这段代码,后面会在nacos源码解读的文章中讲解:
com.alibaba.nacos.naming.core.ServiceManager.addInstance()

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        // 生成服务的key
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        // 获取服务
        Service service = getService(namespaceId, serviceName);
        // 使用同步锁处理
        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // 调用consistencyService.put 处理同步过来的服务
            consistencyService.put(key, instances);
        }
    }

我们在进入到consistencyService.put方法中

在这里插入图片描述

点击put方法时,会看到有三个实现类,根据上下文(或者debug方式),可以推断出这里引用的是DelegateConsistencyServiceImpl实现类

    @Override
    public void put(String key, Record value) throws NacosException {
        // 进入到这个put方法后,就可以知道应该使用ap方式同步还是cp方式同步
        mapConsistencyService(key).put(key, value);
    }

从下面的方法中 可以判断通过key来判断使用ap还是cp来同步注册信息,其中key是由ephemeral字段组成;

   private ConsistencyService mapConsistencyService(String key) {
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }

AP 方式同步的流程(ephemeralConsistencyService)

本地服务器处理注册信息&将注册信息同步到其它节点

    @Override
    public void put(String key, Record value) throws NacosException {
        // 处理本地注册列表
        onPut(key, value);
        // 添加阻塞任务,同步信息到其他集群节点
        taskDispatcher.addTask(key);
    }

处理本地注册节点

nacos将key做为一个task,添加到notifer中阻塞队列tasks中,并且使用单线程执行,其中notifer是初始化的时候,作为一个线程被放到线程池中(线程池只设置了一个核心线程);

这里有一个点需要告诉大家:在大多数分布式框架,都会采用单线程的阻塞队列来处理耗时的任务,一方面解决并发问题,另一方面能够解决并发带来的写写冲突问题。

线程中的主要处理逻辑就是,循环读取阻塞队列中的内容,然后处理注册信息,更新到内存注册列表中。

同步注册信息到其他集群节点

nacos同样也是把注册key作为一个task存放到 TaskDispatcher 中的taskShedule阻塞队列中,然后开启线程循环读取阻塞队列:

       @Override
        public void run() {

            List<String> keys = new ArrayList<>();
            while (true) {
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);
                    // 省略判断代码
                    // 添加同步的key
                    keys.add(key);
                    // 计数
                    dataSize++;
                    // 判断同步的key大小是否等于 批量同步设置的限量 或者 判断据上次同步时间 是否大于 配置的间隔周期,如果满足任意一个,则开始同步
                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                        // 遍历所有集群节点,直接调用http进行同步
                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());

                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }

                            dataSyncer.submit(syncTask, 0);
                        }
                        // 记录本次同步时间
                        lastDispatchTime = System.currentTimeMillis();
                        // 计数清零
                        dataSize = 0;
                    }
            }
        }
    }

使用ap方式作同步的过程很简单,但是这里面有两种设计思路来解决单个key同步的问题:
如果有新的key推送上来,nacos就发起一次同步,这会造成网络资源浪费,因为每次同步的就只有一个key或者几个key;

同步少量的key解决方案:
  1. 只有积累到指定数量的key,才发起批量同步
  2. 距离上次同步时间超过配置的限制时间,则忽略key数量,直接发起同步

CP 方式同步的流程(RaftConsistencyServiceImpl)

cp模式追求的是数据一致性,为了数据一致性,那么肯定得选出一个leader,由leader首先同步,然后再由leader通知follower前来获取最新的注册节点(或者主动推送给follower)

nacos使用raft协议来进行选举leader,来实现cp模式。

同样进入到 RaftConsistencyServiceImpl的put方法

    @Override
    public void put(String key, Record value) throws NacosException {
        try {
            raftCore.signalPublish(key, value);
        } catch (Exception e) {
            Loggers.RAFT.error("Raft put failed.", e);
            throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
        }
    }

进入到raftCore.signalPublish方法中,我提取几个关键的代码

// 首先判断当前nacos节点是否是leader,如果不是leader,则获取leader节点的ip,然后将请求转发到leader处理,否则往下走
if (!isLeader()) {
            JSONObject params = new JSONObject();
            params.put("key", key);
            params.put("value", value);
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);

            raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
            return;
        }

// 同样采用同样队列的方式,去处理本地注册列表

onPublish(datum, peers.local());

public void onPublish(Datum datum, RaftPeer source) throws Exception {
       
        // 添加同步key任务到阻塞队列中
        notifier.addTask(datum.key, ApplyAction.CHANGE);

        Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
    }

遍历所有集群节点,发送http同步请求

 for (final String server : peers.allServersIncludeMyself()) {
                // 如果是leader,则不进行同步
                if (isLeader(server)) {
                    latch.countDown();
                    continue;
                }
                // 组装url 发送同步请求到其它集群节点
                final String url = buildURL(server, API_ON_PUB);
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                datum.key, server, response.getStatusCode());
                            return 1;
                        }
                        latch.countDown();
                        return 0;
                    }

                    @Override
                    public STATE onContentWriteCompleted() {
                        return STATE.CONTINUE;
                    }
                });

            }

各个集群节点处理同步请求这里就不过多介绍了,大家可以自行去看哈

微信搜一搜【乐哉开讲】关注帅气的我,回复【干货领取】,将会有大量面试资料和架构师必看书籍等你挑选,包括java基础、java并发、微服务、中间件等更多资料等你来取哦。

相关文章

网友评论

      本文标题:微服务专题|Naocs 源码设计的精髓就在这了,给你一个手撕面试

      本文链接:https://www.haomeiwen.com/subject/uehzaktx.html