Spring Cloud Alibaba: Nacos AP Consistency

Author: 小波同学 | Published: 2021-07-30 01:42

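    Eureka's consistency strategy

    Eureka is a service discovery framework that runs in AP mode. In a Eureka cluster, the servers replicate and update data by broadcasting their own data to one another. A Eureka client can also still obtain service registration information when the network between the client and the registry fails, because the client keeps a cache of that registration information.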

    @Singleton
    public class DiscoveryClient implements EurekaClient {
    
    private boolean fetchRegistryFromBackup() {
            try {
                @SuppressWarnings("deprecation")
                BackupRegistry backupRegistryInstance = newBackupRegistryInstance();
                if (null == backupRegistryInstance) { // backward compatibility with the old protected method, in case it is being used.
                    backupRegistryInstance = backupRegistryProvider.get();
                }
    
                if (null != backupRegistryInstance) {
                    Applications apps = null;
                    if (isFetchingRemoteRegionRegistries()) {
                        String remoteRegionsStr = remoteRegionsToFetch.get();
                        if (null != remoteRegionsStr) {
                            apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(","));
                        }
                    } else {
                        apps = backupRegistryInstance.fetchRegistry();
                    }
                    if (apps != null) {
                        final Applications applications = this.filterAndShuffle(apps);
                        applications.setAppsHashCode(applications.getReconcileHashCode());
                        localRegionApps.set(applications);
                        logTotalInstances();
                        logger.info("Fetched registry successfully from the backup");
                        return true;
                    }
                } else {
                    logger.warn("No backup registry instance defined & unable to find any discovery servers.");
                }
            } catch (Throwable e) {
                logger.warn("Cannot fetch applications from apps although backup registry was specified", e);
            }
            return false;
        }
    }
    

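    Eureka adds this client-side cache so that consumers can keep calling providers even when the Eureka cluster is unavailable. The trade-off is that Eureka cannot guarantee strong consistency of the registration data (CP mode); it only provides eventual consistency (AP mode).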
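
    The trade-off can be illustrated outside Eureka. The following is a minimal sketch, not Eureka's implementation: CachingRegistryClient, its Supplier-based remote call, and the addresses are hypothetical names chosen for this example. It only shows why a cached snapshot keeps consumers working, and why the answer they get may be stale.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class CacheFallbackSketch {

    /** Hypothetical client: asks the registry first, falls back to its last snapshot. */
    static final class CachingRegistryClient {

        private final Supplier<List<String>> remoteFetch;
        private final AtomicReference<List<String>> lastKnown =
                new AtomicReference<>(List.<String>of());

        CachingRegistryClient(Supplier<List<String>> remoteFetch) {
            this.remoteFetch = remoteFetch;
        }

        List<String> getInstances() {
            try {
                List<String> fresh = List.copyOf(remoteFetch.get());
                lastKnown.set(fresh);
                return fresh;
            } catch (RuntimeException e) {
                // Availability over consistency: serve possibly stale data rather than fail.
                return lastKnown.get();
            }
        }
    }

    public static void main(String[] args) {
        boolean[] serverUp = {true};
        CachingRegistryClient client = new CachingRegistryClient(() -> {
            if (!serverUp[0]) {
                throw new IllegalStateException("registry unreachable");
            }
            return List.of("10.0.0.1:8080", "10.0.0.2:8080");
        });
        System.out.println(client.getInstances());
        serverUp[0] = false; // simulate a network partition
        System.out.println(client.getInstances()); // served from the cached snapshot
    }
}
```

    If an instance goes offline while the registry is unreachable, this client keeps returning it until the next successful fetch. That window of staleness is what eventual consistency means here.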

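    Nacos's AP consistency strategy: Distro

    In AP mode, Nacos takes an approach similar to Eureka's: the servers synchronize data with one another to replicate it across the cluster.

    Triggering the data broadcast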

    @DependsOn("ProtocolManager")
    @org.springframework.stereotype.Service("distroConsistencyService")
    public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    
        private final GlobalConfig globalConfig;
        
        private final DistroProtocol distroProtocol;
    
        @Override
        public void put(String key, Record value) throws NacosException {
            onPut(key, value);
            distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                    globalConfig.getTaskDispatchPeriod() / 2);
        }
    }
    

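    When a service instance is registered or deregistered, that is, when the register or deregister method of InstanceController is called, the addInstance or removeInstance method of ServiceManager calls the put or remove method defined by the ConsistencyService interface. This changes data on the server, so a DistroDelayTask is created and the key of the changed data is wrapped inside it.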

    @Component
    public class DistroProtocol {
    
        public void sync(DistroKey distroKey, DataOperation action, long delay) {
            for (Member each : memberManager.allMembersWithoutSelf()) {
                // Iterate over every other cluster member and schedule a sync to each one
                DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                        each.getAddress());
                // Create the delayed task
                DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
                // In effect this is a scheduled task: the delay engine processes it on a timer
                distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
                }
            }
        }
    }
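
    The loop in sync produces one task per peer, and each task key carries its target address. A simplified sketch of that fan-out follows. TargetedKey and fanOut are hypothetical stand-ins for DistroKey and the loop body, and the member addresses are made up.

```java
import java.util.ArrayList;
import java.util.List;

class DistroFanOutSketch {

    /** Hypothetical stand-in for a targeted key: what changed and which peer must be told. */
    static final class TargetedKey {
        final String resourceKey;
        final String targetServer;

        TargetedKey(String resourceKey, String targetServer) {
            this.resourceKey = resourceKey;
            this.targetServer = targetServer;
        }

        @Override
        public String toString() {
            return resourceKey + " -> " + targetServer;
        }
    }

    /** Builds one targeted key per cluster member, skipping the local node. */
    static List<TargetedKey> fanOut(String resourceKey, List<String> members, String self) {
        List<TargetedKey> keys = new ArrayList<>();
        for (String member : members) {
            if (!member.equals(self)) {
                keys.add(new TargetedKey(resourceKey, member));
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<String> members = List.of("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        for (TargetedKey key : fanOut("service-a", members, "10.0.0.1:8848")) {
            System.out.println(key);
        }
    }
}
```

    Because the target is part of the key, a pending sync to one peer is tracked separately from a pending sync of the same data to another peer.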
    

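    The DistroDelayTask is then submitted to Nacos's delayed-task engine, NacosDelayTaskExecuteEngine, which stores it in its ConcurrentHashMap<Object, AbstractDelayTask> tasks. In its constructor, NacosDelayTaskExecuteEngine creates a ScheduledExecutorService and schedules a ProcessRunnable that takes the AbstractDelayTask entries out of tasks and processes them.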

    public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
        
        private final ScheduledExecutorService processingExecutor;
        
        protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
        
        protected final ReentrantLock lock = new ReentrantLock();
        
        // Constructor of NacosDelayTaskExecuteEngine
        public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
            super(logger);
            // Initialize the tasks map
            tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
            // Create the single-threaded scheduled executor
            processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
            // Run ProcessRunnable repeatedly, with a fixed delay between runs
            processingExecutor
                    .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
        }   
    
        @Override
        public void addTask(Object key, AbstractDelayTask newTask) {
            lock.lock();
            try {
                AbstractDelayTask existTask = tasks.get(key);
                // If a task with the same key is already pending, merge it into the new one
                if (null != existTask) {
                    newTask.merge(existTask);
                }
                // Store the AbstractDelayTask in tasks
                tasks.put(key, newTask);
            } finally {
                lock.unlock();
            }
        }
    }
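
    The add-or-merge step means that a burst of changes to the same key within one processing interval ends up as a single pending task. The sketch below illustrates that idea under simplifying assumptions: PendingSync, its change counter, and drain are hypothetical, and the merge rule shown (adding up counters) is for illustration only and is not the rule implemented by DistroDelayTask.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DelayMergeSketch {

    /** Hypothetical delayed task: it carries bookkeeping only, not the changed data itself. */
    static final class PendingSync {
        final String key;
        int coalescedChanges = 1;

        PendingSync(String key) {
            this.key = key;
        }

        /** Folds an older pending task for the same key into this one (illustrative rule). */
        void merge(PendingSync older) {
            coalescedChanges += older.coalescedChanges;
        }
    }

    private final Map<String, PendingSync> tasks = new ConcurrentHashMap<>();

    /** Add-or-merge: at most one pending task per key. */
    synchronized void addTask(PendingSync newTask) {
        PendingSync existing = tasks.get(newTask.key);
        if (existing != null) {
            newTask.merge(existing);
        }
        tasks.put(newTask.key, newTask);
    }

    /** Removes and returns everything pending; a real engine would call this on a timer. */
    synchronized List<PendingSync> drain() {
        List<PendingSync> drained = new ArrayList<>(tasks.values());
        tasks.clear();
        return drained;
    }

    public static void main(String[] args) {
        DelayMergeSketch engine = new DelayMergeSketch();
        engine.addTask(new PendingSync("service-a@10.0.0.2:8848"));
        engine.addTask(new PendingSync("service-a@10.0.0.2:8848"));
        engine.addTask(new PendingSync("service-a@10.0.0.3:8848"));
        for (PendingSync task : engine.drain()) {
            System.out.println(task.key + " covers " + task.coalescedChanges + " change(s)");
        }
    }
}
```

    Coalescing is safe in this design because the task holds only a key. The data to send is read when the task finally runs, so the peer receives the latest state rather than each intermediate one.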
    

    The ProcessRunnable thread processes the tasks stored in the ConcurrentHashMap<Object, AbstractDelayTask> tasks of NacosDelayTaskExecuteEngine.

    public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
        
        private final ScheduledExecutorService processingExecutor;
        
        protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
        
        /**
         * process tasks in execute engine.
         */
        protected void processTasks() {
            // Get every key currently in tasks
            Collection<Object> keys = getAllTaskKeys();
            // Iterate over the task keys
            for (Object taskKey : keys) {
                // Remove the AbstractDelayTask from tasks
                AbstractDelayTask task = removeTask(taskKey);
                if (null == task) {
                    continue;
                }
                // Look up the NacosTaskProcessor; in practice this resolves to DistroDelayTaskProcessor.
                // DistroHttpRegistry.doRegister() registers a DistroHttpDelayTaskProcessor in advance,
                // but a lookup that uses taskKey as the key does not find it.
                // The DistroTaskEngineHolder constructor sets DistroDelayTaskProcessor as the
                // default task processor of DistroDelayTaskExecuteEngine, so the default is returned.
                NacosTaskProcessor processor = getProcessor(taskKey);
                if (null == processor) {
                    getEngineLog().error("processor not found for task, so discarded. " + task);
                    continue;
                }
                try {
                    // ReAdd task if process failed
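                    // The resolved processor (DistroDelayTaskProcessor, as noted above) handles the AbstractDelayTask
                    if (!processor.process(task)) {
                        // Processing failed: update the last-process time and put the task back into tasks for a later retry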
                        retryFailedTask(taskKey, task);
                    }
                } catch (Throwable e) {
                    getEngineLog().error("Nacos task execute error : " + e.toString(), e);
                    // Processing threw: update the last-process time and put the task back into tasks for a later retry
                    retryFailedTask(taskKey, task);
                }
            }
        }
    
        private void retryFailedTask(Object key, AbstractDelayTask task) {
            // Update the last-process time
            task.setLastProcessTime(System.currentTimeMillis());
            // Put the task back into tasks so that it is retried later
            addTask(key, task);
        }
        
        private class ProcessRunnable implements Runnable {
            
            @Override
            public void run() {
                try {
                    // Process the pending tasks
                    processTasks();
                } catch (Throwable e) {
                    getEngineLog().error(e.toString(), e);
                }
            }
        }
    }
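
    The retry behaviour of processTasks can be reduced to a small model: every task is removed before it is processed and is re-added only if processing returns false or throws. The sketch below is single-threaded and uses hypothetical names (RetrySketch, processOnce, a Predicate as the processor); it leaves out the lock, the timer, and the last-process timestamp of the real engine.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class RetrySketch {

    private final Map<String, String> tasks = new LinkedHashMap<>();

    void addTask(String key, String payload) {
        tasks.put(key, payload);
    }

    int pending() {
        return tasks.size();
    }

    /** One processing round: remove each task, then re-add it only if it failed. */
    void processOnce(Predicate<String> processor) {
        List<String> keys = new ArrayList<>(tasks.keySet());
        for (String key : keys) {
            String payload = tasks.remove(key);
            boolean succeeded;
            try {
                succeeded = processor.test(payload);
            } catch (RuntimeException e) {
                succeeded = false; // an exception counts as a failed attempt
            }
            if (!succeeded) {
                addTask(key, payload); // picked up again in the next round
            }
        }
    }

    public static void main(String[] args) {
        RetrySketch engine = new RetrySketch();
        engine.addTask("service-a@10.0.0.2:8848", "sync");
        int[] attempts = {0};
        Predicate<String> flakyPeer = payload -> ++attempts[0] >= 3; // succeeds on the third try
        while (engine.pending() > 0) {
            engine.processOnce(flakyPeer);
        }
        System.out.println("delivered after " + attempts[0] + " attempts");
    }
}
```

    A task that keeps failing stays in the map and is retried on every round. The model has no retry limit, so it shows the mechanism only and does not suggest a policy.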
    

    The constructor of DistroTaskEngineHolder sets DistroDelayTaskProcessor as the default task processor of DistroDelayTaskExecuteEngine.

    public abstract class AbstractNacosTaskExecuteEngine<T extends NacosTask> implements NacosTaskExecuteEngine<T> {
        
        private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<Object, NacosTaskProcessor>();
        
        private NacosTaskProcessor defaultTaskProcessor;
        
        @Override
        public NacosTaskProcessor getProcessor(Object key) {
            // Return the processor registered for this key, or the default processor if there is none
            return taskProcessors.containsKey(key) ? taskProcessors.get(key) : defaultTaskProcessor;
        }
    }
    
        
    @Component
    public class DistroHttpRegistry {
        
        private final DistroComponentHolder componentHolder;
        
        private final DistroTaskEngineHolder taskEngineHolder;
        
        @PostConstruct
        public void doRegister() {
            componentHolder.registerDataStorage(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
                    new DistroDataStorageImpl(dataStore, distroMapper));
            componentHolder.registerTransportAgent(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpAgent(memberManager));
            componentHolder.registerFailedTaskHandler(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
                    new DistroHttpCombinedKeyTaskFailedHandler(globalConfig, taskEngineHolder));
            // After DistroHttpRegistry is constructed, register the DistroHttpDelayTaskProcessor
            taskEngineHolder.registerNacosTaskProcessor(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
                    new DistroHttpDelayTaskProcessor(globalConfig, taskEngineHolder));
            componentHolder.registerDataProcessor(consistencyService);
        }
    }   
    
    
    @Component
    public class DistroTaskEngineHolder {
        
        private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
        
        private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();
        
        public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
            DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
            // Set DistroDelayTaskProcessor as the default task processor of DistroDelayTaskExecuteEngine
            delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
        }
    }
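
    The reason the default processor wins can be shown with a small lookup table. In this sketch the processors are represented by plain names, and TaskKey is a hypothetical key object; the point is only that an entry registered under a string is not found when the lookup uses a different kind of key, so the fallback is returned.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ProcessorLookupSketch {

    /** Hypothetical key object, standing in for the per-task key used at lookup time. */
    static final class TaskKey {
        final String resourceType;

        TaskKey(String resourceType) {
            this.resourceType = resourceType;
        }
    }

    private final Map<Object, String> processors = new ConcurrentHashMap<>();
    private final String defaultProcessor;

    ProcessorLookupSketch(String defaultProcessor) {
        this.defaultProcessor = defaultProcessor;
    }

    void register(Object key, String processorName) {
        processors.put(key, processorName);
    }

    /** Same shape as getProcessor: the registered processor if present, else the default. */
    String getProcessor(Object key) {
        return processors.getOrDefault(key, defaultProcessor);
    }

    public static void main(String[] args) {
        ProcessorLookupSketch engine = new ProcessorLookupSketch("default-delay-processor");
        engine.register("instance-list", "http-delay-processor");
        // Looking up with the registration string finds the registered processor.
        System.out.println(engine.getProcessor("instance-list"));
        // Looking up with a key object does not match the string entry, so the default is used.
        System.out.println(engine.getProcessor(new TaskKey("instance-list")));
    }
}
```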
    

    DistroDelayTaskProcessor executes the AbstractDelayTask

    • The process method of DistroDelayTaskProcessor creates a DistroSyncChangeTask.
    public class DistroDelayTaskProcessor implements NacosTaskProcessor {
    
        private final DistroTaskEngineHolder distroTaskEngineHolder;
        
        @Override
        public boolean process(NacosTask task) {
            if (!(task instanceof DistroDelayTask)) {
                return true;
            }
            DistroDelayTask distroDelayTask = (DistroDelayTask) task;
            DistroKey distroKey = distroDelayTask.getDistroKey();
            if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
                // Create a second task, a DistroSyncChangeTask, and hand it to the execute engine
                DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                return true;
            }
            return false;
        }
    }
    

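    The DistroSyncChangeTask is added to the BlockingQueue held by a TaskExecuteWorker so that it runs asynchronously.

    The TaskExecuteWorker constructor starts an InnerWorker thread that blocks on the queue and consumes tasks as they arrive.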

    public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {
    
        private final TaskExecuteWorker[] executeWorkers;
        
        @Override
        public void addTask(Object tag, AbstractExecuteTask task) {
            NacosTaskProcessor processor = getProcessor(tag);
            if (null != processor) {
                processor.process(task);
                return;
            }
            TaskExecuteWorker worker = getWorker(tag);
            // Hand the task to the worker, which enqueues it for its InnerWorker thread
            worker.process(task);
        }
    }
    
    public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
    
        /**
         * Max task queue size 32768.
         */
        private static final int QUEUE_CAPACITY = 1 << 15;
    
        private final Logger log;
    
        private final String name;
    
        private final BlockingQueue<Runnable> queue;
    
        public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
            this.name = name + "_" + mod + "%" + total;
            // Initialize the queue
            this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
            this.closed = new AtomicBoolean(false);
            this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
            // Start the InnerWorker thread, which waits on the queue and consumes tasks
            new InnerWorker(name).start();
        }
    
        @Override
        public boolean process(NacosTask task) {
            // DistroSyncChangeTask is a subclass of AbstractExecuteTask
            if (task instanceof AbstractExecuteTask) {
                // Enqueue the task
                putTask((Runnable) task);
            }
            return true;
        }
        
        private void putTask(Runnable task) {
            try {
                // Add the task to the queue; put() blocks while the queue is full
                queue.put(task);
            } catch (InterruptedException ire) {
                log.error(ire.toString(), ire);
            }
        }
    }
    

    The InnerWorker thread started by the TaskExecuteWorker constructor takes each task from the queue and runs it.

    public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
    
        private final BlockingQueue<Runnable> queue;
    
        public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
            this.name = name + "_" + mod + "%" + total;
            // Initialize the queue
            this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
            this.closed = new AtomicBoolean(false);
            this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
            // Start the InnerWorker thread, which waits on the queue and consumes tasks
            new InnerWorker(name).start();
        }
    
        private class InnerWorker extends Thread {
    
            InnerWorker(String name) {
                setDaemon(false);
                setName(name);
            }
    
            @Override
            public void run() {
                while (!closed.get()) {
                    try {
                        // Take the next task from the queue, blocking until one is available
                        Runnable task = queue.take();
                        long begin = System.currentTimeMillis();
                        // Run the task
                        task.run();
                        long duration = System.currentTimeMillis() - begin;
                        if (duration > 1000L) {
                            log.warn("distro task {} takes {}ms", task, duration);
                        }
                    } catch (Throwable e) {
                        log.error("[DISTRO-FAILED] " + e.toString(), e);
                    }
                }
            }
        }
    }
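
    The producer and consumer halves shown above fit together as a single-consumer work queue. The following is a compact sketch of that pattern, not the Nacos class: QueueWorkerSketch, submit, close, and workerIndex are hypothetical names, the queue capacity is arbitrary, and the routing rule in workerIndex is an assumption about how a tag could be mapped to a worker. The sketch uses a daemon thread so that the example exits cleanly, whereas the excerpt above calls setDaemon(false).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class QueueWorkerSketch {

    private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1024);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Thread worker;

    QueueWorkerSketch(String name) {
        worker = new Thread(this::consumeLoop, name);
        worker.setDaemon(true); // sketch only: do not keep the JVM alive
        worker.start();
    }

    /** Enqueues a task; blocks while the queue is full. Returns false if interrupted. */
    boolean submit(Runnable task) {
        try {
            queue.put(task);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private void consumeLoop() {
        while (!closed.get()) {
            try {
                queue.take().run(); // take() parks the thread until a task arrives
            } catch (InterruptedException e) {
                return; // close() interrupts a blocked take()
            } catch (RuntimeException e) {
                System.err.println("task failed: " + e); // keep consuming after a bad task
            }
        }
    }

    void close() {
        closed.set(true);
        worker.interrupt();
    }

    /** Assumed routing rule: the same tag always maps to the same worker index. */
    static int workerIndex(Object tag, int workerCount) {
        return Math.floorMod(tag.hashCode(), workerCount);
    }

    public static void main(String[] args) throws InterruptedException {
        QueueWorkerSketch worker = new QueueWorkerSketch("sync-worker");
        CountDownLatch done = new CountDownLatch(2);
        worker.submit(() -> { System.out.println("sync service-a"); done.countDown(); });
        worker.submit(() -> { System.out.println("sync service-b"); done.countDown(); });
        done.await(5, TimeUnit.SECONDS);
        worker.close();
    }
}
```

    With one consumer thread per queue, tasks submitted to the same worker run in submission order. Routing a given tag to a fixed worker therefore keeps the syncs for that tag ordered.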
    

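    The DistroSyncChangeTask task

    • It first uses the key in the DistroKey to look up the corresponding data in the DataStore. The data is serialized into a byte[] array and then sent over HTTP through NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer()). If the broadcast fails, the failed task is retried by wrapping it in a new DistroDelayTask.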
    public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    
        private final DistroComponentHolder distroComponentHolder;
    
        @Override
        public void run() {
            Loggers.DISTRO.info("[DISTRO-START] {}", toString());
            try {
                String type = getDistroKey().getResourceType();
                // Look up the data for this key in the DataStore
                DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
                distroData.setType(DataOperation.CHANGE);
                // Call the target server's sync endpoint to push the changed data
                boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
                if (!result) {
                    // Retry the failed task
                    handleFailedTask();
                }
                Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
            } catch (Exception e) {
                Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
                handleFailedTask();
            }
        }
    }
    
    
    public class DistroHttpAgent implements DistroTransportAgent {
    
        private final ServerMemberManager memberManager;
    
        @Override
        public boolean syncData(DistroData data, String targetServer) {
            if (!memberManager.hasMember(targetServer)) {
                return true;
            }
            byte[] dataContent = data.getContent();
            // Push the data to the target server
            return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
        }
    }
    
    public class NamingProxy {
        
        private static final String DATA_ON_SYNC_URL = "/distro/datum";
    
        public static boolean syncData(byte[] data, String curServer) {
            Map<String, String> headers = new HashMap<>(128);
            
            headers.put(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
            headers.put(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
            headers.put(HttpHeaderConsts.ACCEPT_ENCODING, "gzip,deflate,sdch");
            headers.put(HttpHeaderConsts.CONNECTION, "Keep-Alive");
            headers.put(HttpHeaderConsts.CONTENT_ENCODING, "gzip");
            
            try {
                // Send a PUT request to the /distro/datum endpoint
                RestResult<String> result = HttpClient.httpPutLarge(
                        "http://" + curServer + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
                                + DATA_ON_SYNC_URL, headers, data);
                if (result.ok()) {
                    return true;
                }
                if (HttpURLConnection.HTTP_NOT_MODIFIED == result.getCode()) {
                    return true;
                }
                throw new IOException("failed to req API:" + "http://" + curServer + EnvUtil.getContextPath()
                        + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL + ". code:" + result.getCode() + " msg: "
                        + result.getData());
            } catch (Exception e) {
                Loggers.SRV_LOG.warn("NamingProxy", e);
            }
            return false;
        }
    }
    

    The data is sent with PUT http://ip:port/nacos/v1/ns/distro/datum. That URL is handled by the onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) method of DistroController.

    @RestController
    @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/distro")
    public class DistroController {
    
        @Autowired
        private DistroProtocol distroProtocol;
        
        @Autowired
        private ServiceManager serviceManager;
    
        @PutMapping("/datum")
        public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
            
            if (dataMap.isEmpty()) {
                Loggers.DISTRO.error("[onSync] receive empty entity!");
                throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
            }
            
            for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
                if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
                    String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                    String serviceName = KeyBuilder.getServiceName(entry.getKey());
                    if (!serviceManager.containService(namespaceId, serviceName) && switchDomain
                            .isDefaultInstanceEphemeral()) {
                        serviceManager.createEmptyService(namespaceId, serviceName, true);
                    }
                    DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
                    distroProtocol.onReceive(distroHttpData);
                }
            }
            return ResponseEntity.ok("ok");
        }
    }
    

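    This leads to a call to DistroConsistencyServiceImpl.processData(DistroData distroData), which in turn calls DistroConsistencyServiceImpl.onPut(String key, Record value). onPut adds the data to the ArrayBlockingQueue tasks in Notifier, from which the service registration is carried out.

    Nacos server-side registration flow: https://www.jianshu.com/p/99f67f6f2577

    This completes the eventual-consistency handling of data among Nacos servers in AP mode.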
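
    The whole path can be condensed into an in-memory model: a write is applied locally at once, the changed key is remembered, and a later flush pushes the latest value to every peer, which applies it without broadcasting again. This is a sketch under heavy simplification. Node, put, onReceive, and flush are hypothetical names, flush stands in for the delayed task firing, and there is no network, serialization, or retry.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class DistroFlowSketch {

    static final class Node {
        final String name;
        final Map<String, String> store = new ConcurrentHashMap<>();
        final List<Node> peers = new ArrayList<>();
        final Set<String> pendingKeys = new LinkedHashSet<>();

        Node(String name) {
            this.name = name;
        }

        /** Local write: apply immediately, then remember that peers need this key. */
        void put(String key, String value) {
            store.put(key, value);
            pendingKeys.add(key);
        }

        /** Receiving side of a sync: apply the value without scheduling another broadcast. */
        void onReceive(String key, String value) {
            store.put(key, value);
        }

        /** Stands in for the delayed task firing: push the latest value of each pending key. */
        void flush() {
            for (String key : pendingKeys) {
                String latest = store.get(key);
                for (Node peer : peers) {
                    peer.onReceive(key, latest);
                }
            }
            pendingKeys.clear();
        }
    }

    public static void main(String[] args) {
        Node a = new Node("nacos-a");
        Node b = new Node("nacos-b");
        a.peers.add(b);

        a.put("service-a", "1 instance");
        a.put("service-a", "2 instances");
        System.out.println("before flush, b sees: " + b.store.get("service-a"));
        a.flush();
        System.out.println("after flush, b sees: " + b.store.get("service-a"));
    }
}
```

    Between put and flush the two nodes disagree, which is the inconsistency window that AP mode accepts. After the flush they converge on the latest value.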

    Reference:
    https://www.liaochuntao.cn/2019/05/09/java-web-32/
