美文网首页中间件
Dubbo——服务引用流程

Dubbo——服务引用流程

作者: 小波同学 | 来源:发表于2021-06-07 00:15 被阅读0次

    前言

    本文接着深入分析服务引用的核心流程。

    Dubbo 支持两种方式引用远程的服务:

    • 1、服务直连的方式,仅适合在调试服务的时候使用。

    • 2、基于注册中心引用服务,这是生产环境中使用的服务引用方式。

    DubboBootstrap 入口

    在介绍服务发布的时候,介绍了 DubboBootstrap.start() 方法的核心流程,其中除了会调用 exportServices() 方法完成服务发布之外,还会调用 referServices() 方法完成服务引用。

    在 DubboBootstrap.referServices() 方法中,会从 ConfigManager 中获取所有 ReferenceConfig 列表,并根据 ReferenceConfig 获取对应的代理对象,入口逻辑如下:

    public class DubboBootstrap extends GenericEventListener {
    
        private final ConfigManager configManager;
    
        private ReferenceConfigCache cache;
    
        private final ExecutorRepository executorRepository = getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
    
        private volatile boolean referAsync;
        
        private List<CompletableFuture<Object>> asyncReferringFutures = new ArrayList<>();
    
        private void referServices() {
            if (cache == null) {
                // 初始ReferenceConfigCache
                cache = ReferenceConfigCache.getCache();
            }
            
            // 遍历ReferenceConfig列表
            configManager.getReferences().forEach(rc -> {
                
                ReferenceConfig referenceConfig = (ReferenceConfig) rc;
                referenceConfig.setBootstrap(this);
                // 检测ReferenceConfig是否已经初始化
                if (rc.shouldInit()) {
                     // 异步
                    if (referAsync) {
                        CompletableFuture<Object> future = ScheduledCompletableFuture.submit(
                                executorRepository.getServiceExporterExecutor(),
                                () -> cache.get(rc)
                        );
                        asyncReferringFutures.add(future);
                    } else {
                        // 同步
                        cache.get(rc);
                    }
                }
            });
        }
    }
    

    Dubbo 服务引用的时机有两个,第一个是在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,第二个是在 ReferenceBean 对应的服务被注入到其他类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。默认情况下,Dubbo 使用懒汉式引用服务。

    新建的 ReferenceConfig 对象会通过 DubboBootstrap.reference() 方法添加到 ConfigManager 中进行管理,如下所示:

    public class DubboBootstrap extends GenericEventListener {
    
        private final ConfigManager configManager;
    
        public DubboBootstrap reference(ReferenceConfig<?> referenceConfig) {
            configManager.addReference(referenceConfig);
            return this;
        }
    }
    

    ReferenceConfigCache

    服务引用的核心实现在 ReferenceConfig 之中,一个 ReferenceConfig 对象对应一个服务接口,每个 ReferenceConfig 对象中都封装了与注册中心的网络连接,以及与 Provider 的网络连接,这是一个非常重要的对象。

    为了避免底层连接泄漏造成性能问题,从 Dubbo 2.4.0 版本开始,Dubbo 提供了 ReferenceConfigCache 用于缓存 ReferenceConfig 实例。

    在 dubbo-demo-api-consumer 示例中,我们可以看到 ReferenceConfigCache 的基本使用方式:

    ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
    reference.setInterface(DemoService.class);
    ... 
    // 这一步在DubboBootstrap.start()方法中完成
    ReferenceConfigCache cache = ReferenceConfigCache.getCache();
    ...
    DemoService demoService = ReferenceConfigCache.getCache().get(reference);
    

    在 ReferenceConfigCache 中维护了一个静态的 Map(CACHE_HOLDER)字段,其中 Key 是由 Group、服务接口和 version 构成,Value 是一个 ReferenceConfigCache 对象。在 ReferenceConfigCache 中可以传入一个 KeyGenerator 用来修改缓存 Key 的生成逻辑,KeyGenerator 接口的定义如下:

    public class ReferenceConfigCache {
    
        public interface KeyGenerator {
            String generateKey(ReferenceConfigBase<?> referenceConfig);
        }
    }
    

    默认的 KeyGenerator 实现是 ReferenceConfigCache 中的匿名内部类,其对象由 DEFAULT_KEY_GENERATOR 这个静态字段引用,具体实现如下:

    public class ReferenceConfigCache {
    
        public static final String DEFAULT_NAME = "_DEFAULT_";
    
        public static final KeyGenerator DEFAULT_KEY_GENERATOR = referenceConfig -> {
            String iName = referenceConfig.getInterface();
            if (StringUtils.isBlank(iName)) {
                // 获取服务接口名称
                Class<?> clazz = referenceConfig.getInterfaceClass();
                iName = clazz.getName();
            }
            if (StringUtils.isBlank(iName)) {
                throw new IllegalArgumentException("No interface info in ReferenceConfig" + referenceConfig);
            }
            // Key的格式是group/interface:version
            StringBuilder ret = new StringBuilder();
            if (!StringUtils.isBlank(referenceConfig.getGroup())) {
                ret.append(referenceConfig.getGroup()).append("/");
            }
            ret.append(iName);
            if (!StringUtils.isBlank(referenceConfig.getVersion())) {
                ret.append(":").append(referenceConfig.getVersion());
            }
            return ret.toString();
        };
    }
    

    在 ReferenceConfigCache 实例对象中,会维护下面两个 Map 集合:

    • proxies(ConcurrentMap<Class<?>, ConcurrentMap<String, Object>>类型):该集合用来存储服务接口的全部代理对象,其中第一层 Key 是服务接口的类型,第二层 Key 是上面介绍的 KeyGenerator 为不同服务提供方生成的 Key,Value 是服务的代理对象。

    • referredReferences(ConcurrentMap<String, ReferenceConfigBase<?>> 类型):该集合用来存储已经被处理的 ReferenceConfig 对象。

    回到 DubboBootstrap.referServices() 方法中,看一下其中与 ReferenceConfigCache 相关的逻辑。

    首先是 ReferenceConfigCache.getCache() 这个静态方法,会在 CACHE_HOLDER 集合中添加一个 Key 为“DEFAULT”的 ReferenceConfigCache 对象(使用默认的 KeyGenerator 实现),它将作为默认的 ReferenceConfigCache 对象。

    接下来,无论是同步服务引用还是异步服务引用,都会调用 ReferenceConfigCache.get() 方法,创建并缓存代理对象。下面就是 ReferenceConfigCache.get() 方法的核心实现:

    public class ReferenceConfigCache {
    
        public <T> T get(ReferenceConfigBase<T> referenceConfig) {
            // 生成服务提供方对应的Key
            String key = generator.generateKey(referenceConfig);
            // 获取接口类型
            Class<?> type = referenceConfig.getInterfaceClass();
            // 获取该接口对应代理对象集合
            proxies.computeIfAbsent(type, _t -> new ConcurrentHashMap<>());
    
            ConcurrentMap<String, Object> proxiesOfType = proxies.get(type);
            
            // 根据Key获取服务提供方对应的代理对象
            proxiesOfType.computeIfAbsent(key, _k -> {
                // 服务引用
                Object proxy = referenceConfig.get();
                // 将ReferenceConfig记录到referredReferences集合
                referredReferences.put(key, referenceConfig);
                return proxy;
            });
    
            return (T) proxiesOfType.get(key);
        }
    }
    

    ReferenceConfig

    通过前面的介绍知道,ReferenceConfig 是服务引用的真正入口,其中会创建相关的代理对象。下面先来看 ReferenceConfig.get() 方法:

    public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
    
        private transient volatile T ref;
        
        public synchronized T get() {
            // 检测当前ReferenceConfig状态
            if (destroyed) {
                throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
            }
            // 检测 ref 是否为空,为空则通过 init 方法创建
            if (ref == null) {// ref指向了服务的代理对象
                // 启动初始化操作 init 方法主要用于处理配置,以及调用 createProxy 生成代理类
                init();
            }
            return ref;
        }
    }
    

    在 ReferenceConfig.init() 方法中,首先会对服务引用的配置进行处理,以保证配置的正确性。

    ReferenceConfig.init() 方法的核心逻辑是调用 createProxy() 方法,调用之前会从配置中获取 createProxy() 方法需要的参数:

    public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
    
        private transient volatile T ref;
        
        private transient volatile boolean initialized;
    
        private DubboBootstrap bootstrap;   
        
        public synchronized void init() {
            //避免重复加载
            if (initialized) {
                return;
            }
    
            //获取Dubbo核心容器
            if (bootstrap == null) {
                bootstrap = DubboBootstrap.getInstance();
                //进行Dubbo核心配置的加载和检查
                bootstrap.initialize();
            }
            //在对象创建后在使用其他配置模块配置对象之前检查对象配置并重写默认配置
            checkAndUpdateSubConfigs();
            //检查并生成sub配置和Local配置是否合法
            checkStubAndLocal(interfaceClass);
            //判断对象是否有mock并生成mock信息
            ConfigValidationUtils.checkMock(interfaceClass, this);
            //保存对象属性map信息
            Map<String, String> map = new HashMap<String, String>();
            map.put(SIDE_KEY, CONSUMER_SIDE);
            //添加版本信息,包含dubbo版本,release版本,timestamp运行时间戳和sid_key等信息
            ReferenceConfigBase.appendRuntimeParameters(map);
            //添加泛型 revision信息
            if (!ProtocolUtils.isGeneric(generic)) {
                String revision = Version.getVersion(interfaceClass, version);
                if (revision != null && revision.length() > 0) {
                    map.put(REVISION_KEY, revision);
                }
                //生成服务的代理对象,跟服务导出是一样,通过代理对象来代理,返回代理方法
                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
                if (methods.length == 0) {
                    logger.warn("No method found in service interface " + interfaceClass.getName());
                    map.put(METHODS_KEY, ANY_VALUE);
                } else {
                    //添加需要代理的方法
                    map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
                }
            }
            //添加interface名
            map.put(INTERFACE_KEY, interfaceName);
            //添加重试信息
            AbstractConfig.appendParameters(map, getMetrics());
            //检查获取并添加Application信息
            AbstractConfig.appendParameters(map, getApplication());
            //检查获取并添加Module信息
            AbstractConfig.appendParameters(map, getModule());
            // remove 'default.' prefix for configs from ConsumerConfig
            // appendParameters(map, consumer, Constants.DEFAULT_KEY);
            //检查获取并添加consumer信息
            AbstractConfig.appendParameters(map, consumer);
            AbstractConfig.appendParameters(map, this);
            MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
            if (metadataReportConfig != null && metadataReportConfig.isValid()) {
                map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
            }
            //设置方法重试信息并收集方法异步调用信息
            Map<String, AsyncMethodInfo> attributes = null;
            if (CollectionUtils.isNotEmpty(getMethods())) {
                attributes = new HashMap<>();
                for (MethodConfig methodConfig : getMethods()) {
                    AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
                    String retryKey = methodConfig.getName() + ".retry";
                    if (map.containsKey(retryKey)) {
                        String retryValue = map.remove(retryKey);
                        if ("false".equals(retryValue)) {
                            map.put(methodConfig.getName() + ".retries", "0");
                        }
                    }
                    AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                    if (asyncMethodInfo != null) {
    //                    consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
                        attributes.put(methodConfig.getName(), asyncMethodInfo);
                    }
                }
            }
            //获取服务消费者 ip 地址
            String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
            if (StringUtils.isEmpty(hostToRegistry)) {
                hostToRegistry = NetUtils.getLocalHost();
            } else if (isInvalidLocalHost(hostToRegistry)) {
                throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
            }
            //添加服务注册信息
            map.put(REGISTER_IP_KEY, hostToRegistry);
            //将配置保存如服务元信息中
            serviceMetadata.getAttachments().putAll(map);
            //创建代理
            ref = createProxy(map);
    
            serviceMetadata.setTarget(ref);
            serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
            // 根据服务名,ReferenceConfig,代理类构建 ConsumerModel,
            // 并将 ConsumerModel 存入到 ApplicationModel 中      
            ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
            consumerModel.setProxyObject(ref);
            consumerModel.init(attributes);
    
            initialized = true;
            //检查引入的服务是否可用
            checkInvokerAvailable();
    
            // dispatch a ReferenceConfigInitializedEvent since 2.7.4
            dispatch(new ReferenceConfigInitializedEvent(this, invoker));
        }
    }
    

    ReferenceConfig.createProxy() 方法中处理了多种服务引用的场景,例如,直连单个/多个Provider、单个/多个注册中心。下面是 createProxy() 方法的核心流程,大致可以梳理出这么 5 个步骤:

    • 1、根据传入的参数集合判断协议是否为 injvm 协议,如果是,直接通过 InjvmProtocol 引用服务。

    • 2、构造 urls 集合。Dubbo 支持直连 Provider和依赖注册中心两种服务引用方式。如果是直连服务的模式,我们可以通过 url 参数指定一个或者多个 Provider 地址,会被解析并填充到 urls 集合;如果通过注册中心的方式进行服务引用,则会调用 AbstractInterfaceConfig.loadRegistries() 方法加载所有注册中心。

    • 3、如果 urls 集合中只记录了一个 URL,通过 Protocol 适配器选择合适的 Protocol 扩展实现创建 Invoker 对象。如果是直连 Provider 的场景,则 URL 为 dubbo 协议,这里就会使用 DubboProtocol 这个实现;如果依赖注册中心,则使用 RegistryProtocol 这个实现。

    • 4、如果 urls 集合中有多个注册中心,则使用 ZoneAwareCluster 作为 Cluster 的默认实现,生成对应的 Invoker 对象;如果 urls 集合中记录的是多个直连服务的地址,则使用 Cluster 适配器选择合适的扩展实现生成 Invoker 对象。

    • 5、通过 ProxyFactory 适配器选择合适的 ProxyFactory 扩展实现,将 Invoker 包装成服务接口的代理对象。

    通过上面的流程我们可以看出createProxy() 方法中有两个核心:

    • 1、通过 Protocol 适配器选择合适的 Protocol 扩展实现创建 Invoker 对象。
    • 2、通过 ProxyFactory 适配器选择合适的 ProxyFactory 创建代理对象。

    下面我们来看 createProxy() 方法的具体实现:

    public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
    
        private T createProxy(Map<String, String> map) {
            //jvm本地引入
            // 根据url的协议、scope以及injvm等参数检测是否需要本地引用
            if (shouldJvmRefer(map)) {
                // 创建injvm协议的URL
                URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
                // 本地引用invoker生成
                // 通过Protocol的适配器选择对应的Protocol实现创建Invoker对象
                invoker = REF_PROTOCOL.refer(interfaceClass, url);
                if (logger.isInfoEnabled()) {
                    logger.info("Using injvm service " + interfaceClass.getName());
                }
            } else {
                urls.clear();
                // 用户配置url信息,表明用户可能想进行点对点调用
                if (url != null && url.length() > 0) { 
                    // 当需要配置多个 url 时,可用分号进行分割,这里会进行切分
                    String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            URL url = URL.valueOf(u);
                            if (StringUtils.isEmpty(url.getPath())) {
                                // 设置接口全限定名为 url 路径
                                url = url.setPath(interfaceName);
                            }
                            // 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心
                            if (UrlUtils.isRegistry(url)) {
                                // 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
                                urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
                                // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
                                // 最后将合并后的配置设置为 url 查询字符串中。                 
                                urls.add(ClusterUtils.mergeUrl(url, map));
                            }
                        }
                    }
                } else { // assemble URL from register center's configuration
                    // 从注册中心的配置中组装url信息
                    // if protocols not injvm checkRegistry
                    // 如果协议不是在jvm本地中
                    if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                        //检查注册中心是否存在(如果当前配置不存在则获取服务默认配置),然后将他们转换到RegistryConfig中
                        checkRegistry();
                        //通过注册中心配置信息组装URL
                        List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                        if (CollectionUtils.isNotEmpty(us)) {
                            for (URL u : us) {
                                //添加monitor监控信息
                                URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                                if (monitorUrl != null) {
                                    map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                                }
                                // 将map中的参数整理成refer参数,添加到RegistryURL中
                                urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                            }
                        }
                        // 既不是服务直连,也没有配置注册中心,抛出异常
                        if (urls.isEmpty()) {
                            throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                        }
                    }
                }
                //单个注册中心或服务提供者(服务直连,下同)
                if (urls.size() == 1) {
                    // 调用 RegistryProtocol 的 refer 构建 Invoker 实例
                    // 在单注册中心或是直连单个服务提供方的时候,通过Protocol的适配器选择对应的Protocol实现创建Invoker对象
                    invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
                } else {
                    //多个注册中心或多个服务提供者,或者两者混合
                    // 多注册中心或是直连多个服务提供方的时候,会根据每个URL创建Invoker对象
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    // 获取所有的 Invoker
                    for (URL url : urls) {
                        invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                        if (UrlUtils.isRegistry(url)) {// 确定是多注册中心,还是直连多个Provider
                            // 保存使用注册中心的最新的URL信息
                            registryURL = url; // use last registry url
                        }
                    }
                    // 注册中心URL存在
                    if (registryURL != null) { // registry url is available
                        // for multi-subscription scenario, use 'zone-aware' policy by default
                        // 多注册中心的场景中,会使用ZoneAwareCluster作为Cluster默认实现,多注册中心之间的选择
                        // 对于对区域订阅方案,默认使用"zone-aware"区域
                        String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                        // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                        // invoker 包装顺序: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                        invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
                    } else { // not a registry url, must be direct invoke.
                        // 如果不存在注册中心连接,只能使用直连
                        //如果订阅区域未设置,则设置为默认区域"zone-aware"
                        String cluster = CollectionUtils.isNotEmpty(invokers)
                                ? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
                                : Cluster.DEFAULT;
                        // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并        
                        invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
                    }
                }
            }
    
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
            }
    
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            MetadataUtils.publishServiceDefinition(consumerURL);
    
            // 通过ProxyFactory适配器选择合适的ProxyFactory扩展实现,创建代理对象
            return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
        }
    }
    

    RegistryProtocol

    在直连 Provider 的场景中,会使用 DubboProtocol.refer() 方法完成服务引用,DubboProtocol.refer() 方法的具体实现在之前已经详细介绍过了,这里我们重点来看存在注册中心的场景中,Dubbo Consumer 是如何通过 RegistryProtocol 完成服务引用的。

    在 RegistryProtocol.refer() 方法中,会先根据 URL 获取注册中心的 URL,再调用 doRefer 方法生成 Invoker,在 refer() 方法中会使用 MergeableCluster 处理多 group 引用的场景。

    public class RegistryProtocol implements Protocol {
    
        @Override
        @SuppressWarnings("unchecked")
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            // 从URL中获取注册中心的URL
            url = getRegistryUrl(url);
            // 获取Registry实例,这里的RegistryFactory对象是通过Dubbo SPI的自动装载机制注入的
            Registry registry = registryFactory.getRegistry(url);
            if (RegistryService.class.equals(type)) {
                return proxyFactory.getInvoker((T) registry, type, url);
            }
    
            // 从注册中心URL的refer参数中获取此次服务引用的一些参数,其中就包括group
            Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
            String group = qs.get(GROUP_KEY);
            if (group != null && group.length() > 0) {
                if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                    // 如果此次可以引用多个group的服务,则Cluser实现使用MergeableCluster实现,
                    // 这里的getMergeableCluster()方法就会通过Dubbo SPI方式找到MergeableCluster实例
                    return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
                }
            }
    
            Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
            // 如果没有group参数或是只指定了一个group,则通过Cluster适配器选择Cluster实现
            return doRefer(cluster, registry, type, url);
        }
    }
    

    在 doRefer() 方法中,首先会根据 URL 初始化 RegistryDirectory 实例,然后生成 Subscribe URL 并进行注册,之后会通过 Registry 订阅服务,最后通过 Cluster 将多个 Invoker 合并成一个 Invoker 返回给上层,具体实现如下:

    public class RegistryProtocol implements Protocol {
    
        protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            return interceptInvoker(getInvoker(cluster, registry, type, url), url);
        }
        
        protected <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
            // 创建RegistryDirectory实例
            DynamicDirectory<T> directory = createDirectory(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // 生成SubscribeUrl,协议为consumer,具体的参数是RegistryURL中refer参数指定的参数
            Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
            URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
            if (directory.isShouldRegister()) {
                // 在SubscribeUrl中添加category=consumers和check=false参数
                directory.setRegisteredConsumerUrl(urlToRegistry);
                // 服务注册,在Zookeeper的consumers节点下,添加该Consumer对应的节点
                registry.register(directory.getRegisteredConsumerUrl());
            }
            // 根据SubscribeUrl创建服务路由
            directory.buildRouterChain(urlToRegistry);
            // 订阅服务,toSubscribeUrl()方法会将SubscribeUrl中category参数修改为"providers,configurators,routers"
            // RegistryDirectory的subscribe()在前面详细分析过了,其中会通过Registry订阅服务,同时还会添加相应的监听器
            directory.subscribe(toSubscribeUrl(urlToRegistry));
    
            // 注册中心中可能包含多个Provider,相应地,也就有多个Invoker,
            // 这里通过前面选择的Cluster将多个Invoker对象封装成一个Invoker对象
            return (ClusterInvoker<T>) cluster.join(directory);
        }
    
        protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url) {
            // 根据URL中的registry.protocol.listener参数加载相应的监听器实现
            List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
            if (CollectionUtils.isEmpty(listeners)) {
                return invoker;
            }
        
            //引入了RegistryProtocol侦听器,以使用户有机会自定义或更改导出并引用RegistryProtocol的行为。
            // 例如:在满足某些条件时立即重新导出或重新引用。
            for (RegistryProtocolListener listener : listeners) {
                listener.onRefer(this, invoker);
            }
            return invoker;
        }   
    }
    

    总结

    本文重点介绍了 Dubbo 服务引用的整个流程:

    • 首先,我们介绍了 DubboBootStrap 这个入口门面类与服务引用相关的方法,其中涉及 referServices()、reference() 等核心方法。

    • 接下来,我们分析了 ReferenceConfigCache 这个 ReferenceConfig 对象缓存,以及 ReferenceConfig 实现服务引用的核心流程。

    • 最后,我们还讲解了 RegistryProtocol 从注册中心引用服务的核心实现。

    相关文章

      网友评论

        本文标题:Dubbo——服务引用流程

        本文链接:https://www.haomeiwen.com/subject/pyrnsltx.html