美文网首页
dubbo技术内幕四 Directory + Router

dubbo技术内幕四 Directory + Router

作者: 牧羊人刘俏 | 来源:发表于2021-04-14 09:10 被阅读0次

在上一篇有介绍,ReferenceBean refer的源码再贴一下

Class  RegistryProtocol
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //通过type也就是dubbo service的接口类型,和url zk的地址构造一个目录服务
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        //设置其registry,默认是个ZookeeperRegistry
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
           //ZookeeperRegistry对指定的zk目录进行监听,主要是监听 配置 路由 和消费者url变更的信息
            registry.register(registeredConsumerUrl);
            directory.setRegisteredConsumerUrl(registeredConsumerUrl);
        }
        //目录服务对指定的zk地址进行订阅和监听,调用之后,directory内部的所有信息
        //都会刷新一遍
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
       //通过cluster将directory的集群调用封装成一个Invoker
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

可以看到如上的源码里面,基本都是针对new出来的directory进行操作。
我们先看下directory的类的依赖关系图


image.png

如上图,RegistryDirectory不仅是个目录服务,而且实现了NotifyListener,所以当zk的某些目录发生变更的时候,RegistryDirectory会实时的刷新其内部的路由缓存信息,保证了其路由信息的实时的更新。
我们从Directory --->RegistryDirectory 进行分析

Class  Directory
public interface Directory<T> extends Node {

    /**
     * get service type.  返回支持的service类型也就是我们的dubbo的接口类型,
     *等于说每个dubbo服务都会new一个Directory出来
     *
     * @return service type.
     */
    Class<T> getInterface();

    /**
     * list invokers.  根据invocation返回所有的Invokers,为什么会有这个方法,因为
     *dubbo支持tag和group等标签,所以不同的invocation(可以认为是dubbo服务的某个 
     *方法)返回不同的invokers
     * @return invokers
     */
    List<Invoker<T>> list(Invocation invocation) throws RpcException;

}

Directory的中间抽象类AbstractDirectory
里面有几个属性

private final URL url; //可以认为是当前的目录服务的url的地址,一般来说是对zk地址的封装
private volatile List<Router> routers;//路由规则,如果我们配置了路由规则,这些路由规则会对Invokers进行过滤,只有满足条件的Invokers才会返回

在其构造函数里面,如下

 public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
        if (url == null)
            throw new IllegalArgumentException("url == null");
        this.url = url;
        this.consumerUrl = consumerUrl;
        setRouters(routers);
    }

其中setRouters(routers)会自动的加入两个路由规则,如下

 protected void setRouters(List<Router> routers) {
        // copy list
        routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
        // append url router
        String routerkey = url.getParameter(Constants.ROUTER_KEY);
        if (routerkey != null && routerkey.length() > 0) {
            RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);
            routers.add(routerFactory.getRouter(url));
        }
        // append mock invoker selector
        //mock路由
        routers.add(new MockInvokersSelector());
       //标签路由
        routers.add(new TagRouter());
        Collections.sort(routers);
        this.routers = routers;
    }

可以认为这些routers就是invokers的过滤器,根据url的规则过滤出满足条件的invokers返回,routers我们在后面进行分析。
我们在源码里面可以窥探一二,如下

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
        //如何根据invocation返回invokers留给子类实现
        List<Invoker<T>> invokers = doList(invocation);
        List<Router> localRouters = this.routers; // local reference
        if (localRouters != null && !localRouters.isEmpty()) {
            for (Router router : localRouters) {
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                       //根据router对invokers进行过滤,不满足条件的invokers过滤掉
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }

而其最重要的doList方法再RegistryDirectory方法里面进行了实现,如下

Class  RegistryDirectory
public List<Invoker<T>> doList(Invocation invocation) {
        List<Invoker<T>> invokers = null;
        //拿到本地缓存的method与List<Invoker<T>的缓存map,根据invocation的信息进行
       //查找
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            if (args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(methodName);
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            if (invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }

上面的代码不难理解,最重要的就是localMethodInvokerMap,直接的在这个里面拿信息就可以了,但是这里面的信息是怎么来的,以及是怎么变更的呢。
由于RegistryDirectory实现了NotifyListener,不难想到localMethodInvokerMap的维护和变更是在 void notify(List<URL> urls)里面实现的。源码如下
我们先看下NotifyListener的源码备注

public interface NotifyListener {

    /**
     * Triggered when a service change notification is received.
     * <p>
     * Notify needs to support the contract: <br>
     * 1. Always notifications on the service interface and the dimension of the data type. that is, won't notify part of the same type data belonging to one service. Users do not need to compare the results of the previous notification.<br> 每次必须是针对某个 type data 的全量通知
     * 2. The first notification at a subscription must be a full notification of all types of data of a service.<br>  //第一次必须是all types of data的通知(providers, consumers, routers, overrides.)
     * 3. At the time of change, different types of data are allowed to be notified separately, e.g.: providers, consumers, routers, overrides. It allows only one of these types to be notified, but the data of this type must be full, not incremental.<br>
      //后面可以是针对某个type的全量通知
     * 4. If a data type is empty, need to notify a empty protocol with category parameter identification of url data.<br>
     * 5. The order of notifications to be guaranteed by the notifications(That is, the implementation of the registry). Such as: single thread push, queue serialization, and version comparison.<br>  //顺序要保证,dubbo里面使用synchronized来做的同步
     *
     * @param urls The list of registered information , is always not empty. The meaning is the same as the return value of {@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}. //不能通知个null结果
     */
    void notify(List<URL> urls);

}

如上对notify里面的参数urls做了规定。

Class  RegistryDirectory
public synchronized void notify(List<URL> urls) {
        //缓存所有providers的url
        List<URL> invokerUrls = new ArrayList<URL>();
       //缓存所有router的url
        List<URL> routerUrls = new ArrayList<URL>();
        //缓存所有的配置 configurator的url
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
             //判断url的protocol,根据protocol判断通知的type的类型
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            //如果是route变更
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            //如果是配置变更
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
           //如果是providers url的变更
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
        // configurators  处理变更的configuratorUrls
        if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
            this.configurators = toConfigurators(configuratorUrls);
        }
        // routers 处理变更的routerUrls
        if (routerUrls != null && !routerUrls.isEmpty()) {
            List<Router> routers = toRouters(routerUrls);
            if (routers != null) { // null - do nothing
                setRouters(routers);
            }
        }
        List<Configurator> localConfigurators = this.configurators; // local reference
        // merge override parameters
        this.overrideDirectoryUrl = directoryUrl;
        if (localConfigurators != null && !localConfigurators.isEmpty()) {
            for (Configurator configurator : localConfigurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
        // providers 刷新
        refreshInvoker(invokerUrls);
    }

可以看到其实上面就是针对providers、 router、configurator配置的变更做了监听,然后实时的更新本地methodInvokerMap。
我们重点的看下refreshInvoker(invokerUrls),源码主要就是如下的三句

//将invokerUrls转化成Invokesr
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
//根据方法拆分缓存
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
//将原来老的不要的Invoker销毁掉
 destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker

而在 toInvokers(invokerUrls)方法中,最重要的就一句,如下

invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);

而通过debug可以发现
protocol.refer(serviceType, url)被封装成了一个DubboInvoker<T>,并继续的代理成一个InvokerDelegate,关于protocol的作用我们后面进行分析。
而toMethodInvokers(newUrlInvokerMap);就是在newUrlInvokerMap的基础上根据method做进一步的缓存。有兴趣可以看下
最后的destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
将oldUrlInvokerMap里面不在newUrlInvokerMap里面的invoke进行销毁。

经过一次监听变更,newMethodInvokerMap达到了最新。

既然在Directory使用Router进行了invokes的过滤,我们接着分析下Router的原理,先看下Router的类继承结构


image.png

整个Router的话继承结构还是很扁平化的。

public interface Router extends Comparable<Router>{

    /**
     * get the router url.
     *
     * @return url
     */
    URL getUrl();

    /**
     * route.
     *
     * @param invokers
     * @param url        refer url
     * @param invocation
     * @return routed invokers
     * @throws RpcException
     */
    <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

    /**
     * Router's priority, used to sort routers.
     *
     * @return router's priority
     */
    int getPriority();

}

三个方法也很简单,不难,我们选取TagRouter做代码跟踪,

Class TagRouter
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        // filter
        List<Invoker<T>> result = new ArrayList<Invoker<T>>();
        // Dynamic param 判断Attachment里面是否有dubbo.tag属性
        String tag = RpcContext.getContext().getAttachment(Constants.TAG_KEY);
        // Tag request
        if (!StringUtils.isEmpty(tag)) {
            // Select tag invokers first
            for (Invoker<T> invoker : invokers) {
                //根据invoker的url是否有tag标签并匹配上,如果匹配上了,加入
                if (tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
                    result.add(invoker);
                }
            }
        }
        //如果过滤后没有满足条件的invokers
        //且Attachment没有dubbo.force.tag,那么将所有没有dubbo.tag标签的返回
        if (result.isEmpty()) {
            // Only forceTag = true force match, otherwise downgrade
            String forceTag = RpcContext.getContext().getAttachment(Constants.FORCE_USE_TAG);
            if (StringUtils.isEmpty(forceTag) || "false".equals(forceTag)) {
                for (Invoker<T> invoker : invokers) {
                    if (StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
                        result.add(invoker);
                    }
                }
            }
        }
        return result;
    }

当然我们也可以自动以Router,根据实际条件做过滤
通过上面的分析我们可知道Directory可以对zk上的配置信息进行监听,并进行路由信息的动态更新,但是我们通过cluster做进一步的封装后才返回,如下
Invoker invoker = cluster.join(directory);
而在cluster的内部也封装了LoadBalance,为什么要这样设计呢。
因为在rpc调用的过程中,如何选择一个invoke进行调用呢,我们可以通过LoadBalance算法来选择合适的invoke进行调用,如果invoke调用失败了,我们可以重试,也可以快速报错也可以将异常吃掉,那封装这些异常的场景的就是cluster的工作了,下一章会对cluster和LoadBalance进行分析,我们可以看下,在集群情况下,dubbo是如何选择一个invoke进行调用的,以及针对异常的场景如何的应付。

相关文章

网友评论

      本文标题:dubbo技术内幕四 Directory + Router

      本文链接:https://www.haomeiwen.com/subject/nwvclltx.html