美文网首页Dubbo专题
Dubbo-服务注册中心之AbstractRegistry

Dubbo-服务注册中心之AbstractRegistry

作者: 九点半的马拉 | 来源:发表于2019-07-29 22:34 被阅读0次
    1.png

    在dubbo中,关于注册中心Registry的有关实现封装在了dubbo-registry模块中。提供者(Provider)个消费者(Consumer)都是通过注册中心进行资源的调度。当服务启动时,provider会调用注册中心的register方法将自己的服务通过url的方式发布到注册中心,而consumer订阅其他服务时,会将订阅的服务通过url发送给注册中心(URL中通常会包含各种配置)。当某个服务被关闭时,它则会从注册中心中移除,当某个服务被修改时,则会调用notify方法触发所有的监听器。
    首先简单介绍一下在dubbo的基本统一数据模型URL

    统一数据模型URL

    在dubbo中定义的url与传统的url有所不同,用于在扩展点之间传输数据,可以从url参数中获取配置信息等数据,这一点很重要。
    描述一个dubbo协议的服务

    dubbo://192.168.1.6:20880/moe.cnkirito.sample.HelloService?timeout=3000

    描述一个消费者

    consumer://30.5.120.217/org.apache.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=1209&qos.port=33333&side=consumer&timestamp=1545721827784

    接下来将着重介绍几个重要的类。

    AbstractRegistry

    AbstractRegistry实现的是Registry接口,是Registry的抽象类。为了减轻注册中心的压力,在该类中实现了把本地url缓存到内存缓存property文件中,并且实现了注册中心的注册、订阅等方法。


    2.png

    在该类中有介个关于url的变量。

    • private final Set<URL> registered = new ConcurrentHashSet<URL>();
      -> 记录已经注册服务的URL集合,注册的URL不仅仅可以是服务提供者的,也可以是服务消费者的。
    • private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
      -> 消费者url订阅的监听器集合
    • private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
      -> 某个消费者被通知的服务URL集合,最外部URL的key是消费者的URL,value是一个map集合,里面的map中的key为分类名,value是该类下的服务url集合。
    • private URL registryUrl;
      -> 注册中心URL
    • private File file;
      -> 本地磁盘缓存文件,缓存注册中心的数据

    初始化

        public AbstractRegistry(URL url) {
            //1. 设置配置中心的地址
            setUrl(url);
            //2. 配置中心的URL中是否配置了同步保存文件属性,否则默认为false
            syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
            //3. 配置信息本地缓存的文件名
            String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
            //逐层创建文件目录
            File file = null;
            if (ConfigUtils.isNotEmpty(filename)) {
                file = new File(filename);
                if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                    if (!file.getParentFile().mkdirs()) {
                        throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                    }
                }
            }
            this.file = file;
            //如果现有配置缓存,则从缓存文件中加载属性
            loadProperties();
            notify(url.getBackupUrls());
        }
    

    加载本地磁盘缓存文件到内存缓存中,也就是把文件中的数据写入到properties中

     private void loadProperties() {
            if (file != null && file.exists()) {
                InputStream in = null;
                try {
                    in = new FileInputStream(file);
                    // 把数据写入到内存缓存中
                    properties.load(in);
                    if (logger.isInfoEnabled()) {
                        logger.info("Load registry store file " + file + ", data: " + properties);
                    }
                } catch (Throwable e) {
                    logger.warn("Failed to load registry store file " + file, e);
                } finally {
                    if (in != null) {
                        try {
                            in.close();
                        } catch (IOException e) {
                            logger.warn(e.getMessage(), e);
                        }
                    }
                }
            }
        }
    

    注册与取消注册

    对registered变量执行add和remove操作

    @Override
        public void register(URL url) {
            if (url == null) {
                throw new IllegalArgumentException("register url == null");
            }
            if (logger.isInfoEnabled()) {
                logger.info("Register: " + url);
            }
            registered.add(url);
        }
    
        @Override
        public void unregister(URL url) {
            if (url == null) {
                throw new IllegalArgumentException("unregister url == null");
            }
            if (logger.isInfoEnabled()) {
                logger.info("Unregister: " + url);
            }
            registered.remove(url);
        }
    

    订阅与取消订阅

    通过消费者url从subscribed变量中获取该消费者的所有监听器集合,然后将该监听器放入到集合中,取消同理。

    @Override
        public void subscribe(URL url, NotifyListener listener) {
            if (url == null) {
                throw new IllegalArgumentException("subscribe url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("subscribe listener == null");
            }
            if (logger.isInfoEnabled()) {
                logger.info("Subscribe: " + url);
            }
            // 获得该消费者url 已经订阅的服务 的监听器集合
            Set<NotifyListener> listeners = subscribed.get(url);
            if (listeners == null) {
                subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
                listeners = subscribed.get(url);
            }
            // 添加某个服务的监听器
            listeners.add(listener);
        }
    
        @Override
        public void unsubscribe(URL url, NotifyListener listener) {
            if (url == null) {
                throw new IllegalArgumentException("unsubscribe url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("unsubscribe listener == null");
            }
            if (logger.isInfoEnabled()) {
                logger.info("Unsubscribe: " + url);
            }
            Set<NotifyListener> listeners = subscribed.get(url);
            if (listeners != null) {
                listeners.remove(listener);
            }
        }
    
    

    服务的恢复

    注册的恢复包括注册服务的恢复和订阅服务的恢复,因为在内存中表留了注册的服务和订阅的服务,因此在恢复的时候会重新拉取这些数据,分别调用发布和订阅的方法来重新将其录入到注册中心中。

    protected void recover() throws Exception {
            // register
            //把内存缓存中的registered取出来遍历进行注册
            Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
            if (!recoverRegistered.isEmpty()) {
                if (logger.isInfoEnabled()) {
                    logger.info("Recover register url " + recoverRegistered);
                }
                for (URL url : recoverRegistered) {
                    register(url);
                }
            }
            // subscribe
            //把内存缓存中的subscribed取出来遍历进行订阅
            Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
            if (!recoverSubscribed.isEmpty()) {
                if (logger.isInfoEnabled()) {
                    logger.info("Recover subscribe url " + recoverSubscribed.keySet());
                }
                for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                    URL url = entry.getKey();
                    for (NotifyListener listener : entry.getValue()) {
                        subscribe(url, listener);
                    }
                }
            }
        }
    

    通知

    protected void notify(List<URL> urls) {
            if (urls == null || urls.isEmpty()) return;
            // 遍历订阅URL的监听器集合,通知他们
            for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
                URL url = entry.getKey();
    
                // 匹配
                if (!UrlUtils.isMatch(url, urls.get(0))) {
                    continue;
                }
                // 遍历监听器集合,通知他们
                Set<NotifyListener> listeners = entry.getValue();
                if (listeners != null) {
                    for (NotifyListener listener : listeners) {
                        try {
                            notify(url, listener, filterEmpty(url, urls));
                        } catch (Throwable t) {
                            logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                        }
                    }
                }
            }
        }
    
        /**
         * 通知监听器,URL 变化结果
         * @param url
         * @param listener
         * @param urls
         */
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            if ((urls == null || urls.isEmpty())
                    && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                logger.warn("Ignore empty notify urls for subscribe url " + url);
                return;
            }
            if (logger.isInfoEnabled()) {
                logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
            }
            Map<String, List<URL>> result = new HashMap<String, List<URL>>();
            // 将urls进行分类
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    // 按照url中key为category对应的值进行分类,如果没有该值,就找key为providers的值进行分类
                    String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                    List<URL> categoryList = result.get(category);
                    if (categoryList == null) {
                        categoryList = new ArrayList<URL>();
                        // 分类结果放入result
                        result.put(category, categoryList);
                    }
                    categoryList.add(u);
                }
            }
            if (result.size() == 0) {
                return;
            }
            // 获得某一个消费者被通知的url集合(通知的 URL 变化结果)
            Map<String, List<URL>> categoryNotified = notified.get(url);
            if (categoryNotified == null) {
                // 添加该消费者对应的url
                notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
                categoryNotified = notified.get(url);
            }
            // 处理通知监听器URL 变化结果
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                // 把分类标实和分类后的列表放入notified的value中
                // 覆盖到 `notified`
                // 当某个分类的数据为空时,会依然有 urls 。其中 `urls[0].protocol = empty` ,通过这样的方式,处理所有服务提供者为空的情况。
                categoryNotified.put(category, categoryList);
                // 保存到文件
                saveProperties(url);
                //通知监听器
                listener.notify(categoryList);
            }
        }
    
    

    在构造函数的最后一句,调用notify(url.getBackupUrls()); 来将注册中心url返回的urls来进行通知。从下面代码可以开出返回的urls是通过url的参数获得的。

    public List<URL> getBackupUrls() {
            List<URL> urls = new ArrayList<URL>();
            urls.add(this);
            String[] backups = getParameter(Constants.BACKUP_KEY, new String[0]);
            if (backups != null && backups.length > 0) {
                for (String backup : backups) {
                    urls.add(this.setAddress(backup));
                }
            }
            return urls;
        }
    

    然后获取遍历所有订阅URL,类型Map<URL,Set<NotifyListener>> ,判断遍历中的当前url与传入的backupURL是否匹配,匹配了继续向下执行,否则则跳过这个url,再处理下一个url。当向下执行时,获取遍历当前url的监听器。对每个监听器执行notify(url, listener, filterEmpty(url, urls))

      protected static List<URL> filterEmpty(URL url, List<URL> urls) {
            if (urls == null || urls.isEmpty()) {
                List<URL> result = new ArrayList<URL>(1);
                result.add(url.setProtocol(Constants.EMPTY_PROTOCOL));
                return result;
            }
            return urls;
        }
    

    如果urls为空,则将根据url的信息新建一个url,并设置协议为空协议,放入到urls中。
    然后执行notify方法,将backupURLS进行分类,放入到result中。
    在上述中遍历所有订阅的urls,然后在每个url中再执行nofity,所以接下来的步骤可以理解成遍历订阅的urls,在循环内部获取每个url的被通知的urls集合。
    每个url获取一个被通知的urls集合,categoryNotified
    之后遍历backURLs,它会覆盖掉原来被通知的集合categoryNotified
    遍历结束后,会将结果保存到文件中,
    最后通知监听器处理,最后的这个通知方法在之后的篇章解释。

    相关文章

      网友评论

        本文标题:Dubbo-服务注册中心之AbstractRegistry

        本文链接:https://www.haomeiwen.com/subject/eqhwrctx.html