美文网首页dubbo
Dubbo Consumer 直连和注册中心服务引用流程

Dubbo Consumer 直连和注册中心服务引用流程

作者: 晴天哥_王志 | 来源:发表于2019-10-27 23:26 被阅读0次

    开篇

     这篇文章的目的在于描述Dubbo Consumer在直连和注册中心两种场景下针对provider侧invoker的封装。整篇文章主要从单注册中心、单直连地址、多注册中心、多直连地址的角度进行分析。

     通过这篇文章能够了解到Consumer侧针对invoker的生成流程,通过invoker的生成可以了解invoker的调用链。

    Consumer reference流程

    public class ReferenceConfig<T> extends AbstractReferenceConfig {
    
        private static final Protocol REF_PROTOCOL = 
            ExtensionLoader.getExtensionLoader(Protocol.class)
           .getAdaptiveExtension();
    
        private T createProxy(Map<String, String> map) {
            if (shouldJvmRefer(map)) {
                // 省略无关代码
            } else {
                urls.clear(); 
                // 处理reference配置直连情况
                if (url != null && url.length() > 0) { 
                    // 处理逗号分隔的直连地址
                    String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            URL url = URL.valueOf(u);
                            if (StringUtils.isEmpty(url.getPath())) {
                                url = url.setPath(interfaceName);
                            }
                            if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                                urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                // 添加直连地址到urls
                                urls.add(ClusterUtils.mergeUrl(url, map));
                            }
                        }
                    }
                } else {
                    // 处理注册中心的情况
                    if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
                        checkRegistry();
                        // 获取注册中心
                        List<URL> us = loadRegistries(false);
                        // 添加所有注册中心到urls
                        if (CollectionUtils.isNotEmpty(us)) {
                            for (URL u : us) {
                                URL monitorUrl = loadMonitor(u);
                                if (monitorUrl != null) {
                                    map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                                }
                                urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                            }
                        }
                        if (urls.isEmpty()) {
                        }
                    }
                }
    
                // 处理单注册中心或者只有单个直连的情况
                if (urls.size() == 1) {
                    // 单注册中心协议为"registry",直连场景下协议为"dubbo"
                    invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
                } else { // 处理多注册中心或者只有多个直连的情况
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
    
                    URL registryURL = null;
                    for (URL url : urls) {
                        // 单注册中心协议为"registry",直连场景下协议为"dubbo"
                        invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                        if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // use last registry url
                        }
                    }
    
                    if (registryURL != null) {
                        // 针对多注册中心的方式,通过RegistryAwareCluster进行封装
                        URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
                        // RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                        invoker = CLUSTER.join(new StaticDirectory(u, invokers));
                    } else { // 处理多直连方式
                        invoker = CLUSTER.join(new StaticDirectory(invokers));
                    }
                }
            }
    
            if (shouldCheck() && !invoker.isAvailable()) {
                // 处理invoker可用检查的逻辑
                throw new IllegalStateException("Failed to check the status of the service " 
                + interfaceName + ". No provider available for the service " 
                + (group == null ? "" : group + "/") + interfaceName 
                + (version == null ? "" : ":" + version) + " from the url "  
                + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() 
                + " use dubbo version " + Version.getVersion());
            }
    
            MetadataReportService metadataReportService = null;
            if ((metadataReportService = getMetadataReportService()) != null) {
                URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
                metadataReportService.publishConsumer(consumerURL);
            }
    
            // create service proxy
            return (T) PROXY_FACTORY.getProxy(invoker);
        }
    }
    
    • 区分单注册中心、单直连地址、多注册中心、多直连地址的四种情况生成invoker。
    • "urls.size() == 1"的条件用于处理单注册中心和单直连地址两种情况。
    • "urls.size() != 1"的条件用于处理多注册中心和多直连地址两种情况。

    Protocol适配器

    public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
    
        public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
            if (arg1 == null) throw new IllegalArgumentException("url == null");
            
            org.apache.dubbo.common.URL url = arg1;
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    
            if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
            // 获取扩展
            org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
            // 执行扩展的refer方法
            return extension.refer(arg0, arg1);
        }
    }
    
    com.alibaba.dubbo.rpc.Protocol文件
    
    filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
    listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
    mock=com.alibaba.dubbo.rpc.support.MockProtocol
    dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
    injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
    rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
    hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
    com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
    com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
    thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
    memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
    redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
    rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
    registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
    qos=com.alibaba.dubbo.qos.protocol.QosProtocolWrapper
    
    • Protocol$Adaptive根据url.getProtocol()的Protocol协议生成对应的Protocol对象。
    • Protocol文件内容如上图所示。

    Cluster适配器

    public class Cluster$Adaptive implements Cluster {
    
        public Invoker join(Directory directory) throws RpcException {
            
            if (directory == null) {
                throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument == null");
            }
            
            if (directory.getUrl() == null) {
                throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument getUrl() == null");
            }
            
            URL uRL = directory.getUrl();
            String string = uRL.getParameter("cluster", "failover");
            
            if (string == null) {
                throw new IllegalStateException(new StringBuffer()
                .append("Failed to get extension (org.apache.dubbo.rpc.cluster.Cluster) name from url (")
                .append(uRL.toString()).append(") use keys([cluster])").toString());
            }
            
            Cluster cluster = (Cluster)ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(string);
    
            return cluster.join(directory);
        }
    }
    
    com.alibaba.dubbo.rpc.cluster.Cluster文件
    
    mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
    failover=org.apache.dubbo.rpc.cluster.support.FailoverCluster
    failfast=org.apache.dubbo.rpc.cluster.support.FailfastCluster
    failsafe=org.apache.dubbo.rpc.cluster.support.FailsafeCluster
    failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster
    forking=org.apache.dubbo.rpc.cluster.support.ForkingCluster
    available=org.apache.dubbo.rpc.cluster.support.AvailableCluster
    mergeable=org.apache.dubbo.rpc.cluster.support.MergeableCluster
    broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster
    registryaware=org.apache.dubbo.rpc.cluster.support.RegistryAwareCluster
    
    • Cluster$Adaptive根据uRL.getParameter("cluster", "failover")的Cluster协议生成对应的Cluster对象。
    • Cluster文件内容如上图所示。

    单注册中心或直连地址场景

    处理单注册中心场景

    public class RegistryProtocol implements Protocol {
    
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            url = URLBuilder.from(url)
                    .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
                    .removeParameter(REGISTRY_KEY)
                    .build();
            Registry registry = registryFactory.getRegistry(url);
            if (RegistryService.class.equals(type)) {
                return proxyFactory.getInvoker((T) registry, type, url);
            }
    
            // group="a,b" or group="*"
            Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
            String group = qs.get(GROUP_KEY);
            if (group != null && group.length() > 0) {
                if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                    return doRefer(getMergeableCluster(), registry, type, url);
                }
            }
    
            return doRefer(cluster, registry, type, url);
        }
    
        private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
            URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
            if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
                directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
                registry.register(directory.getRegisteredConsumerUrl());
            }
            directory.buildRouterChain(subscribeUrl);
            directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                    PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    
            // invoker为MockClusterInvoker=>FailoverClusterInvoker
            Invoker invoker = cluster.join(directory);
            ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    
            return invoker;
        }
    }
    
    单注册中心invoker调用链
    • 单注册中心场景下,urls.get(0)的协议是”registry“,Protocol$Adaptive返回的被封装的Protocol对象为RegistryProtocol。

    • ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry")返回的extension为ProtocolListenerWrapper对象,ProtocolListenerWrapper的封装链为ProtocolListenerWrapper => ProtocolFilterWrapper => RegistryProtocol。

    • extension.refer()过程按照ProtocolListenerWrapper.refer() => ProtocolFilterWrapper.refer() => RegistryProtocol.refer()流程调用。

    • ProtocolListenerWrapper.refer() => ProtocolFilterWrapper.refer() => RegistryProtocol.refer()的调用链路针对"registry"作了特殊处理,直接走RegistryProtocol.refer()的方法。

    • RegistryProtocol.refer()执行cluster.join(directory)生成MockClusterInvoker对象,封装关系为MockClusterInvoker=>FailoverClusterInvoker。

    • "registry"协议返回的invoker对象封装关系为MockClusterInvoker => FailoverClusterInvoker。

    • MockClusterInvoker的invoke()方法会执行FailoverClusterInvoker的doInvoke()方法,进入Dubbo的Cluster集群调用策略。

    处理单直连地址场景

    public class DubboProtocol extends AbstractProtocol {
    
        public static final String NAME = "dubbo";
    
        public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
            optimizeSerialization(url);
    
            // create rpc invoker.
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
    
            return invoker;
        }
    }
    
    public abstract class AbstractProtocol implements Protocol {
    
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
        }
    
        protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
    }
    
    
    public class AsyncToSyncInvoker<T> implements Invoker<T> {
    
        private Invoker<T> invoker;
    
        public AsyncToSyncInvoker(Invoker<T> invoker) {
            this.invoker = invoker;
        }
    }
    
    单直连地址invoker调用链
    • 单直连地址场景下,urls.get(0)的协议为"dubbo",Protocol$Adaptive返回的被封装的Protocol对象为DubboProtocol。

    • ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo")返回的extension为ProtocolListenerWrapper对象,ProtocolListenerWrapper的封装链为ProtocolListenerWrapper => ProtocolFilterWrapper => DubboProtocol。

    • extension.refer()过程按照ProtocolListenerWrapper.refer() => ProtocolFilterWrapper.refer() => DubboProtocol.refer()流程调用。

    • DubboProtocol.refer()执行AbstractProtocol.refer()方法生成AsyncToSyncInvoker对象, AsyncToSyncInvoker对象内部包含DubboInvoker对象。

    • ”dubbo“协议的invoker对象封装关系为ListenerInvokerWrapper => ProtocolFilterWrapper => AsyncToSyncInvoker。

    • AsyncToSyncInvoker的invoke()方法会执行DubboProtocol的invoke()方法。

    多注册中心或多直连地址场景

    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
    
    URL registryURL = null;
    for (URL url : urls) {
        // 单注册中心协议为"registry",直连场景下协议为"dubbo"
        invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
        if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            registryURL = url; // use last registry url
        }
    }
    
    if (registryURL != null) {
        // 针对注册中心的方式,通过RegistryAwareCluster进行封装
        URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
        // RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
        invoker = CLUSTER.join(new StaticDirectory(u, invokers));
    } else { // 处理直连方式
        invoker = CLUSTER.join(new StaticDirectory(invokers));
    }
    
    • 多注册中心场景下invokers为MockClusterInvoker对象列表,CLUSTER.join()方法的Cluster为"failover"。
    • 多直连地址场景下invokers为ListenerInvokerWrapper对象列表,CLUSTER.join()方法的Cluster为"failover"。

    处理多注册中心场景

    • 多注册中心场景下每个注册中心对应MockClusterInvoker对象,在外层有一层MockClusterInvoker包装。
    • 多注册中心场景下通过MockClusterInvoker包装多个注册中心中每个注册中心对应的MockClusterInvoker。
    多注册中心invoker调用链

    处理多直接地址场景

    • 多直连地址场景下每个直连地址对应ListenerInvokerWrapper对象,在外层有一层MockClusterInvoker包装。
    • 多直连地址场景下通过MockClusterInvoker包装直连地址对应的ListenerInvokerWrapper对象列表。
    多直连地址invoker调用链

    总结

    • 在单直连地址场景下:invoker对象为ListenerInvokerWrapper。

    • 在多直连地址场景下:invoker对象为MockClusterInvoker,内部包含ListenerInvokerWrapper对象。

    • 在单注册中心场景下:invoker对象为MockClusterInvoker。

    • 在多注册中心场景下:invoker对象为MockClusterInvoker,MockClusterInvoker内部包含注册中心对应的MockClusterInvoker对象,相当于在多注册中心情况下,每个注册中心对应一个MockClusterInvoker对象,外部通过MockClusterInvoker进行二次封装。

    相关文章

      网友评论

        本文标题:Dubbo Consumer 直连和注册中心服务引用流程

        本文链接:https://www.haomeiwen.com/subject/itjuvctx.html