Nacos Source Code Analysis


Author: 知止9528 | Published: 2021-02-28 11:18

    Overall flow

    [Figure: overall flow diagram; image not preserved]

    Nacos service registry structure: Map<namespace, Map<group::serviceName, Service>>


    For example:

    [Figure: example of the registry structure; image not preserved]
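The two-level registry structure can be illustrated with a minimal sketch. It assumes plain `ConcurrentHashMap`s, uses a `String` in place of the `Service` object, and uses `public`, `dev`, `DEFAULT_GROUP` and `order-service` purely as example values. The `group::serviceName` key follows this article's notation; the separator Nacos actually uses internally may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of Map<namespace, Map<group::serviceName, Service>>.
// "Service" is simplified to a String; the "::" separator follows the article's notation.
class RegistryShapeDemo {

    // namespace -> (group::serviceName -> service)
    final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();

    static String key(String group, String serviceName) {
        return group + "::" + serviceName;
    }

    // Create the namespace map on first use, then add the service if it is not there yet.
    void put(String namespace, String group, String serviceName) {
        serviceMap.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
                  .putIfAbsent(key(group, serviceName), serviceName);
    }

    // Two lookups: first by namespace, then by the combined group::serviceName key.
    String get(String namespace, String group, String serviceName) {
        Map<String, String> services = serviceMap.get(namespace);
        return services == null ? null : services.get(key(group, serviceName));
    }

    public static void main(String[] args) {
        RegistryShapeDemo registry = new RegistryShapeDemo();
        registry.put("public", "DEFAULT_GROUP", "order-service");
        registry.put("public", "DEFAULT_GROUP", "user-service");
        registry.put("dev", "DEFAULT_GROUP", "order-service"); // same service name, different namespace
        System.out.println(registry.get("dev", "DEFAULT_GROUP", "order-service"));
    }
}
```

The same service name can live in several namespaces without conflict, because the namespace is the outer key.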

    Part 1: Service Registration

    Client side: where to start reading

    <!-- Nacos client -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    

    1. Check which classes the spring.factories file auto-configures for us

    [Figure: contents of spring.factories; image not preserved]

    2. Look at the auto-configured class

    com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration
      ->com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration#nacosAutoServiceRegistration()
      // returns new NacosAutoServiceRegistration()
    

    3. The call chain inside NacosAutoServiceRegistration

    com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration
      ->com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration#nacosAutoServiceRegistration()
      // returns new NacosAutoServiceRegistration()
         // when Spring publishes WebServerInitializedEvent (the web server has started):
       ->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#bind
         ->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#start
           ->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#register
             ->com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
               ->com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(String serviceName, Instance instance)
               // does two things:
               // 1. calls com.alibaba.nacos.client.naming.beat.BeatReactor#addBeatInfo(String serviceName, BeatInfo beatInfo) to add heartbeat info
               // 2. calls the proxy to perform the registration: com.alibaba.nacos.client.naming.net.NamingProxy#registerService(String serviceName, String groupName, Instance instance)
                 ->com.alibaba.nacos.client.naming.net.NamingProxy#reqAPI()
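The event → bind → start → register chain above is a template-method pattern: the abstract class owns the lifecycle, and the Nacos subclass only supplies `register()`. The following plain-Java sketch loosely mirrors that shape without depending on Spring. `WebServerStartedEvent`, `AbstractAutoRegistration` and `RecordingRegistration` are hypothetical stand-ins, not the real Spring Cloud classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that loosely mirror AbstractAutoServiceRegistration; not Spring's API.
class AutoRegistrationSketch {

    static final class WebServerStartedEvent {
        final int port;

        WebServerStartedEvent(int port) {
            this.port = port;
        }
    }

    // Owns the lifecycle: event -> bind -> start -> register.
    abstract static class AbstractAutoRegistration {
        private boolean running;
        protected int port;

        // Called by a (hypothetical) event publisher once the web server is up.
        void onEvent(WebServerStartedEvent event) {
            bind(event);
        }

        void bind(WebServerStartedEvent event) {
            this.port = event.port;
            start();
        }

        void start() {
            if (running) {
                return; // register only once, even if the event fires again
            }
            register();
            running = true;
        }

        // Subclasses decide how registration actually happens.
        protected abstract void register();
    }

    // Plays the role of NacosAutoServiceRegistration -> NacosServiceRegistry#register.
    static final class RecordingRegistration extends AbstractAutoRegistration {
        final List<String> calls = new ArrayList<>();

        @Override
        protected void register() {
            // A real client would build an Instance here and call the naming service.
            calls.add("register port=" + port);
        }
    }

    public static void main(String[] args) {
        RecordingRegistration registration = new RecordingRegistration();
        registration.onEvent(new WebServerStartedEvent(8080));
        registration.onEvent(new WebServerStartedEvent(8080)); // ignored: already running
        System.out.println(registration.calls); // [register port=8080]
    }
}
```

Waiting for the web-server-started event means the port is known before the instance is registered.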
      
    

    Summary: in the end, the client simply sends an HTTP request to the server's registration endpoint.
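To make the summary concrete, here is a minimal sketch that builds (but does not send) such a request with Java 11's `java.net.http.HttpRequest`. It targets the Nacos v1 Open API path `/nacos/v1/ns/instance` with only `serviceName`, `ip` and `port`; the server address and values are examples. The real `NamingProxy` sends more parameters (group, namespace, weight, ephemeral flag, and so on) through its own HTTP client.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Builds, but does not send, a registration request against the Nacos v1 Open API.
class RegisterRequestSketch {

    static HttpRequest buildRegisterRequest(String serverAddr, String serviceName, String ip, int port) {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", serviceName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));

        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));

        URI uri = URI.create("http://" + serverAddr + "/nacos/v1/ns/instance?" + query);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRegisterRequest("127.0.0.1:8848", "order-service", "192.168.1.10", 8080);
        System.out.println(request.method() + " " + request.uri());
        // POST http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=order-service&ip=192.168.1.10&port=8080
        // Sending it: HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    }
}
```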

    --

    Server side

    com.alibaba.nacos.naming.controllers.InstanceController#register
     ->com.alibaba.nacos.naming.core.ServiceManager#registerInstance
      ->com.alibaba.nacos.naming.core.ServiceManager#addInstance
       ->com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put
        ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
         ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#onPut
          ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DataStore#put
          ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#addTask
    

    The change is then added to a queue:

    public class Notifier implements Runnable {
            
            private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
            
            private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
            
            /**
             * Add new notify task to queue.
             *
             * @param datumKey data key
             * @param action   action for data
             */
            public void addTask(String datumKey, DataOperation action) {
                
                if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                    return;
                }
                if (action == DataOperation.CHANGE) {
                    services.put(datumKey, StringUtils.EMPTY);
                }
                tasks.offer(Pair.with(datumKey, action));
            }
            
            public int getTaskSize() {
                return tasks.size();
            }
            
            @Override
            public void run() {
                Loggers.DISTRO.info("distro notifier started");
                
                for (; ; ) {
                    try {
                        Pair<String, DataOperation> pair = tasks.take();
                        handle(pair);
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                    }
                }
            }
            
            private void handle(Pair<String, DataOperation> pair) {
                try {
                    String datumKey = pair.getValue0();
                    DataOperation action = pair.getValue1();
                    
                    services.remove(datumKey);
                    
                    int count = 0;
                    
                    if (!listeners.containsKey(datumKey)) {
                        return;
                    }
                    
                    for (RecordListener listener : listeners.get(datumKey)) {
                        
                        count++;
                        
                        try {
                            if (action == DataOperation.CHANGE) {
                                listener.onChange(datumKey, dataStore.get(datumKey).value);
                                continue;
                            }
                            
                            if (action == DataOperation.DELETE) {
                                listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {
                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                        }
                    }
                    
                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO
                                .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                        datumKey, count, action.name());
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
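The Notifier combines three ideas: pending CHANGE tasks are deduplicated per key, tasks go into a bounded blocking queue, and one consumer thread drains the queue. The following is a simplified, self-contained sketch of that pattern, not the Nacos class. `Op`, `Task`, `processNext()` and the `Consumer` handler are hypothetical; one deliberate difference is that it uses `putIfAbsent` so the check-and-mark step is atomic, where the original calls `containsKey` and `put` separately.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Simplified sketch of the Notifier pattern: dedup pending CHANGEs, bounded queue, single consumer.
class NotifierSketch implements Runnable {

    enum Op { CHANGE, DELETE }

    static final class Task {
        final String key;
        final Op op;

        Task(String key, Op op) {
            this.key = key;
            this.op = op;
        }
    }

    // Keys that have a CHANGE waiting in the queue (the role of the "services" map above).
    private final ConcurrentHashMap<String, Boolean> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<Task> tasks;
    private final Consumer<Task> handler;

    NotifierSketch(int capacity, Consumer<Task> handler) {
        this.tasks = new ArrayBlockingQueue<>(capacity);
        this.handler = handler;
    }

    // Returns false when the task is dropped: a CHANGE is already pending, or the queue is full.
    boolean addTask(String key, Op op) {
        if (op == Op.CHANGE && pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return false; // the queued CHANGE will read the latest data when it is handled
        }
        boolean accepted = tasks.offer(new Task(key, op)); // non-blocking
        if (!accepted && op == Op.CHANGE) {
            pending.remove(key); // nothing was queued, so the key is not pending after all
        }
        return accepted;
    }

    int taskSize() {
        return tasks.size();
    }

    // Handles one queued task without blocking; returns false if the queue was empty.
    boolean processNext() {
        Task task = tasks.poll();
        if (task == null) {
            return false;
        }
        handle(task);
        return true;
    }

    // Long-running consumer loop, meant to be started once on a single thread.
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                handle(tasks.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit the loop
            }
        }
    }

    private void handle(Task task) {
        pending.remove(task.key); // later CHANGEs for this key may be queued again
        try {
            handler.accept(task);
        } catch (RuntimeException e) {
            System.err.println("error while handling " + task.key + ": " + e); // keep consuming
        }
    }

    public static void main(String[] args) {
        NotifierSketch notifier = new NotifierSketch(1024, t -> System.out.println("handle " + t.key + " " + t.op));
        notifier.addTask("svc-a", Op.CHANGE);
        notifier.addTask("svc-a", Op.CHANGE); // dropped: svc-a already has a pending CHANGE
        notifier.addTask("svc-b", Op.CHANGE);
        while (notifier.processNext()) {
            // drain synchronously for the demo
        }
        // handle svc-a CHANGE
        // handle svc-b CHANGE
    }
}
```

Dropping a duplicate CHANGE is safe because the handler reads the current data for the key when it runs, not a copy captured at enqueue time.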
    

    Call chain

    com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#run
     ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#handle
      ->com.alibaba.nacos.naming.core.Service#onChange
       ->com.alibaba.nacos.naming.core.Service#updateIPs
        ->com.alibaba.nacos.naming.core.Cluster#updateIps
    
    public void updateIps(List<Instance> ips, boolean ephemeral) {
            
            ....
            
            toUpdateInstances = new HashSet<>(ips);
            
            if (ephemeral) {
                ephemeralInstances = toUpdateInstances;
            } else {
                persistentInstances = toUpdateInstances;
            }
        }
    

    ephemeralInstances is a member field of the Cluster class:

    private Set<Instance> ephemeralInstances = new HashSet<>();
    

    When is run() triggered?
    The init() method, annotated with @PostConstruct, submits the notifier to a thread pool:

    public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
        
        ....
        
        private volatile Notifier notifier = new Notifier();
        
        @PostConstruct
        public void init() {
            GlobalExecutor.submitDistroNotifyTask(notifier);
        }
        
        ....
    }
    

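    Summary: a blocking queue decouples accepting a registration from applying it, which improves concurrency. But could the queue overflow? And how long does it take before a registration becomes visible?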

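    The queue capacity is 1024 * 1024 (1,048,576) tasks. That many concurrent registrations at once should be rare, and each task only writes to memory, so the consumer should drain the queue quickly.
    Note that addTask() uses offer(), which does not block: if the queue were ever full, offer() would return false and the task would be silently dropped.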

    private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
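The capacity arithmetic and the behavior of `offer()` on a full `ArrayBlockingQueue` can be checked with a tiny demo. The class and method names are hypothetical; only the capacity value comes from the snippet above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Shows the Notifier's capacity and what offer() does once a bounded queue is full.
class BoundedQueueDemo {

    static final int NOTIFIER_CAPACITY = 1024 * 1024; // 1,048,576 pending tasks

    // Offers n items to a queue of the given capacity and returns how many were accepted.
    static int acceptedCount(int capacity, int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < n; i++) {
            if (queue.offer(i)) { // non-blocking: returns false instead of waiting when full
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(NOTIFIER_CAPACITY);   // 1048576
        System.out.println(acceptedCount(3, 5)); // 3: the last two offers are rejected
    }
}
```

Using `put()` instead would block the caller (here, the thread handling the registration request) until space frees up; `offer()` keeps request threads from ever blocking at the cost of possibly losing a notification.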
    

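    Then, if a consumer reads from the registry before a registration has finished, could it see dirty data?

    The conventional approach would be to take a lock and allow reads only after the write completes, but that clearly hurts throughput.

    Let's see how Nacos handles it. Back in Cluster#updateIps: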

    public void updateIps(List<Instance> ips, boolean ephemeral) {
        // take the current set for this type (ephemeral or persistent); it is read, not modified
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
        
        HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
        
        // copy: index the current instances by datum key
        for (Instance ip : toUpdateInstances) {
            oldIpMap.put(ip.getDatumKey(), ip);
        }
        
        // compute the changes against oldIpMap, i.e. the copy, not the live set
        List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
        
        ....
        
        // finally, build a new set and assign it to ephemeralInstances or persistentInstances
        toUpdateInstances = new HashSet<>(ips);
        
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }
    

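    This is copy-on-write, i.e. separating reads from writes: the update is computed on a copy and the new set is published with a single reference assignment, so readers keep seeing the complete old set until the swap.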
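A minimal copy-on-write holder shows why readers never see a half-updated set. This is a sketch loosely modeled on `Cluster#updateIps`, assuming a single writer thread and using `String` instead of `Instance`; the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Minimal copy-on-write set holder. Assumes a single writer thread; readers may be on any thread.
class CowInstances {

    // volatile: once a new set is assigned, readers on other threads see it
    private volatile Set<String> instances = Collections.emptySet();

    // Readers grab the current reference: no lock, and the set is never modified in place.
    Set<String> snapshot() {
        return instances;
    }

    void add(String instance) {
        Set<String> copy = new HashSet<>(instances);   // 1. copy the current set
        copy.add(instance);                            // 2. modify only the copy
        instances = Collections.unmodifiableSet(copy); // 3. publish it with one reference assignment
    }

    void remove(String instance) {
        Set<String> copy = new HashSet<>(instances);
        copy.remove(instance);
        instances = Collections.unmodifiableSet(copy);
    }

    public static void main(String[] args) {
        CowInstances cluster = new CowInstances();
        cluster.add("10.0.0.1:8080");
        Set<String> seenByReader = cluster.snapshot(); // a reader holds the current set
        cluster.add("10.0.0.2:8080");
        System.out.println(seenByReader.size());       // 1: the reader's snapshot never changes
        System.out.println(cluster.snapshot().size()); // 2
    }
}
```

The trade-off is that a reader may briefly see the previous set, but never a partially built one.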
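    Could several writers update the set at the same time and overwrite each other's changes?

    Recall the initialization logic: init() runs only once, so a single thread keeps taking tasks from the queue and applying them. With only one writer, updates cannot overwrite each other.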

    @PostConstruct
        public void init() {
            GlobalExecutor.submitDistroNotifyTask(notifier);
        }
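Why a single writer matters can be shown with a small sketch: many producer threads hand their updates to one writer thread, which performs the read-copy-swap sequence. Because only that thread ever swaps the reference, no update is lost. `SingleWriterDemo`, `applyAdd` and `runUpdates` are hypothetical names; the single-thread executor plays the role of the Notifier thread.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Many producer threads, one writer thread doing read-copy-swap, so no update is lost.
class SingleWriterDemo {

    private volatile Set<String> instances = Collections.emptySet();

    // Runs only on the single writer thread, so copy-modify-swap is never interleaved.
    private void applyAdd(String instance) {
        Set<String> copy = new HashSet<>(instances);
        copy.add(instance);
        instances = Collections.unmodifiableSet(copy);
    }

    static int runUpdates(int producers, int updatesPerProducer) {
        SingleWriterDemo cluster = new SingleWriterDemo();
        ExecutorService writer = Executors.newSingleThreadExecutor(); // the single consumer
        ExecutorService producerPool = Executors.newFixedThreadPool(producers);
        for (int p = 0; p < producers; p++) {
            final int id = p;
            producerPool.execute(() -> {
                for (int i = 0; i < updatesPerProducer; i++) {
                    String instance = "10.0." + id + "." + i;
                    writer.execute(() -> cluster.applyAdd(instance)); // hand the write to the writer
                }
            });
        }
        try {
            producerPool.shutdown();
            producerPool.awaitTermination(10, TimeUnit.SECONDS);
            writer.shutdown(); // already submitted writes still run
            writer.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return cluster.instances.size();
    }

    public static void main(String[] args) {
        System.out.println(runUpdates(4, 250)); // 1000
    }
}
```

If several threads called `applyAdd` directly, two of them could copy the same old set and the later swap would discard the earlier one's addition.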
    

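    Doesn't copy-on-write use a lot of memory?

    Only the Set<Instance> inside one Cluster is copied (see Nacos's full storage model below), not the entire registry. So when applying copy-on-write, think carefully about the granularity of the copy: the smaller the copied unit, the cheaper each write.


    Now let's look at Nacos's underlying storage model.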

    /**
     * Core manager storing all services in Nacos.
     *
     * @author nkorange
     */
    @Component
    public class ServiceManager implements RecordListener<Service> {
        
        /**
         * Map(namespace, Map(group::serviceName, Service)).
         */
        private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
    }
    

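    Service
    The Javadoc describes a 'service --> cluster --> instance' model, in which a service stores a list of clusters, and each cluster contains a list of instances.
    In other words, a service may be deployed as one or more clusters, and each cluster may have multiple instances.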

    /**
     * Service of Nacos server side
     *
     * <p>We introduce a 'service --> cluster --> instance' model, in which service stores a list of clusters, which
     * contain a list of instances.
     *
     * <p>This class inherits from Service in API module and stores some fields that do not have to expose to client.
     *
     * @author nkorange
     */
    public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
        private Map<String, Cluster> clusterMap = new HashMap<>();
    }
    

    Cluster
    Cluster holds the persistentInstances and ephemeralInstances sets that updateIps() ultimately replaces:

    public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
        @JsonIgnore
        private Set<Instance> persistentInstances = new HashSet<>();
        
        @JsonIgnore
        private Set<Instance> ephemeralInstances = new HashSet<>();
    }
    

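    Summary: a namespace may contain several service groups, a group may contain multiple services, a service may have one or more clusters, and a cluster may have multiple instances.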
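Below the namespace map, the model is service → cluster → instance. The following sketch represents one `Service` with its `clusterMap`, and `Cluster` with its two instance sets; the field names follow the snippets above, while the `Instance` fields and the `register`/`allInstances` helpers are hypothetical. For brevity it mutates the sets in place rather than swapping in copies. `DEFAULT` and `backup` are example cluster names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Simplified model of one Service: clusterMap -> Cluster -> ephemeral/persistent instance sets.
class ServiceModelSketch {

    static final class Instance {
        final String ip;
        final int port;
        final boolean ephemeral;

        Instance(String ip, int port, boolean ephemeral) {
            this.ip = ip;
            this.port = port;
            this.ephemeral = ephemeral;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Instance)) {
                return false;
            }
            Instance other = (Instance) o;
            return ip.equals(other.ip) && port == other.port && ephemeral == other.ephemeral;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ip, port, ephemeral);
        }
    }

    static final class Cluster {
        final Set<Instance> persistentInstances = new HashSet<>();
        final Set<Instance> ephemeralInstances = new HashSet<>();
    }

    // One service owns several clusters, keyed by cluster name.
    final Map<String, Cluster> clusterMap = new HashMap<>();

    void register(String clusterName, Instance instance) {
        Cluster cluster = clusterMap.computeIfAbsent(clusterName, name -> new Cluster());
        Set<Instance> target = instance.ephemeral ? cluster.ephemeralInstances : cluster.persistentInstances;
        target.add(instance); // a Set, so registering the same instance twice is a no-op
    }

    List<Instance> allInstances() {
        List<Instance> result = new ArrayList<>();
        for (Cluster cluster : clusterMap.values()) {
            result.addAll(cluster.ephemeralInstances);
            result.addAll(cluster.persistentInstances);
        }
        return result;
    }

    public static void main(String[] args) {
        ServiceModelSketch orderService = new ServiceModelSketch();
        orderService.register("DEFAULT", new Instance("10.0.0.1", 8080, true));
        orderService.register("DEFAULT", new Instance("10.0.0.1", 8080, true)); // duplicate, ignored
        orderService.register("backup", new Instance("10.0.1.1", 8080, false));
        System.out.println(orderService.clusterMap.size());     // 2
        System.out.println(orderService.allInstances().size()); // 2
    }
}
```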


    Sending heartbeats (client side)

    com.alibaba.nacos.client.naming.NacosNamingService#registerInstance()
      ->com.alibaba.nacos.client.naming.beat.BeatReactor#addBeatInfo
    

    addBeatInfo() schedules a BeatTask, which looks like this:

    class BeatTask implements Runnable {
        
        BeatInfo beatInfo;
        
        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }
        
        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            long nextTime = beatInfo.getPeriod();
            ...
            // 1. send the heartbeat
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            
            ...
            // schedule a new BeatTask after nextTime ms, so the heartbeat keeps repeating
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
    

    In other words, the client ultimately sends an HTTP request to the server's /instance/beat endpoint.
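The BeatTask pattern (do the work, then schedule a fresh task for the next round, and stop when a flag is set) can be sketched with a `ScheduledExecutorService`. `BeatState` is a hypothetical stand-in for `BeatInfo`, and incrementing a counter stands in for `serverProxy.sendBeat(...)`; no HTTP call is made.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a self-rescheduling heartbeat task; BeatState is a hypothetical stand-in for BeatInfo.
class HeartbeatSketch {

    static final class BeatState {
        final long periodMillis;
        final AtomicInteger sent = new AtomicInteger();
        volatile boolean stopped;

        BeatState(long periodMillis) {
            this.periodMillis = periodMillis;
        }
    }

    static final class BeatTask implements Runnable {
        private final BeatState state;
        private final ScheduledExecutorService executor;

        BeatTask(BeatState state, ScheduledExecutorService executor) {
            this.state = state;
            this.executor = executor;
        }

        @Override
        public void run() {
            if (state.stopped) {
                return; // like beatInfo.isStopped(): no beat and no rescheduling
            }
            state.sent.incrementAndGet(); // a real client would send the HTTP heartbeat here
            if (!executor.isShutdown()) {
                // one-shot schedule of the next round, like executorService.schedule(new BeatTask(...), ...)
                executor.schedule(new BeatTask(state, executor), state.periodMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    // Runs the heartbeat loop for roughly windowMillis and returns how many beats were sent.
    static int beatsWithin(long periodMillis, long windowMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        BeatState state = new BeatState(periodMillis);
        executor.execute(new BeatTask(state, executor)); // first beat right away
        try {
            Thread.sleep(windowMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        state.stopped = true; // e.g. when the instance is deregistered
        executor.shutdownNow();
        return state.sent.get();
    }

    public static void main(String[] args) {
        System.out.println("beats sent: " + beatsWithin(50, 500));
    }
}
```

Rescheduling one task at a time, instead of using `scheduleAtFixedRate`, lets each round pick its own delay, which is what the `nextTime` variable in the original makes possible.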


    Server-side heartbeat check

    Service#init (com.alibaba.nacos.naming.core.Service#init), which runs when a service is registered,
    starts the heartbeat check through
    com.alibaba.nacos.naming.healthcheck.HealthCheckReactor#scheduleCheck(com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask):

    public void init() {
            HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
            for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
                entry.getValue().setService(this);
                entry.getValue().init();
            }
        }
    

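    Default heartbeat timeout: 15 seconds
    An instance that has sent no heartbeat for longer than this is marked unhealthy.

    See instance.getInstanceHeartBeatTimeOut() in ClientBeatCheckTask.run().
    

    Default delete timeout: 30 seconds
    An instance that has sent no heartbeat for longer than this is removed from the registry.

    See instance.getIpDeleteTimeout() in ClientBeatCheckTask.run().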
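The two thresholds can be expressed as one small decision function. This is a simplification: the real `ClientBeatCheckTask` checks the instances in two separate passes (first marking unhealthy, then deleting), and `BeatCheckSketch`, `Verdict` and `check` are hypothetical names. Only the 15 s and 30 s defaults come from the text above.

```java
// Sketch of the two default thresholds, collapsed into one decision function.
class BeatCheckSketch {

    enum Verdict { HEALTHY, UNHEALTHY, DELETE }

    static final long HEARTBEAT_TIMEOUT_MS = 15_000; // default heartbeat timeout (15 s)
    static final long IP_DELETE_TIMEOUT_MS = 30_000; // default delete timeout (30 s)

    // Decide what to do with an instance given its last heartbeat time and the current time.
    static Verdict check(long lastBeatMillis, long nowMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > IP_DELETE_TIMEOUT_MS) {
            return Verdict.DELETE;    // silent for more than 30 s: remove from the registry
        }
        if (silence > HEARTBEAT_TIMEOUT_MS) {
            return Verdict.UNHEALTHY; // silent for more than 15 s: mark unhealthy
        }
        return Verdict.HEALTHY;
    }

    public static void main(String[] args) {
        long lastBeat = 0;
        System.out.println(check(lastBeat, 5_000));  // HEALTHY
        System.out.println(check(lastBeat, 20_000)); // UNHEALTHY
        System.out.println(check(lastBeat, 31_000)); // DELETE
    }
}
```

The gap between the two timeouts gives a briefly silent instance a grace period: consumers stop routing to it first, and it is only deleted if it stays silent.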
    


    Original link: https://www.haomeiwen.com/subject/eggifltx.html