美文网首页
Dubbo消费者订阅

Dubbo消费者订阅

作者: 你值得拥有更好的12138 | 来源:发表于2019-01-04 22:07 被阅读0次

    一、与Spring的结合

    ReferenceConfig被ReferenceBean继承然后通过ReferenceAnnotationBeanPostProcessor注册到Spring中IOC中。与ServiceConifg注册过程类似

    二、订阅服务

    首先入口在ReferenceConfig的get方法中方法触发init方法

      public synchronized T get() {
            if (destroyed) {
                throw new IllegalStateException("Already destroyed!");
            }
            if (ref == null) {
                init();
            }
            return ref;
        }
    

    然后来看看init最后获取消费者代理的代码

      ref = createProxy(map);
            ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), ref, interfaceClass.getMethods());
            ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    

    ref就是这消费的代理,调用时通过这个代理去调用invoker然后通过dubbo协议到最底层通过TCP远程调用服务,获取结果。

    看看createProxy怎么创建代理的

    if (urls.size() == 1) {
                    invoker = refprotocol.refer(interfaceClass, urls.get(0));
                } else {
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    for (URL url : urls) {
                        invokers.add(refprotocol.refer(interfaceClass, url));
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // use last registry url
                        }
                    }
                }
    

    通过protocol的refer去获取invoker。这的protocol通过扩展工具获得的RegistryProtocol,看看他的refer方法做了什么。一直走到方法的最底层doRefer方法。

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
            URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
            if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                    && url.getParameter(Constants.REGISTER_KEY, true)) {
                registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                        Constants.CHECK_KEY, String.valueOf(false)));
            }
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                    Constants.PROVIDERS_CATEGORY
                            + "," + Constants.CONFIGURATORS_CATEGORY
                            + "," + Constants.ROUTERS_CATEGORY));
    
            Invoker invoker = cluster.join(directory);
            ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
            return invoker;
        }
    

    这里通过registry订阅服务,这个registry根据配置的的注册中心不一样,扩展工具获取的也不一样。如果你配置的zookeeper,那么获取的结果就是ZookeeperRegistry。
    这里的注册逻辑与Service的注册逻辑差不多。

    Invoker是通过RegistryDirectory获取的,Directoy是Invoker的集合包装,分为静态和动态的。静态的就直接构造了,动态的是通过监听器,监听注册中心服务的变化进行动态的更新invokers。看看实现:

    public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    }
    

    NotifyListener监听器就是监听注册中心的变化,看看实现的监听逻辑在notify这个方法中的refreshInvoker中通过传入的url进行invoker的构造。重要逻辑在toInvoker方法中

    Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
                Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
                if (invoker == null) { // Not in the cache, refer again
                    try {
                        boolean enabled = true;
                        if (url.hasParameter(Constants.DISABLED_KEY)) {
                            enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                        } else {
                            enabled = url.getParameter(Constants.ENABLED_KEY, true);
                        }
                        if (enabled) {
                            invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                        }
                    } catch (Throwable t) {
                        logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                    }
                    if (invoker != null) { // Put new invoker in cache
                        newUrlInvokerMap.put(key, invoker);
                    }
                } else {
                    newUrlInvokerMap.put(key, invoker);
                }
    

    最后发现这发现这里通过protocol的refer进行invoker的构造。咦,TM的这不是又回去了吗?第一次看我真是这么想的,这TM的不是递归了吗?到处找递归结束条件,最终无果!
    最后发现这个protocol是在前面的RegistryProtocol传入的,这实例中有个成员变量也是Protocol类型的,这个Protocol又是通过扩展工具在构造RegistryProtocol的时候注入的。

    public class RegistryProtocol implements Protocol {
    
        private final static Logger logger = LoggerFactory.getLogger(RegistryProtocol.class);
        private static RegistryProtocol INSTANCE;
        private final Map<URL, NotifyListener> overrideListeners = new ConcurrentHashMap<URL, NotifyListener>();
        //To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
        //providerurl <--> exporter
        private final Map<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<String, ExporterChangeableWrapper<?>>();
        private Cluster cluster;
        private Protocol protocol;
        private RegistryFactory registryFactory;
        private ProxyFactory proxyFactory;
    }
    
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
    

    然后突然想着TM这个protocol通过ExtensionLoader工具注入的时候会不是就是他自己,那这么想又TM的递归了!最后看懂了ExtensionLoader工具后发现他是URL总线模式的,这个时候在url中的参数已经变了,不会获取原来那个Protocol,而是DubboProtocol.
    URL变化代码如下,如果想搞清楚就一定要看懂ExtensionLoader

     url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
            Registry registry = registryFactory.getRegistry(url);
    

    这段代码在RegistryProtocol中的refer方法中。

    三、构造Invoker

    DubboProtocol中构造Inoker代码,所以从这里我们可以证明Dubbo注册,订阅是通过如zookeeper这样的中间件,但是真正的通讯是通过dubbo协议自定义通讯来进行沟通的。

     @Override
        public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
            optimizeSerialization(url);
            // create rpc invoker.
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
            return invoker;
        }
    

    相关文章

      网友评论

          本文标题:Dubbo消费者订阅

          本文链接:https://www.haomeiwen.com/subject/xnbtxqtx.html