美文网首页
微服务专题|Naocs 源码设计的精髓就在这了,给你一个手撕面试

微服务专题|Naocs 源码设计的精髓就在这了,给你一个手撕面试

作者: AI码师 | 来源:发表于2021-01-17 19:00 被阅读0次

    Nacos 如何扛住高并发读写?

    最近经常阅读源码,发现大部分框架在解决并发读写的时候,都会使用COW的思想来解决;
    nacos也不例外。

    解决方案

    假设我们创建一个map来存储并发数据,我们先看下在并发场景下,从这个map中进行读写会出现什么问题:

    在这里插入图片描述

    针对超大的map进行写操作会很耗时,导致其他线程对这个map的读写操作会等待很久;

    那么在nacos中,是如何进行解决的呢?

    其实nacos处理的思路很简单,我简要概括下,然后跟踪下源码,带大家看看大佬是如何写代码的:

    1. 首先naocs 将内存中的注册列表map 复制一份当到map1
    2. 然后将客户端同步过来的注册key添加到map1中
    3. 处理完所有的key之后,将map1重新复制给内存中的注册列表map中
    在这里插入图片描述

    源码跟踪

    通过阅读源码,我找到了nacos进行更新注册列表的方法:
    com.alibaba.nacos.naming.core.Cluster.updateIPs()

      public void updateIPs(List<Instance> ips, boolean ephemeral) {
    // 首先判断是需要更新临时注册列表还是持久化的注册列表(这个会在后面讲解ap/cp提到)
            Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
    // 创建一个map,来保存内存中的注册列表
            HashMap<String, Instance> oldIPMap = new HashMap<>(toUpdateInstances.size());
    // 遍历注册列表,依次添加到副本中
            for (Instance ip : toUpdateInstances) {
                oldIPMap.put(ip.getDatumKey(), ip);
            }
    
    // 省略处理key的过程
            toUpdateInstances = new HashSet<>(ips);
    // 将更新后的注册列表 重新复制到内存注册列表中
            if (ephemeral) {
                ephemeralInstances = toUpdateInstances;
            } else {
                persistentInstances = toUpdateInstances;
            }
        }
    

    作为注册中心的Eureka是怎么实现高并发读写?

    在eureka中,使用多级缓存结构来解决高并发读写的问题。
    eureka会创建一个只读注册列表和一个读写注册列表:
    如果客户端发起注册或退出的时候,eureka会先把最新的注册列表内容更新到读写注册列表中,同时在eureka启动时会创建一个定时任务,定时把读写的注册列表的内容同步到只读注册列表中,客户端在进行服务发现的时候,是从只读注册列表中获取可用的服务列表。

    在这里插入图片描述

    Nacos的ap和cp又是怎么回事

    在学习分布式相关框架的时候,我们都离不开CAP理论,这里就不过多介绍CAP理论了;
    令开发者疑惑的是为什么nacos既能支持ap又能支持cp呢,这在面试过程中经常会被问到。相信大家在看完这篇文章后,应该就可以手撕面试官了。

    前言

    在nacos中,ap和cp主要体现在集群中如何同步注册信息到其它集群节点的实现方式上;
    nacos通过ephemeral 字段值来决定是使用ap方式同步还是cp方式同步,默认使用的的ap方式同步注册信息。
    通过阅读源码,我们可以找到这段代码,关于如何找到这段代码,后面会在nacos源码解读的文章中讲解:
    com.alibaba.nacos.naming.core.ServiceManager.addInstance()

        public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
            // 生成服务的key
            String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
            // 获取服务
            Service service = getService(namespaceId, serviceName);
            // 使用同步锁处理
            synchronized (service) {
                List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
    
                Instances instances = new Instances();
                instances.setInstanceList(instanceList);
                // 调用consistencyService.put 处理同步过来的服务
                consistencyService.put(key, instances);
            }
        }
    

    我们在进入到consistencyService.put方法中

    在这里插入图片描述

    点击put方法时,会看到有三个实现类,根据上下文(或者debug方式),可以推断出这里引用的是DelegateConsistencyServiceImpl实现类

        @Override
        public void put(String key, Record value) throws NacosException {
            // 进入到这个put方法后,就可以知道应该使用ap方式同步还是cp方式同步
            mapConsistencyService(key).put(key, value);
        }
    

    从下面的方法中 可以判断通过key来判断使用ap还是cp来同步注册信息,其中key是由ephemeral字段组成;

       private ConsistencyService mapConsistencyService(String key) {
            return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
        }
    

    AP 方式同步的流程(ephemeralConsistencyService)

    本地服务器处理注册信息&将注册信息同步到其它节点

        @Override
        public void put(String key, Record value) throws NacosException {
            // 处理本地注册列表
            onPut(key, value);
            // 添加阻塞任务,同步信息到其他集群节点
            taskDispatcher.addTask(key);
        }
    

    处理本地注册节点

    nacos将key做为一个task,添加到notifer中阻塞队列tasks中,并且使用单线程执行,其中notifer是初始化的时候,作为一个线程被放到线程池中(线程池只设置了一个核心线程);

    这里有一个点需要告诉大家:在大多数分布式框架,都会采用单线程的阻塞队列来处理耗时的任务,一方面解决并发问题,另一方面能够解决并发带来的写写冲突问题。

    线程中的主要处理逻辑就是,循环读取阻塞队列中的内容,然后处理注册信息,更新到内存注册列表中。

    同步注册信息到其他集群节点

    nacos同样也是把注册key作为一个task存放到 TaskDispatcher 中的taskShedule阻塞队列中,然后开启线程循环读取阻塞队列:

           @Override
            public void run() {
    
                List<String> keys = new ArrayList<>();
                while (true) {
                        String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                            TimeUnit.MILLISECONDS);
                        // 省略判断代码
                        // 添加同步的key
                        keys.add(key);
                        // 计数
                        dataSize++;
                        // 判断同步的key大小是否等于 批量同步设置的限量 或者 判断据上次同步时间 是否大于 配置的间隔周期,如果满足任意一个,则开始同步
                        if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                            (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                            // 遍历所有集群节点,直接调用http进行同步
                            for (Server member : dataSyncer.getServers()) {
                                if (NetUtils.localServer().equals(member.getKey())) {
                                    continue;
                                }
                                SyncTask syncTask = new SyncTask();
                                syncTask.setKeys(keys);
                                syncTask.setTargetServer(member.getKey());
    
                                if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                    Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                                }
    
                                dataSyncer.submit(syncTask, 0);
                            }
                            // 记录本次同步时间
                            lastDispatchTime = System.currentTimeMillis();
                            // 计数清零
                            dataSize = 0;
                        }
                }
            }
        }
    

    使用ap方式作同步的过程很简单,但是这里面有两种设计思路来解决单个key同步的问题:
    如果有新的key推送上来,nacos就发起一次同步,这会造成网络资源浪费,因为每次同步的就只有一个key或者几个key;

    同步少量的key解决方案:
    1. 只有积累到指定数量的key,才发起批量同步
    2. 距离上次同步时间超过配置的限制时间,则忽略key数量,直接发起同步

    CP 方式同步的流程(RaftConsistencyServiceImpl)

    cp模式追求的是数据一致性,为了数据一致性,那么肯定得选出一个leader,由leader首先同步,然后再由leader通知follower前来获取最新的注册节点(或者主动推送给follower)

    nacos使用raft协议来进行选举leader,来实现cp模式。

    同样进入到 RaftConsistencyServiceImpl的put方法

        @Override
        public void put(String key, Record value) throws NacosException {
            try {
                raftCore.signalPublish(key, value);
            } catch (Exception e) {
                Loggers.RAFT.error("Raft put failed.", e);
                throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
            }
        }
    

    进入到raftCore.signalPublish方法中,我提取几个关键的代码

    // 首先判断当前nacos节点是否是leader,如果不是leader,则获取leader节点的ip,然后将请求转发到leader处理,否则往下走
    if (!isLeader()) {
                JSONObject params = new JSONObject();
                params.put("key", key);
                params.put("value", value);
                Map<String, String> parameters = new HashMap<>(1);
                parameters.put("key", key);
    
                raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
                return;
            }
    

    // 同样采用同样队列的方式,去处理本地注册列表

    onPublish(datum, peers.local());
    
    public void onPublish(Datum datum, RaftPeer source) throws Exception {
           
            // 添加同步key任务到阻塞队列中
            notifier.addTask(datum.key, ApplyAction.CHANGE);
    
            Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
        }
    

    遍历所有集群节点,发送http同步请求

     for (final String server : peers.allServersIncludeMyself()) {
                    // 如果是leader,则不进行同步
                    if (isLeader(server)) {
                        latch.countDown();
                        continue;
                    }
                    // 组装url 发送同步请求到其它集群节点
                    final String url = buildURL(server, API_ON_PUB);
                    HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                    datum.key, server, response.getStatusCode());
                                return 1;
                            }
                            latch.countDown();
                            return 0;
                        }
    
                        @Override
                        public STATE onContentWriteCompleted() {
                            return STATE.CONTINUE;
                        }
                    });
    
                }
    

    各个集群节点处理同步请求这里就不过多介绍了,大家可以自行去看哈

    微信搜一搜【乐哉开讲】关注帅气的我,回复【干货领取】,将会有大量面试资料和架构师必看书籍等你挑选,包括java基础、java并发、微服务、中间件等更多资料等你来取哦。

    相关文章

      网友评论

          本文标题:微服务专题|Naocs 源码设计的精髓就在这了,给你一个手撕面试

          本文链接:https://www.haomeiwen.com/subject/uehzaktx.html