美文网首页
Dubbo源码解析之订阅更新

Dubbo源码解析之订阅更新

作者: captain_fu | 来源:发表于2019-12-02 17:25 被阅读0次

    从今天开始,会不定期更新dubbo源码相关文章。
    今天所要描述的场景是当注册中心provider数据发生变更时,consumer端如何感知并同步更新。阅读以下文章需要对dubbo的基本实现机制有一定了解。

    image.png

    先引用官方的dubbo架构图,接下来所要描述的场景就是图中的第三步,notify。

    当前团队使用订阅机制是轮询机制,每隔一个周期去请求注册中心,对比注册中心的版本号和本地版本号,当版本号发生变化时,更新本地订阅信息。

    dubbo订阅更新.png

    大致流程图如上所示。下面开始看源码。

    //org.apache.dubbo.registry.support.AbstractRegistry#notify()
        /**
         * Notify changes from the Provider side.
         *
         * @param url      consumer side url
         * @param listener listener
         * @param urls     provider latest urls
         */
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        //省略部分代码
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                categoryNotified.put(category, categoryList);
                listener.notify(categoryList);//更新本地的订阅信息
                // We will update our cache file after each notification.
                // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
                saveProperties(url);//保存最新provider信息到本地文件
            }
        }
    

    上述方法是实现自定义注册中心的入口。也是更新本地订阅信息的入口。
    参数介绍:

    1. url是consumer端url。
    2. listener是实现更新的接口
    3. urls是provider端url。

    接着看listener.notify的实现。

    public interface NotifyListener {
    
        /**
         * Triggered when a service change notification is received.
         * <p>
         * Notify needs to support the contract: <br>
         * 1. Always notifications on the service interface and the dimension of the data type. that is, won't notify part of the same type data belonging to one service. Users do not need to compare the results of the previous notification.<br>
         * 2. The first notification at a subscription must be a full notification of all types of data of a service.<br>
         * 3. At the time of change, different types of data are allowed to be notified separately, e.g.: providers, consumers, routers, overrides. It allows only one of these types to be notified, but the data of this type must be full, not incremental.<br>
         * 4. If a data type is empty, need to notify a empty protocol with category parameter identification of url data.<br>
         * 5. The order of notifications to be guaranteed by the notifications(That is, the implementation of the registry). Such as: single thread push, queue serialization, and version comparison.<br>
         *
         * @param urls The list of registered information , is always not empty. The meaning is the same as the return value of {@link org.apache.dubbo.registry.RegistryService#lookup(URL)}.
         */
        void notify(List<URL> urls);
    
    }
    

    大致译文如下:
    当服务发生变化时,触发该方法。
    Notify实现需要支持以下契约:

    1. 使用服务接口和数据类型规格来通知。就是说,不要仅仅通知一个服务的部分数据(需要完整数据)。用户不需要将当前通知与前置通知做比较。
    2. 订阅的第一个通知必须是一个服务所有数据类型的全量通知。
    3. 变化发生时,不同数据类型允许分开通知,比如provider,consumers,routers,overides。允许只有一个类型被通知,但是数据类型必须是完整的,不能是增量的。
    4. 如果数据类型为空,需要返回有category参数的空协议(这一点很重要,如果不显示返回空协议,本地的配置不会被覆盖)。
    5. 通知的顺序必须有保证(registry的实现)。比如单线程推送、队列串行化、版本比较。
    //org.apache.dubbo.registry.integration.RegistryDirectory#notify
    
    @Override
        public synchronized void notify(List<URL> urls) {
            //省略部分代码
            List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
            this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
    
            List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
            toRouters(routerURLs).ifPresent(this::addRouters);
    
            // providers
            List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
            refreshOverrideAndInvoker(providerURLs);
        }
    

    org.apache.dubbo.registry.integration.RegistryDirectory#notify是NotifyListener.notify的一个实现。包含以下几步:

    1. 获取url中的configurators信息,configurators是外部化配置信息,包含服务者动态配置 URL 元数据信息。
    2. 获取url中的routers信息, routers是路由配置信息,包含消费者路由策略 URL 元数据信息。
    3. 获取url中的providers信息,providers是服务提供者注册信息,包含多个服务者 URL 元数据信息。

    接下来看下refreshOverrideAndInvoker的实现。

    private void refreshOverrideAndInvoker(List<URL> urls) {
            // mock zookeeper://xxx?mock=return null
            overrideDirectoryUrl();//做配置信息的覆盖合并
            refreshInvoker(urls);//刷新invokers信息。
        }
    

    配置信息的覆盖比较简单,接下里看如何刷新invokers。

    private void refreshInvoker(List<URL> invokerUrls) {
      //省略部分代码
      Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// url转换为 Invoker map
      //省略部分代码
      try {
        destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // 销毁旧的Invoker,启动新的Invoker
      } catch (Exception e) {
        logger.warn("destroyUnusedInvokers error. ", e);
      }
    }
    

    刷新invokers分为两步。

    1. url转换为invokers实例。
    2. 销毁旧的Invoker,启动新的Invoker。

    接下来看下url转换的实现:

    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
      //省略部分代码
      URL url = mergeUrl(providerUrl);// 合并url参数,包括是否启用等配置信息。
      //合并的顺序是override > -D >Consumer > Provider
    
      for (URL providerUrl : urls) {
        // key是url全称
                Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // 本地的invoker实例
                Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
                if (invoker == null) { //本地查找invoker不存在
                    try {
                        boolean enabled = true;
                        if (url.hasParameter(DISABLED_KEY)) { //先判断disabled key是否存在
                            enabled = !url.getParameter(DISABLED_KEY, false);
                        } else { //再判断enabled key是否存在
                            enabled = url.getParameter(ENABLED_KEY, true);
                        }
                        if (enabled) { //如果enabled=true,创建新的invoker实例
                            invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                        }
                    } catch (Throwable t) {
                        logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                    }
                    if (invoker != null) { // 新的invoker实例放入map中
                        newUrlInvokerMap.put(key, invoker);
                    }
                } else { //invoker在本地存在,不需要重新构造,直接放入map中。
                    newUrlInvokerMap.put(key, invoker);
                }
      }
    
    return newUrlInvokerMap;
    }
    

    url转换Invoker完成后,再看下如果启动新的Invokers。

    private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
      //省略部分代码
      // 找出oldUrlInvokerMap中有,newUrlInvokerMap中没有的实例
            List<String> deleted = null;
            if (oldUrlInvokerMap != null) {
                Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
                for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
                    if (!newInvokers.contains(entry.getValue())) {
                        if (deleted == null) {
                            deleted = new ArrayList<>();
                        }
                        deleted.add(entry.getKey());
                    }
                }
            }
    
      //调用detroy方法销毁实例
            if (deleted != null) {
                for (String url : deleted) {
                    if (url != null) {
                        Invoker<T> invoker = oldUrlInvokerMap.remove(url);
                        if (invoker != null) {
                            try {
                                invoker.destroy();
                                if (logger.isDebugEnabled()) {
                                    logger.debug("destroy invoker[" + invoker.getUrl() + "] success. ");
                                }
                            } catch (Exception e) {
                                logger.warn("destroy invoker[" + invoker.getUrl() + "] failed. " + e.getMessage(), e);
                            }
                        }
                    }
                }
            }
    }
    

    OK,整个订阅更新的过程已经走完。

    相关文章

      网友评论

          本文标题:Dubbo源码解析之订阅更新

          本文链接:https://www.haomeiwen.com/subject/cjjfgctx.html