美文网首页Dubbo源码解析
Dubbo之Directory源码分析

Dubbo之Directory源码分析

作者: 土豆肉丝盖浇饭 | 来源:发表于2018-06-27 14:33 被阅读0次

    Directory的作用

    首先看下Directory的接口定义

    public interface Directory<T> extends Node {
        Class<T> getInterface();
    
        List<Invoker<T>> list(Invocation invocation) throws RpcException;
    }
    

    每个Directory实例会对应一个接口服务,它的主要功能是为Cluster提供远程对等调用invoker目录服务,list方法用于获取远程服务提供者的对等调用Invokers

    public interface Cluster {
        <T> Invoker<T> join(Directory<T> directory) throws RpcException;
    }
    

    Directory用于获取多个远程对等调用invoker,而Cluster用于将这些invoker伪装成一个invoker进行集群调用,Cluster源码会单独讲解

    Directory的两种实现

    Directory有两种实现,StaticDirectoryRegistryDirectory,分别对应静态和动态的Invoker目录服务
    这两个实现都继承了模板类AbstractDirectory,让我们来看下AbstractDirectory封装了什么逻辑
    可以在AbstractDirectory中可以看到这么一个变量

     private volatile List<Router> routers;
    

    routers用于路由,用于过滤远程对等调用invoker,关于Router路由源码我会单独讲解

    可以看到AbstractDirectory实现了Directory接口的list方法

    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
            if (destroyed) {
                throw new RpcException("Directory already destroyed .url: " + getUrl());
            }
            List<Invoker<T>> invokers = doList(invocation);
            List<Router> localRouters = this.routers; // local reference
            if (localRouters != null && !localRouters.isEmpty()) {
                for (Router router : localRouters) {
                    try {
                        if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                            invokers = router.route(invokers, getConsumerUrl(), invocation);
                        }
                    } catch (Throwable t) {
                        logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                    }
                }
            }
            return invokers;
        }
    

    上面逻辑留了模板方法doList给子类实现

    protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
    

    子类实现doList方法,只负责invokers的获取,在AbstractDirectory中增加的router的过滤逻辑。消费者拿到的invoker集合,是经过routers过滤的。

    对于StaticDirectory和RegistryDirectory,我们只要关注如何获取远程对等调用invokers的逻辑即可

    StaticDirectory

    StaticDirectory没什么好讲的,doList方法直接返回设置的invokers

    protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
    
            return invokers;
        }
    

    RegistryDirectory

    RegistryDirectory实现了NotifyListener接口,会随着提供者的上下线动态刷新本地invoker缓存

    对于invoker,在RegistryDirectory有两个缓存

    private volatile Map<String, Invoker<T>> urlInvokerMap; 
    
    private volatile Map<String, List<Invoker<T>>> methodInvokerMap;
    

    urlInvokerMap缓存url对应的invoker
    methodInvokerMap缓存方法对应的invokers,方法对应invoker可以存在多个,所以是 List<Invoker<T>>,在dolist方法中会用到
    notify回调会刷新这两个缓存

    我们先来看下RegistryDirectory的订阅操作

    public void subscribe(URL url) {
            setConsumerUrl(url);
            registry.subscribe(url, this);
        }
    

    这边的registry可以认为就是之前讲的ZookeeperRegistry,把RegistryDirectory自身作为订阅回调,一旦监控的路径发生变化,就会回调RegistryDirectory的notify方法

    那么subscribe会订阅那些url?在RegistryProtocol中可以看到

    //directory订阅url对应interface的provider,configurators,routers接口目录,回调接口NotifyListener由RegistryDirectory实现
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                    Constants.PROVIDERS_CATEGORY
                            + "," + Constants.CONFIGURATORS_CATEGORY
                            + "," + Constants.ROUTERS_CATEGORY));
    

    会监听这个接口的provider,configurators,routers目录

    接下来看notify的实现

    public synchronized void notify(List<URL> urls) {
            //这里的通知会一次性传递对应监听目录下所有的url
            List<URL> invokerUrls = new ArrayList<URL>();
            List<URL> routerUrls = new ArrayList<URL>();
            List<URL> configuratorUrls = new ArrayList<URL>();
            //对url进行分类
            for (URL url : urls) {
                String protocol = url.getProtocol();
                String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                if (Constants.ROUTERS_CATEGORY.equals(category)
                        || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                    routerUrls.add(url);
                } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                        || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                    configuratorUrls.add(url);
                } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                    invokerUrls.add(url);
                } else {
                    logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
                }
            }
            // configurators
            //刷新configuratorUrls
            if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
                this.configurators = toConfigurators(configuratorUrls);
            }
            // routers
            //刷新routers
            if (routerUrls != null && !routerUrls.isEmpty()) {
                List<Router> routers = toRouters(routerUrls);
                if (routers != null) { // null - do nothing
                    setRouters(routers);
                }
            }
            List<Configurator> localConfigurators = this.configurators; // local reference
            // merge override parameters
            this.overrideDirectoryUrl = directoryUrl;
            //override这个overrideDirectoryUrl什么用?
            if (localConfigurators != null && !localConfigurators.isEmpty()) {
                for (Configurator configurator : localConfigurators) {
                    this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
                }
            }
            // providers
            //刷新客户端对等invoker
            refreshInvoker(invokerUrls);
        }
    

    方法的开始,会对url进行分类,一共provider,configurators,routers三种类型
    先刷新RegistryDirectory中的configurators,routers,再使用provider urls增量刷新invoker缓存
    如果provider urls不存在,那么根据上一次的缓存provider urls,再使用router增量刷新invoker缓存

    private void refreshInvoker(List<URL> invokerUrls) {
            if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                    && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
                //传入的url protocol = empty,directory设置为禁用
                this.forbidden = true; // Forbid to access
                this.methodInvokerMap = null; // Set the method invoker map to null
                //摧毁客户端的对等调用invoker
                destroyAllInvokers(); // Close all invokers
            } else {
                this.forbidden = false; // Allow to access
                Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
                //invokerUrls为空,因为通知的url可能只改变了router或者configurator,提供者并没有变化,但是对应invoker配置还是需要被更改的
                if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                    //invokerUrls为空使用缓存的invokers urls,也就是上一次回调拿到invokers
                    invokerUrls.addAll(this.cachedInvokerUrls);
                } else {
                    //更新缓存
                    this.cachedInvokerUrls = new HashSet<URL>();
                    this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
                }
                //invokerUrls为空,中止
                if (invokerUrls.isEmpty()) {
                    return;
                }
                //把url转换为invoker,已经存在的invoker不会重新创建
                Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
                //把newUrlInvokerMap转换为methodInvokerMap
                Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
                // state change
                // If the calculation is wrong, it is not processed.
                if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                    logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                    return;
                }
                //如果存在group配置,对method对应的invoker进行cluster伪装
                this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
                this.urlInvokerMap = newUrlInvokerMap;
                try {
                    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
                } catch (Exception e) {
                    logger.warn("destroyUnusedInvokers error. ", e);
                }
            }
        }
    

    在refreshInvoker方法中,会根据消费者url的protocol过滤掉不匹配的提供者url,然后对过滤后的提供者url生成远程对等调用invoker,如果invoker已经存在,那么不用再重复创建

    接下来看下doList方法的实现

    public List<Invoker<T>> doList(Invocation invocation) {
            if (forbidden) {
                // 1. No service provider 2. Service providers are disabled
                throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
                    "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
                            + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
            }
            List<Invoker<T>> invokers = null;
            //针对每个方法,有不同的invokers列表。可能存在路由配置
            Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
            if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
                //从invocaion获取方法名和参数,可能是$invoke泛化调用
                String methodName = RpcUtils.getMethodName(invocation);
                Object[] args = RpcUtils.getArguments(invocation);
                if (args != null && args.length > 0 && args[0] != null
                        && (args[0] instanceof String || args[0].getClass().isEnum())) {
                    invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
                }
                if (invokers == null) {
                    invokers = localMethodInvokerMap.get(methodName);
                }
                if (invokers == null) {
                    invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
                }
                if (invokers == null) {
                    Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                    if (iterator.hasNext()) {
                        invokers = iterator.next();
                    }
                }
            }
            return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
        }
    

    直接从methodInvokerMap中获取对应的invoker集合即可。优先通过方法名查找,如果找不到,通过*作为key查找,再找不到,返回methodInvokerMap第一个invoker集合。

    讲解完子类需要实现的doList方法后,下面看下RegistryDirectory是如何被使用到的

    RegistryDirectory的使用

    RegistryDirectory封装了获取远程对等invokers的逻辑,主要使用在RegistryProtocol的doRefer方法

     private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            //这边的url为consumer url
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            //这里的protocol为spi注入的适配类
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
            URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
            if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                    && url.getParameter(Constants.REGISTER_KEY, true)) {
                //注册consumer url
                registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                        Constants.CHECK_KEY, String.valueOf(false)));
            }
            //directory订阅url对应interface的provider,configurators,routers接口目录,回调接口NotifyListener由RegistryDirectory实现
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                    Constants.PROVIDERS_CATEGORY
                            + "," + Constants.CONFIGURATORS_CATEGORY
                            + "," + Constants.ROUTERS_CATEGORY));
    
            //通过cluster封装获取invoker的逻辑,将对多个invoker的集群调用封装成一个invoker
            Invoker invoker = cluster.join(directory);
            ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
            return invoker;
        }
    

    RegistryDirectory通过与Cluster配合,将对多个invoker的集群调用封装成一个invoker,然后通过代理把invoker转换为代理对象bean,放入spring容器中去,就和正常使用本地bean一样。这就是RPC。

    总结

    Dubbo的目录服务,用于获取远程对等invoker,其实这种设计在业务场景中也能用到。
    比如我们公司项目的司机池功能,对于每个订单都有一个司机池,并且这个司机池会随着司机状态变化而发生变化,也可以参考Directory的接口设计。

    最后

    希望大家关注下我的公众号


    image

    相关文章

      网友评论

        本文标题:Dubbo之Directory源码分析

        本文链接:https://www.haomeiwen.com/subject/igrvsftx.html