Spring Cloud Alibaba: Nacos Server-Side Registration

Author: 小波同学 | Published: 2021-07-27 01:41

    I. Nacos Server service registration flow (AP mode)

    • Nacos runs mainly in AP mode; CP mode is implemented by RaftConsistencyServiceImpl.

    1. The register method of the InstanceController class, in the nacos-naming module of Nacos Server, is the entry point for service registration

    @RestController
    @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
    public class InstanceController {
    
        @Autowired
        private ServiceManager serviceManager;
    
        @CanDistro
        @PostMapping
        @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
        public String register(HttpServletRequest request) throws Exception {
            // No separate group parameter is read here: in Nacos 1.x the group is carried
            // inside the service name (groupName@@serviceName).
            // Namespace
            final String namespaceId = WebUtils
                    .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
            // Service name
            final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
            NamingUtils.checkServiceNameFormat(serviceName);
            
            // Convert the registration request parameters into an Instance
            final Instance instance = parseInstance(request);
            
            // Register the instance
            serviceManager.registerInstance(namespaceId, serviceName, instance);
            return "ok";
        }
    }
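
    The two parameter lookups in register can be sketched in isolation. This is a minimal illustration, not the Nacos WebUtils code: the class RegisterParams, its two helpers, and the plain Map standing in for the HTTP request are hypothetical, and the default namespace value "public" is an assumption.

```java
import java.util.Map;

// Hypothetical stand-in for the optional/required lookups used by register().
final class RegisterParams {
    // Assumed default namespace ID; check Constants.DEFAULT_NAMESPACE_ID in your Nacos version.
    static final String DEFAULT_NAMESPACE_ID = "public";

    // Returns the parameter value, or the default when it is missing or empty.
    static String optional(Map<String, String> params, String key, String defaultValue) {
        String value = params.get(key);
        return (value == null || value.isEmpty()) ? defaultValue : value;
    }

    // Returns the parameter value, or fails fast when it is missing or empty.
    static String required(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Param '" + key + "' is required.");
        }
        return value;
    }
}
```

    With this shape, a request without a namespace falls back to the default, while a request without a service name is rejected before any registry state is touched.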
    

    2. serviceManager.registerInstance registers the service instance

    @Component
    public class ServiceManager implements RecordListener<Service> {
    
        // The Nacos service registry: namespace -> (service name -> Service)
        private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
    
        @Resource(name = "consistencyDelegate")
        private ConsistencyService consistencyService;
    
        public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
            // Create an empty service if it does not exist yet
            createEmptyService(namespaceId, serviceName, instance.isEphemeral());
            
            // Look the service up in the Nacos registry
            Service service = getService(namespaceId, serviceName);
            
            if (service == null) {
                throw new NacosException(NacosException.INVALID_PARAM,
                        "service not found, namespace: " + namespaceId + ", service: " + serviceName);
            }
            
            // Add the instance
            addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
        }
        
        public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
                throws NacosException {
            
            String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
            
            // Look the service up in the Nacos registry
            Service service = getService(namespaceId, serviceName);
            
            // Lock on the service: within a namespace, only one registration request
            // for a given service is processed at a time
            synchronized (service) {
                // Apply the change and get back the full instance list
                List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
                
                // Wrap the list in a new Instances object
                Instances instances = new Instances();
                instances.setInstanceList(instanceList);
                // Hand the key and the instance list to consistencyService
                consistencyService.put(key, instances);
            }
        }
        
        // Look up a service
        public Service getService(String namespaceId, String serviceName) {
            if (serviceMap.get(namespaceId) == null) {
                return null;
            }
            return chooseServiceMap(namespaceId).get(serviceName);
        }   
    }
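
    The registry above is a two-level map: namespace first, then service name. A minimal sketch of that shape follows. RegistrySketch and ServiceEntry are hypothetical names, and the use of computeIfAbsent is a simplification of my own rather than what ServiceManager does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical two-level registry: namespaceId -> (serviceName -> service entry).
final class RegistrySketch {
    static final class ServiceEntry {
        final String name;
        ServiceEntry(String name) { this.name = name; }
    }

    private final Map<String, Map<String, ServiceEntry>> serviceMap = new ConcurrentHashMap<>();

    // Creates the namespace level and the service entry on first use; later calls return the same entry.
    ServiceEntry createIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, ServiceEntry::new);
    }

    // Mirrors getService: null when either the namespace or the service is unknown.
    ServiceEntry getService(String namespaceId, String serviceName) {
        Map<String, ServiceEntry> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }
}
```

    Because every caller gets the same entry object for a given namespace and service name, that object can serve as the monitor for a synchronized block, which is how addInstance serializes registrations per service.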
    

    3. addIpAddresses updates and returns the full list of service instances

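    This method does quite a lot. It first fetches the previously stored data (persistent or ephemeral) from the consistency service, because that is the data that gets synchronized and has to be kept current. It then fetches the service's current instances (persistent or ephemeral) and uses their health flag and heartbeat time to refresh the stored data. Next it iterates over the incoming instances: if an instance's cluster does not exist yet, the cluster is created and initialized, which starts its health check. Finally it adds to or removes from the stored instance map, depending on the action, and returns the resulting instances as a list.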

    @Component
    public class ServiceManager implements RecordListener<Service> {
    
        @Resource(name = "consistencyDelegate")
        private ConsistencyService consistencyService;
    
        private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
            return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
        }
        
        public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
                throws NacosException {
            
            // Fetch the previously stored instance list from the dataMap in DataStore
            Datum datum = consistencyService
                    .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
            
            // Collect the service's current instances across all clusters, ephemeral or persistent
            List<Instance> currentIPs = service.allIPs(ephemeral);
            // Map from ip:port to instance
            Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
            // Set of instance IDs
            Set<String> currentInstanceIds = Sets.newHashSet();
            
            // Fill both collections
            for (Instance instance : currentIPs) {
                currentInstances.put(instance.toIpAddr(), instance);
                currentInstanceIds.add(instance.getInstanceId());
            }
            
            // The stored instances after refreshing
            Map<String, Instance> instanceMap;
            if (datum != null && null != datum.value) {
                // Refresh the stored instances with the health flag and heartbeat time of the current instances
                instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
            } else {
                // Nothing stored yet: start from an empty map
                instanceMap = new HashMap<>(ips.length);
            }
            
            for (Instance instance : ips) {
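                // Iterate over the incoming instances
                if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                    // The cluster does not exist yet, so create it
                    Cluster cluster = new Cluster(instance.getClusterName(), service);
                    // Initialize it, which starts the cluster's health check
                    cluster.init();
                    // Register the cluster with the service
                    service.getClusterMap().put(instance.getClusterName(), cluster);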
                    Loggers.SRV_LOG
                            .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                    instance.getClusterName(), instance.toJson());
                }
                
                // For a remove action, drop the instance from the stored map
                if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                    instanceMap.remove(instance.getDatumKey());
                } else {
                    // Otherwise add it
                    Instance oldInstance = instanceMap.get(instance.getDatumKey());
                    if (oldInstance != null) {
                        // The instance already exists: reuse its instance ID
                        instance.setInstanceId(oldInstance.getInstanceId());
                    } else {
                        // Otherwise generate a new instance ID
                        instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
                    }
                    instanceMap.put(instance.getDatumKey(), instance);
                }
                
            }
            
            if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
                throw new IllegalArgumentException(
                        "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                                .toJson(instanceMap.values()));
            }
            // Return the full instance list
            return new ArrayList<>(instanceMap.values());
        }   
    }
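
    Stripped of clusters and health state, the add/remove merge in updateIpAddresses reduces to a map keyed by instance address. The sketch below shows only that core. InstanceListMerge, Inst, and the "id-" prefix used for generated IDs are hypothetical, and the real method additionally refreshes health data and creates clusters.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reduction of the add/remove merge to its map operations.
final class InstanceListMerge {
    // Minimal stand-in for an instance: an "ip:port" address plus an instance ID.
    static final class Inst {
        final String addr;
        String id;
        Inst(String addr, String id) { this.addr = addr; this.id = id; }
    }

    // Applies an add (remove == false) or remove (remove == true) to the stored list.
    static List<Inst> update(List<Inst> stored, boolean remove, Inst... changed) {
        // Index the stored instances by address, keeping insertion order.
        Map<String, Inst> merged = new LinkedHashMap<>();
        for (Inst old : stored) {
            merged.put(old.addr, old);
        }
        for (Inst inst : changed) {
            if (remove) {
                merged.remove(inst.addr);
                continue;
            }
            Inst old = merged.get(inst.addr);
            // A re-registration keeps the existing ID; a new address gets a generated one.
            inst.id = (old != null) ? old.id : "id-" + inst.addr;
            merged.put(inst.addr, inst);
        }
        return new ArrayList<>(merged.values());
    }
}
```

    Reusing the old ID on re-registration matters because clients and other nodes may already refer to the instance by that ID.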
    
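    • Service.allIPs returns the instances across all clusters
      It iterates over the clusters and collects each cluster's instances, either ephemeral or persistent.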
    public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
      
        private Map<String, Cluster> clusterMap = new HashMap<>();
        
        public List<Instance> allIPs(boolean ephemeral) {
            List<Instance> result = new ArrayList<>();
            for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
                result.addAll(entry.getValue().allIPs(ephemeral));
            }
            
            return result;
        }
    }
    
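    • ServiceManager.setValid refreshes the stored instance list
    • It copies the health status and last heartbeat time from the instances currently held by the service's clusters onto the stored instances.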
    @Component
    public class ServiceManager implements RecordListener<Service> {
    
        private Map<String, Instance> setValid(List<Instance> oldInstances, Map<String, Instance> map) {
            
            Map<String, Instance> instanceMap = new HashMap<>(oldInstances.size());
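            // Iterate over the stored instances and refresh those that have a current counterpart
            for (Instance instance : oldInstances) {
                // Look up the matching current instance
                Instance instance1 = map.get(instance.toIpAddr());
                // If present, copy its state
                if (instance1 != null) {
                    instance.setHealthy(instance1.isHealthy());
                    instance.setLastBeat(instance1.getLastBeat());
                }
                // Put it into the result map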
                instanceMap.put(instance.getDatumKey(), instance);
            }
            return instanceMap;
        }
    }
    
    • Cluster.init initializes the cluster
    • It simply starts a health-check task
    public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    
        private HealthCheckTask checkTask;
        
        public void init() {
            if (inited) {
                return;
            }
            checkTask = new HealthCheckTask(this);
            
            HealthCheckReactor.scheduleCheck(checkTask);
            inited = true;
        }
    }
    

    4. The instance list is stored in the dataMap of DataStore, and a change task for the registration is put on an ArrayBlockingQueue

    @DependsOn("ProtocolManager")
    @Service("consistencyDelegate")
    public class DelegateConsistencyServiceImpl implements ConsistencyService {
    
        @Override
        public void put(String key, Record value) throws NacosException {
            mapConsistencyService(key).put(key, value);
        }
    }
    
    @DependsOn("ProtocolManager")
    @org.springframework.stereotype.Service("distroConsistencyService")
    public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
        
        private final DataStore dataStore;
        
        private volatile Notifier notifier = new Notifier();    
        
        @Override
        public void put(String key, Record value) throws NacosException {
            onPut(key, value);
            distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                    globalConfig.getTaskDispatchPeriod() / 2);
        }
        
        public void onPut(String key, Record value) {
            
            if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
                Datum<Instances> datum = new Datum<>();
                datum.value = (Instances) value;
                datum.key = key;
                datum.timestamp.incrementAndGet();
                // Store the instance list in the dataMap of DataStore
                dataStore.put(key, datum);
            }
            
            if (!listeners.containsKey(key)) {
                return;
            }
            
            // Queue a data-change task
            notifier.addTask(key, DataOperation.CHANGE);
        }   
        
        public class Notifier implements Runnable {
            
            private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
            
            private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
            
            /**
             * Add new notify task to queue.
             *
             * @param datumKey data key
             * @param action   action for data
             */
            public void addTask(String datumKey, DataOperation action) {
                
                if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                    return;
                }
                if (action == DataOperation.CHANGE) {
                    services.put(datumKey, StringUtils.EMPTY);
                }
                // Add the task to the ArrayBlockingQueue
                tasks.offer(Pair.with(datumKey, action));
            }
        }       
    }
    

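    Note: once DistroConsistencyServiceImpl has been instantiated, it submits the Notifier to an executor. The Notifier then waits on the ArrayBlockingQueue and consumes tasks as they arrive.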

    @DependsOn("ProtocolManager")
    @org.springframework.stereotype.Service("distroConsistencyService")
    public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
        
        private volatile Notifier notifier = new Notifier();    
        
        @PostConstruct
        public void init() {
            // After this bean is instantiated, submit the Notifier so it starts consuming tasks from the ArrayBlockingQueue
            GlobalExecutor.submitDistroNotifyTask(notifier);
        }
    }
    

    5. Once the change task is on the ArrayBlockingQueue, the run method of Notifier takes it off the queue and handles it

    @DependsOn("ProtocolManager")
    @org.springframework.stereotype.Service("distroConsistencyService")
    public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
        
        private final DataStore dataStore;
        
        private volatile Notifier notifier = new Notifier();    
        
        public class Notifier implements Runnable {
            
            private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
            
            private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
            
            public int getTaskSize() {
                return tasks.size();
            }
            
            @Override
            public void run() {
                Loggers.DISTRO.info("distro notifier started");
                
                for (; ; ) {
                    try {
                        // Take the next task off the queue (blocks while the queue is empty)
                        Pair<String, DataOperation> pair = tasks.take();
                        // Handle it
                        handle(pair);
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                    }
                }
            }
            
            private void handle(Pair<String, DataOperation> pair) {
                try {
                    String datumKey = pair.getValue0();
                    DataOperation action = pair.getValue1();
                    
                    services.remove(datumKey);
                    
                    int count = 0;
                    
                    if (!listeners.containsKey(datumKey)) {
                        return;
                    }
                    
                    for (RecordListener listener : listeners.get(datumKey)) {
                        
                        count++;
                        
                        try {
                            // A data change, such as a service registration
                            if (action == DataOperation.CHANGE) {
                                // Pass the listener the instance list that the registration request stored in dataStore
                                listener.onChange(datumKey, dataStore.get(datumKey).value);
                                continue;
                            }
                            // A data deletion
                            if (action == DataOperation.DELETE) {
                                listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {
                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                        }
                    }
                    
                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO
                                .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                        datumKey, count, action.name());
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
    }
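
    Steps 4 and 5 together form a producer/consumer pair with de-duplication: a key that is already waiting in the queue is not queued again. A minimal sketch of that pattern follows. NotifierSketch and its listener callback are hypothetical, the queue capacity is arbitrary, and the handling of a full queue is my own choice rather than what Nacos does.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical notifier: producers call addTask, one consumer thread executes run().
final class NotifierSketch implements Runnable {
    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    private final Consumer<String> listener;

    NotifierSketch(Consumer<String> listener) {
        this.listener = listener;
    }

    // Queues a change for the key unless one is already waiting.
    boolean addTask(String key) {
        if (!pending.add(key)) {
            return false; // coalesced with the task that is already queued
        }
        if (!tasks.offer(key)) {
            pending.remove(key); // queue full: forget the key so a later change can retry
            return false;
        }
        return true;
    }

    int getTaskSize() {
        return tasks.size();
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String key = tasks.take(); // blocks while the queue is empty
                pending.remove(key);       // from here on, new changes queue again
                try {
                    listener.accept(key);
                } catch (RuntimeException e) {
                    // One failing listener call must not stop the consumer loop.
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit cleanly on shutdown
        }
    }
}
```

    Removing the key from the pending set before calling the listener means a change that arrives during handling is queued again instead of being lost.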
    

    6. The instance list stored by the registration request is read back from dataStore and passed to listener.onChange, which performs the registration

    @JsonInclude(Include.NON_NULL)
    public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    
        private Map<String, Cluster> clusterMap = new HashMap<>();
    
        @Override
        public void onChange(String key, Instances value) throws Exception {
            
            Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
            
            for (Instance instance : value.getInstanceList()) {
                
                if (instance == null) {
                    // Reject this abnormal instance list:
                    throw new RuntimeException("got null instance " + key);
                }
                
                // Clamp the weight to its upper bound
                if (instance.getWeight() > 10000.0D) {
                    instance.setWeight(10000.0D);
                }
                // Clamp the weight to its lower bound
                if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                    instance.setWeight(0.01D);
                }
            }
            // Update the instance list
            updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
            
            recalculateChecksum();
        }
        
        public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
            Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
            for (String clusterName : clusterMap.keySet()) {
                ipMap.put(clusterName, new ArrayList<>());
            }
            
            // Iterate over the registered instances
            for (Instance instance : instances) {
                try {
                    if (instance == null) {
                        Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                        continue;
                    }
                    
                    if (StringUtils.isEmpty(instance.getClusterName())) {
                        // No cluster name given: fall back to the default
                        instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                    }
                    
                    // If the cluster is not known yet, create and initialize it
                    if (!clusterMap.containsKey(instance.getClusterName())) {
                        Loggers.SRV_LOG
                                .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                        instance.getClusterName(), instance.toJson());
                        Cluster cluster = new Cluster(instance.getClusterName(), this);
                        // Initialize the cluster: create its health-check task and
                        // schedule it through HealthCheckReactor.scheduleCheck
                        cluster.init();
                        // Register the cluster under its name
                        getClusterMap().put(instance.getClusterName(), cluster);
                    }
                    // Get this cluster's instance list
                    List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                    if (clusterIPs == null) {
                        // No list yet for this cluster: create one
                        clusterIPs = new LinkedList<>();
                        ipMap.put(instance.getClusterName(), clusterIPs);
                    }
                    // Add the instance
                    clusterIPs.add(instance);
                } catch (Exception e) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
                }
            }
            
            for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
                //make every ip mine
                List<Instance> entryIPs = entry.getValue();
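                // Replace the cluster's instance set (ephemeral or persistent, depending on the flag)
                clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
            }
            // Record the last-modified time
            setLastModifiedMillis(System.currentTimeMillis());
            // Push: notify clients over UDP that the service has changed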
            getPushService().serviceChanged(this);
            StringBuilder stringBuilder = new StringBuilder();
            
            for (Instance instance : allIPs()) {
                stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
            }
            
            Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                    stringBuilder.toString());
            
        }   
    }
    
    
    public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    
        @JsonIgnore
        private HealthCheckTask checkTask;
    
        public void init() {
            if (inited) {
                return;
            }
            // Create the health-check task
            checkTask = new HealthCheckTask(this);
            // Schedule the health-check task
            HealthCheckReactor.scheduleCheck(checkTask);
            inited = true;
        }
    }
    
    • The core of it all is Cluster.updateIps
    public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    
        @JsonIgnore
        private Set<Instance> persistentInstances = new HashSet<>();
        
        @JsonIgnore
        private Set<Instance> ephemeralInstances = new HashSet<>();
    
        public void updateIps(List<Instance> ips, boolean ephemeral) {
            // Get the cluster's existing instance set
            Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
            
            HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
            
            for (Instance ip : toUpdateInstances) {
                oldIpMap.put(ip.getDatumKey(), ip);
            }
            // updatedIps returns the incoming instances that already exist in oldIpMap
            List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
            if (updatedIPs.size() > 0) {
                for (Instance ip : updatedIPs) {
                    Instance oldIP = oldIpMap.get(ip.getDatumKey());
                    
                    // do not update the ip validation status of updated ips
                    // because the checker has the most precise result
                    // Only when ip is not marked, don't we update the health status of IP:
                    if (!ip.isMarked()) {
                        ip.setHealthy(oldIP.isHealthy());
                    }
                    
                    if (ip.isHealthy() != oldIP.isHealthy()) {
                        // ip validation status updated
                        Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                                (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
                    }
                    
                    if (ip.getWeight() != oldIP.getWeight()) {
                        // ip validation status updated
                        Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
                                ip.toString());
                    }
                }
            }
            // Find the new instances: those in ips that are not in oldIpMap
            List<Instance> newIPs = subtract(ips, oldIpMap.values());
            if (newIPs.size() > 0) {
                Loggers.EVT_LOG
                        .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                                getName(), newIPs.size(), newIPs.toString());
                
                for (Instance ip : newIPs) {
                    // Reset the health-check status for the new instance
                    HealthCheckStatus.reset(ip);
                }
            }
            // Find the dead instances: those in oldIpMap that are not in ips
            List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
            
            if (deadIPs.size() > 0) {
                Loggers.EVT_LOG
                        .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                                getName(), deadIPs.size(), deadIPs.toString());
                
                for (Instance ip : deadIPs) {
                    // Remove the health-check status of instances that are no longer in the list
                    HealthCheckStatus.remv(ip);
                }
            }
            
            toUpdateInstances = new HashSet<>(ips);
            
            // Swap in the new instance set for this cluster
            if (ephemeral) {
                ephemeralInstances = toUpdateInstances;
            } else {
                persistentInstances = toUpdateInstances;
            }
        }
    }
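
    The new and dead lists in updateIps are both plain set differences, computed in opposite directions. A minimal sketch of that helper follows. InstanceDiff is a hypothetical name, and it compares elements with equals, which is an assumption about how instances are matched.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: the elements of a that do not occur in b, in a's order.
final class InstanceDiff {
    static <T> List<T> subtract(Collection<T> a, Collection<T> b) {
        Set<T> exclude = new HashSet<>(b); // hash lookup keeps the pass over a linear
        List<T> result = new ArrayList<>();
        for (T item : a) {
            if (!exclude.contains(item)) {
                result.add(item);
            }
        }
        return result;
    }
}
```

    With incoming as the new list and existing as the old one, subtract(incoming, existing) yields the new instances and subtract(existing, incoming) yields the dead ones.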
    

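    To avoid conflicts between concurrent reads and writes, the method builds a brand-new HashSet, works on that, and then swaps it in for the old set. This is the copy-on-write idea. Finally, a service-changed event is published.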

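    • Service registration supports concurrent reads and writes through copy-on-write
    • Cluster.updateIps works on a copy of the existing instance list and replaces the original once the registration is done. Readers need no lock, so throughput is high, at the cost of a short delay before a change becomes visible.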
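
    The copy-on-write swap described above can be sketched in a few lines. CopyOnWriteInstances is a hypothetical class. The volatile field and the unmodifiable wrapper are additions of mine to make the visibility guarantee explicit, and they are not taken from the Nacos excerpt.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical holder that replaces its instance set instead of mutating it.
final class CopyOnWriteInstances {
    // volatile so that readers see the new set as soon as the reference is swapped.
    private volatile Set<String> instances = Collections.emptySet();

    // Readers take the current reference without locking; the set behind it never changes.
    Set<String> snapshot() {
        return instances;
    }

    // Writers build a fresh set and publish it with a single reference assignment.
    synchronized void replaceAll(Collection<String> latest) {
        instances = Collections.unmodifiableSet(new HashSet<>(latest));
    }
}
```

    A reader that obtained a snapshot before the swap keeps iterating over the old set, which is where the short visibility delay comes from.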

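    Eureka avoids read/write conflicts with a multi-level cache whose levels are synchronized on a timer, so clients learn of changes less promptly than they do with Nacos.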

    7. Instance data is synchronized to the other nodes of the Nacos Server cluster

    Back in the earlier code, the put method calls distroProtocol.sync() to synchronize the data to the other cluster nodes. Following that call:

    @DependsOn("ProtocolManager")
    @org.springframework.stereotype.Service("distroConsistencyService")
    public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    
        private final DistroProtocol distroProtocol;
        
        @Override
        public void put(String key, Record value) throws NacosException {
            onPut(key, value);
            distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                    globalConfig.getTaskDispatchPeriod() / 2);
        }
    }
    
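    • newSingleScheduledExecutorService.scheduleWithFixedDelay() runs the ProcessRunnable task periodically; the sync tasks it produces send HTTP requests that copy the instance data to the other nodes of the Nacos Server cluster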
    @Component
    public class DistroProtocol {
    
        public void sync(DistroKey distroKey, DataOperation action, long delay) {
            for (Member each : memberManager.allMembersWithoutSelf()) {
                // For every other cluster member, schedule a data sync
                DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                        each.getAddress());
                // Create a delayed task
                DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
                // The engine picks the task up on its next periodic run
                distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
                }
            }
        }
    }
    
    public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
    
        private final ScheduledExecutorService processingExecutor;
    
        protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
        
        // Constructor
        public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
            super(logger);
            // Initialize the task map
            tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
            // Create the single-threaded scheduled executor
            processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
            // Run ProcessRunnable repeatedly, with a fixed delay of processInterval between runs
            processingExecutor
                    .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
        }   
    
        @Override
        public void addTask(Object key, AbstractDelayTask newTask) {
            lock.lock();
            try {
                AbstractDelayTask existTask = tasks.get(key);
                if (null != existTask) {
                    newTask.merge(existTask);
                }
                tasks.put(key, newTask);
            } finally {
                lock.unlock();
            }
        }
        
        /**
         * process tasks in execute engine.
         */
        protected void processTasks() {
            Collection<Object> keys = getAllTaskKeys();
            for (Object taskKey : keys) {
                AbstractDelayTask task = removeTask(taskKey);
                if (null == task) {
                    continue;
                }
                NacosTaskProcessor processor = getProcessor(taskKey);
                if (null == processor) {
                    getEngineLog().error("processor not found for task, so discarded. " + task);
                    continue;
                }
                try {
                    // ReAdd task if process failed
                    if (!processor.process(task)) {
                        retryFailedTask(taskKey, task);
                    }
                } catch (Throwable e) {
                    getEngineLog().error("Nacos task execute error : " + e.toString(), e);
                    retryFailedTask(taskKey, task);
                }
            }
        }   
        
        private class ProcessRunnable implements Runnable {
            
            @Override
            public void run() {
                try {
                    processTasks();
                } catch (Throwable e) {
                    getEngineLog().error(e.toString(), e);
                }
            }
        }   
    }
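
    The engine above combines two ideas: tasks are stored by key, so a later task for the same target supersedes an earlier one, and a single scheduled thread drains the map at a fixed delay. A minimal sketch follows. DelayTaskEngineSketch, its String payloads, and the start and stop methods are hypothetical, and simply overwriting the old task is a simplification of the merge step in addTask.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical delay-task engine: keyed tasks, drained periodically by one thread.
final class DelayTaskEngineSketch {
    private final Map<String, String> tasks = new ConcurrentHashMap<>();
    private final BiConsumer<String, String> processor;
    private ScheduledExecutorService executor;

    DelayTaskEngineSketch(BiConsumer<String, String> processor) {
        this.processor = processor;
    }

    // A newer task for the same key replaces the one still waiting.
    void addTask(String key, String payload) {
        tasks.put(key, payload);
    }

    // Removes every waiting task and hands it to the processor.
    void processTasks() {
        for (String key : tasks.keySet()) {
            String payload = tasks.remove(key);
            if (payload != null) {
                processor.accept(key, payload);
            }
        }
    }

    // Starts the periodic drain; the delay is measured from the end of one run to the start of the next.
    synchronized void start(long intervalMillis) {
        if (executor == null) {
            executor = Executors.newSingleThreadScheduledExecutor();
            executor.scheduleWithFixedDelay(this::safeProcess, intervalMillis, intervalMillis,
                    TimeUnit.MILLISECONDS);
        }
    }

    synchronized void stop() {
        if (executor != null) {
            executor.shutdownNow();
            executor = null;
        }
    }

    private void safeProcess() {
        try {
            processTasks();
        } catch (RuntimeException e) {
            // An exception escaping a scheduled task would cancel all future runs.
        }
    }
}
```

    Keying tasks by target means a burst of registrations for one service results in a single sync per peer on the next run rather than one sync per registration.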
    
    public class DistroDelayTaskProcessor implements NacosTaskProcessor {
    
        @Override
        public boolean process(NacosTask task) {
            if (!(task instanceof DistroDelayTask)) {
                return true;
            }
            DistroDelayTask distroDelayTask = (DistroDelayTask) task;
            DistroKey distroKey = distroDelayTask.getDistroKey();
            if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
                // Create another task, this time for the execute workers
                DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                return true;
            }
            return false;
        }
    }
    
    public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    
        @Override
        public void run() {
            Loggers.DISTRO.info("[DISTRO-START] {}", toString());
            try {
                String type = getDistroKey().getResourceType();
                DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
                distroData.setType(DataOperation.CHANGE);
                // Call the target server's endpoint so that it syncs the changed data
                boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
                if (!result) {
                    handleFailedTask();
                }
                Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
            } catch (Exception e) {
                Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
                handleFailedTask();
            }
        }
    }
    

    II. Nacos server CP mode: RaftConsistencyServiceImpl
    Nacos runs mainly in AP mode; CP mode is implemented by RaftConsistencyServiceImpl. Roughly, it works as follows:

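    • 1. It is a simplified Raft protocol for CP mode that Alibaba implemented itself.

    • 2. A node runs the write logic only if it is the leader; otherwise it forwards the request to the leader.

    • 3. Instance data is written to disk synchronously, and the in-memory registry is updated asynchronously.

    • 4. A CountDownLatch enforces that more than half of the cluster nodes must write successfully before success is returned to the client.

    • 5. On success, the /raft/datum/commit endpoint is called to commit.

    • 6. A ValueChangeEvent is published.

    • 7. PersistentNotifier listens for ValueChangeEvent and handles the service change by calling Service.updateIPs, which replaces the existing instance list and completes the registration.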
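
    The majority rule in step 4 can be sketched with a CountDownLatch sized to the number of follower acknowledgements the leader still needs. MajorityWrite, the BooleanSupplier standing in for a follower's HTTP call, and the timeout parameter are all hypothetical. The sketch also assumes the leader's own write has already succeeded and counts as one vote.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical majority write: succeed once more than half of the cluster has acknowledged.
final class MajorityWrite {
    static int majorityCount(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // Each supplier simulates one follower and returns true when its write succeeded.
    static boolean replicate(List<BooleanSupplier> followers, long timeoutMillis) {
        int clusterSize = followers.size() + 1;            // followers plus the leader
        int stillNeeded = majorityCount(clusterSize) - 1;  // the leader has already written
        CountDownLatch latch = new CountDownLatch(stillNeeded);
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, followers.size()));
        try {
            for (BooleanSupplier follower : followers) {
                pool.execute(() -> {
                    if (follower.getAsBoolean()) {
                        latch.countDown(); // extra acknowledgements beyond zero are harmless
                    }
                });
            }
            // True only if enough acknowledgements arrive before the timeout.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

    In a three-node cluster the leader needs one follower acknowledgement, so a single slow or failed follower does not block the write.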

    RaftConsistencyServiceImpl.put: the consistency service for persistent instance lists

    • It is tied to the Raft election algorithm.
    @Deprecated
    @DependsOn("ProtocolManager")
    @Service
    public class RaftConsistencyServiceImpl implements PersistentConsistencyService {
        
        private final RaftCore raftCore;
        
        @Override
        public void put(String key, Record value) throws NacosException {
            checkIsStopWork();
            try {
                raftCore.signalPublish(key, value);
            } catch (Exception e) {
                Loggers.RAFT.error("Raft put failed.", e);
                throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                        e);
            }
        }
    }
    

    raftCore.signalPublish

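    This ties into the Raft election protocol. If the current node is not the leader, it hands the request over by sending an HTTP request to the leader, which then handles it in its own signalPublish. If the current node is the leader, it applies the instance change, notifies its local listeners, and replicates the data to the other nodes. Replication follows a majority rule, which a CountDownLatch expresses naturally: the write counts as replicated once more than half of the nodes have responded successfully.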

    @Deprecated
    @DependsOn("ProtocolManager")
    @Component
    public class RaftCore implements Closeable {
    
        private final RaftProxy raftProxy;
        
        public void signalPublish(String key, Record value) throws Exception {
            if (stopWork) {
                throw new IllegalStateException("old raft protocol already stop work");
            }
            // This node is not the leader
            if (!isLeader()) {
                ObjectNode params = JacksonUtils.createEmptyJsonNode();
                params.put("key", key);
                params.replace("value", JacksonUtils.transferToJsonNode(value));
                Map<String, String> parameters = new HashMap<>(1);
                parameters.put("key", key);
                
                final RaftPeer leader = getLeader();
                // Forward the request to the leader: /v1/ns/raft/datum
                raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
                return;
            }
            
            OPERATE_LOCK.lock();
            // This node is the leader
            try {
                final long start = System.currentTimeMillis();
                final Datum datum = new Datum();
                datum.key = key;
                datum.value = value;
                if (getDatum(key) == null) {
                    datum.timestamp.set(1L);
                } else {
                    datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
                }
                
                ObjectNode json = JacksonUtils.createEmptyJsonNode();
                json.replace("datum", JacksonUtils.transferToJsonNode(datum));
                json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
                // Apply the datum locally and publish the data change notification
                onPublish(datum, peers.local());
                
                final String content = json.toString();
                // Only a majority of the nodes has to acknowledge
                final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
                // Iterate over all nodes
                for (final String server : peers.allServersIncludeMyself()) {
                    if (isLeader(server)) {
                        // The leader itself counts as one acknowledgement
                        latch.countDown();
                        continue;
                    }
                    // /v1/ns/raft/datum/commit
                    final String url = buildUrl(server, API_ON_PUB);
                    HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                        @Override
                        public void onReceive(RestResult<String> result) {
                            if (!result.ok()) {
                                Loggers.RAFT
                                        .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                                datum.key, server, result.getCode());
                                return;
                            }
                            // Acknowledged asynchronously
                            latch.countDown();
                        }
                        
                        @Override
                        public void onError(Throwable throwable) {
                            Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                        }
                        
                        @Override
                        public void onCancel() {
                        
                        }
                    });
                    
                }
                // Wait until a majority has acknowledged
                if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                    // only majority servers return success can we consider this update success
                    Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                    throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
                }
                
                long end = System.currentTimeMillis();
                Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
            } finally {
                OPERATE_LOCK.unlock();
            }
        }
    }
    

    The call onPublish(datum, peers.local()) in RaftCore publishes a ValueChangeEvent.

    @Deprecated
    @DependsOn("ProtocolManager")
    @Component
    public class RaftCore implements Closeable {
    
        public void onPublish(Datum datum, RaftPeer source) throws Exception {
            if (stopWork) {
                throw new IllegalStateException("old raft protocol already stop work");
            }
            RaftPeer local = peers.local();
            if (datum.value == null) {
                Loggers.RAFT.warn("received empty datum");
                throw new IllegalStateException("received empty datum");
            }
            
            if (!peers.isLeader(source.ip)) {
                Loggers.RAFT
                        .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                                JacksonUtils.toJson(getLeader()));
                throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
            }
            
            if (source.term.get() < local.term.get()) {
                Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                        JacksonUtils.toJson(local));
                throw new IllegalStateException(
                        "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
            }
            
            local.resetLeaderDue();
            
            // if data should be persisted, usually this is true:
            if (KeyBuilder.matchPersistentKey(datum.key)) {
                raftStore.write(datum);
            }
            
            datums.put(datum.key, datum);
            
            if (isLeader()) {
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
            } else {
                if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                    //set leader term:
                    getLeader().term.set(source.term.get());
                    local.term.set(getLeader().term.get());
                } else {
                    local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
                }
            }
            raftStore.updateTerm(local.term.get());
            
            // Publish the ValueChangeEvent
            NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
            Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
        }
    }
    

    PersistentNotifier

    The published ValueChangeEvent is received and handled by PersistentNotifier.

    public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {
    
        private final Map<String, ConcurrentHashSet<RecordListener>> listenerMap = new ConcurrentHashMap<>(32);
    
        public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
            if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
                if (KeyBuilder.matchServiceMetaKey(key) && !KeyBuilder.matchSwitchKey(key)) {
                    for (RecordListener listener : listenerMap.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
                        try {
                            if (action == DataOperation.CHANGE) {
                                listener.onChange(key, value);
                            }
                            if (action == DataOperation.DELETE) {
                                listener.onDelete(key);
                            }
                        } catch (Throwable e) {
                            Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
                        }
                    }
                }
            }
            
            if (!listenerMap.containsKey(key)) {
                return;
            }
            
            for (RecordListener listener : listenerMap.get(key)) {
                try {
                    if (action == DataOperation.CHANGE) {
                        listener.onChange(key, value);
                        continue;
                    }
                    if (action == DataOperation.DELETE) {
                        listener.onDelete(key);
                    }
                } catch (Throwable e) {
                    Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
                }
            }
        }
    
        @Override
        public void onEvent(ValueChangeEvent event) {
            notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
        }
    }
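
    The dispatch pattern in notify can be reduced to a small standalone sketch. This is an illustration under assumptions rather than the Nacos class: KeyListenerSketch, its Listener interface, and its Action enum are hypothetical stand-ins for PersistentNotifier, RecordListener, and DataOperation, and the special handling of the service meta key prefix is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class KeyListenerSketch {

    /** Hypothetical stand-in for DataOperation. */
    public enum Action { CHANGE, DELETE }

    /** Hypothetical stand-in for RecordListener. */
    public interface Listener {
        void onChange(String key, String value);

        void onDelete(String key);
    }

    // Listeners grouped by the key they are interested in.
    private final Map<String, Set<Listener>> listenerMap = new ConcurrentHashMap<>();

    public void registerListener(String key, Listener listener) {
        listenerMap.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(listener);
    }

    /** Delivers one change or delete to every listener registered for the key. */
    public void notifyListeners(String key, Action action, String value) {
        Set<Listener> listeners = listenerMap.get(key);
        if (listeners == null) {
            return; // nobody is listening on this key
        }
        for (Listener listener : listeners) {
            try {
                if (action == Action.CHANGE) {
                    listener.onChange(key, value);
                } else {
                    listener.onDelete(key);
                }
            } catch (RuntimeException e) {
                // A failing listener must not prevent the others from being notified.
                System.err.println("listener failed for key " + key + ": " + e);
            }
        }
    }

    public static void main(String[] args) {
        KeyListenerSketch notifier = new KeyListenerSketch();
        List<String> log = new ArrayList<>();
        notifier.registerListener("service-a", new Listener() {
            @Override
            public void onChange(String key, String value) {
                log.add("change " + key + "=" + value);
            }

            @Override
            public void onDelete(String key) {
                log.add("delete " + key);
            }
        });
        notifier.notifyListeners("service-a", Action.CHANGE, "10.0.0.1:8080");
        notifier.notifyListeners("service-a", Action.DELETE, null);
        notifier.notifyListeners("service-b", Action.CHANGE, "10.0.0.2:8080"); // no listener, ignored
        System.out.println(log);
    }
}
```

    Catching the exception per listener mirrors the try/catch inside the loops of the listing above: one misbehaving listener is logged and skipped instead of aborting the whole notification.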
    

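    Summary:

    • 1. An asynchronous thread pool is started to watch the tasks in an ArrayBlockingQueue and consume them as they arrive.
    • 2. A high-performance in-memory queue separates accepting a service request from processing it.

    Benefits:

    • Better performance: the provider sending a registration is decoupled from the registry processing it.
    • Registry updates are funneled through the in-memory ArrayBlockingQueue, which sidesteps the problem of handling concurrent writes.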
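
    The queue-based decoupling summarized above can be sketched as follows. This is a minimal illustration under assumptions, not the Nacos implementation: RegistrationQueueSketch and its methods are hypothetical names, instances are reduced to plain strings, the queue capacity of 1024 is arbitrary, and a single consumer thread is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RegistrationQueueSketch {

    // Bounded in-memory queue between the request threads and the worker.
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    // Stand-in for the registry; only the worker thread writes to it.
    private final List<String> registry = new CopyOnWriteArrayList<>();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /** Starts the single consumer thread. */
    public void start() {
        worker.execute(this::consumeLoop);
    }

    /** Called by request threads: enqueue and return at once; false if the queue is full. */
    public boolean register(String instance) {
        return tasks.offer(instance);
    }

    private void consumeLoop() {
        try {
            while (true) {
                // take() blocks until a task is available.
                registry.add(tasks.take());
            }
        } catch (InterruptedException e) {
            // shutdown() interrupts the worker; restore the flag and exit the loop.
            Thread.currentThread().interrupt();
        }
    }

    /** Returns a copy of what has been processed so far. */
    public List<String> snapshot() {
        return new ArrayList<>(registry);
    }

    public void shutdown() {
        worker.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        RegistrationQueueSketch sketch = new RegistrationQueueSketch();
        sketch.start();
        sketch.register("10.0.0.1:8080");
        sketch.register("10.0.0.2:8080");
        Thread.sleep(200); // give the worker time to drain the queue
        System.out.println(sketch.snapshot());
        sketch.shutdown();
    }
}
```

    The request thread only pays for an offer call, and because one worker drains the queue in order, the registry never sees two writers at once.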

