美文网首页程序员
Dubbo客户端初始化过程

Dubbo客户端初始化过程

作者: 凡毓不凡 | 来源:发表于2021-12-22 00:46 被阅读0次

    导读

    • 上一篇我们分析了 Dubbo设计之ExtensionLoader,了解了其原理之后,就方便我们分析Dubbo客户端得初始化过程了。
    • 关键字Dubbo客户端初始化、Netty入门级用法
    • Dubbo版本 : 2.7+
    • 入口: ReferenceConfig

    客户端初始化:

    • 下面是Dubbo消费者API使用方式的示例代码
    // 创建ReferenceConfig,此类会有一系列的初始化准备过程
    ReferenceConfig<AlipayInfoAbility> reference = new ReferenceConfig<>();
    // 设置消费者应用配置
    reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
    // 设置注册地址
    reference.setRegistry(new RegistryConfig("zookeeper://" + zookeeperHost + ":" + zookeeperPort));
    // 设置引用的API接口
    reference.setInterface(AlipayInfoAbility.class);
    // 关闭服务启动检查(建议不要关闭,找不到服务要在应用发布时就要解决)
    reference.setCheck(false);
    // 获取引用
    AlipayInfoAbility service = reference.get();
    

    上述代码中, reference.get()是整个调用方获取接口代理得核心起始点。主要核心逻辑如下:
    设置接口级别得公共参数 Map<String, String> map:为初始化代理引用ref做准备,例如:设置key = side,value = consumer,表示当前是服务调用方;通过 MetricsConfigRegistryConfig(注册中心)ConfigCenterConfig(配置中心)ApplicationConfigModuleConfigConsumerConfig(可以指定消费端线程池类型:cached, fixed, limit, eager;核心线程数、最大线程数、队列大小、共享连接数shareconnections(TCP长连接得客户端数量))MethodConfig(可以指定方法相关得参数配置:可以设置重试次数、并发数(executes)、是否启动session粘连(sticky)、方法是否有异步回调等)等初始化配置来填充 map。
    ② 接着就是调用创建代理引用得核心方法org.apache.dubbo.config.ReferenceConfig#createProxy

           private T createProxy(Map<String, String> map) {
    
            if (shouldJvmRefer(map)) { // 1.判断是否是本地调用(通过scope=local、injvm=false都可以),重要用于本地测试,防止调用远程服务器
                URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
                invoker = REF_PROTOCOL.refer(interfaceClass, url); // 通过自适应扩展点机制获取代理Invoker
                if (logger.isInfoEnabled()) {
                    logger.info("Using injvm service " + interfaceClass.getName());
                }
            } else { // 2.不是本地调用
                urls.clear(); // reference retry init will add url to urls, lead to OOM
                if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                    String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            URL url = URL.valueOf(u);
                            if (StringUtils.isEmpty(url.getPath())) {
                                url = url.setPath(interfaceName);
                            }
                            if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { // 3.协议类型是注册(registry)
                                urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                urls.add(ClusterUtils.mergeUrl(url, map));
                            }
                        }
                    }
                } else { // assemble URL from register center's configuration
                    // if protocols not injvm checkRegistry
                    if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){ // 若注册协议不是injvm
                        // 校验注册中心配置(RegistryConfig),会向后兼容-Ddubbo.registry.address启动参数
                        checkRegistry();
                        // 4.通过配置中心相关设置解析成URL(一般注册中心都使用 zookeeper,没配置默认是dubbo)
                        // 执行完之后会将protocol属性key设置为registry,当下面调用协议自适应扩展点得时候就会找到 Registryprotocol
                        List<URL> us = loadRegistries(false);
                        if (CollectionUtils.isNotEmpty(us)) {
                            for (URL u : us) {
                                // 5.加载监控配置,显式设置直接使用。没显式设置:判断是否有logstat监控扩展点工厂,没有会使用Dubbo得监控
                                URL monitorUrl = loadMonitor(u);
                                if (monitorUrl != null) {
                                    map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                                }
                                urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                            }
                        }
                        if (urls.isEmpty()) {
                            throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                        }
                    }
                }
    
                if (urls.size() == 1) {// 注册中心单节点
                    // 6.通过Registryprotocol$Apaptive 自适应扩展点实现获取Invoker
                    invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
                } else { // 注册中心集群
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    for (URL url : urls) {
                        // 7.通过Registryprotocol$Apaptive获取Invoker,放入到列表中
                        invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                        if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // use last registry url
                        }
                    }
                    if (registryURL != null) { // registry url is available
                        // use RegistryAwareCluster only when register's CLUSTER is available
                        URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
                        // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                        invoker = CLUSTER.join(new StaticDirectory(u, invokers));// 8.通过集群RegistryAwareCluster对Invoker进行包装
                    } else { // not a registry url, must be direct invoke.
                        invoker = CLUSTER.join(new StaticDirectory(invokers));
                    }
                }
            }
            // 若开启Dubbo服务检查,并且服务接口不可用,则直接抛异常
            if (shouldCheck() && !invoker.isAvailable()) {
                throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
            }
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
            }
            /**
             * @since 2.7.0
             * ServiceData Store
             */
            MetadataReportService metadataReportService = null;
            if ((metadataReportService = getMetadataReportService()) != null) {
                URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
                metadataReportService.publishConsumer(consumerURL);// 9.消费者元数据信息发布
            }
            // create service proxy
            return (T) PROXY_FACTORY.getProxy(invoker);// 10.真正开始创建代理对象,通过代理工厂创建代理引用(代理工厂的默认扩展点名是 javassist )
        }
    

    上述方法中有几处重要得地方:1、2、3、4、5、6(单节点)、7、8(集群)、9、10;其中6(7)、8、10是重中之重。下面主要来分析6(7)、8、10这几个关键步骤
    6(7) 步骤核心子流程如下:
    ① 调用协议的自适应扩展点获取真正端得协议扩展点(Registryprotocol),(扩展点的原理:Dubbo设计之ExtensionLoader),但是由于Protocol扩展点有包装类(ProtocolFilterWrapper、ProtocolListenerWrapper),所以获取真正的扩展实现之后会被包装类包装起来: ProtocolFilterWrapper.refer -> ProtocolListenerWrapper.refer -> Registryprotocol.refer

    // ProtocolFilterWrapper    
        @Override
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url); // 由于当前协议是registry,调用ProtocolListenerWrapper的 refer方法
            }
            return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
        }
    
    // ProtocolListenerWrapper    
        @Override
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url); // 由于当前协议是registry,调用 RegistryProtocol 的refer方法
            }
            return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
                    Collections.unmodifiableList(
                            ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                    .getActivateExtension(url, INVOKER_LISTENER_KEY)));
        }
    

    ② 接着通过ProtocolListenerWrapper调用真实的扩展实现RegistryProtocol 的refer方法:

    // RegistryProtocol 
        @Override
        @SuppressWarnings("unchecked")
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            // 重新构建一个protocol为 `zookeeper`的url,如果url没有指定protocol的话,默认是 `dubbo`
            url = URLBuilder.from(url)
                    .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
                    .removeParameter(REGISTRY_KEY)
                    .build();
            // 调用 `RegistryFactory扩展点`的自适应扩展实现,此处根据url中的protocol=zookeeper,会去找到真正的扩展点实现`ZookeeperRegistryFactory`,然后调用其getRegistry方法(实际上此方法继承自父类 AbstractRegistryFactory)生成ZookeeperRegistry实例。
            Registry registry = registryFactory.getRegistry(url);
            if (RegistryService.class.equals(type)) {
                return proxyFactory.getInvoker((T) registry, type, url);
            }
    
            // group="a,b" or group="*"
            Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
            String group = qs.get(GROUP_KEY);// 假设group为空的情况
            if (group != null && group.length() > 0) {
                if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                    return doRefer(getMergeableCluster(), registry, type, url);// 不执行此处代码
                }
            }
            return doRefer(cluster, registry, type, url);// 通过集群扩展点对注册实例进行包装
        }
    

    ③ 调用 ZookeeperRegistryFactory 的 getRegistry方法(继承自 AbstractRegistryFactory )获取注册实现:

    // AbstractRegistryFactory 
        @Override
        public Registry getRegistry(URL url) {
            // 构建一个路径为RegistryService的注册URL
            url = URLBuilder.from(url)
                    .setPath(RegistryService.class.getName())
                    .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                    .removeParameters(EXPORT_KEY, REFER_KEY)
                    .build();
            String key = url.toServiceStringWithoutResolving();
            // Lock the registry access process to ensure a single instance of the registry
            LOCK.lock();
            try {
                Registry registry = REGISTRIES.get(key);// 从缓存中取,不等于空直接返回
                if (registry != null) {
                    return registry;
                }
                //create registry by spi/ioc
                registry = createRegistry(url); // 模板方法,通过具体的实现类(此处是ZookeeperRegistryFactory)去获取注册实例
                if (registry == null) {
                    throw new IllegalStateException("Can not create registry " + url);
                }
                REGISTRIES.put(key, registry); // 获取之后放入缓存中,防止并发场景下重复创建
                return registry;
            } finally {
                // Release the lock
                LOCK.unlock();
            }
        }
    
        protected abstract Registry createRegistry(URL url); // 抽象模板方法
    
    // ZookeeperRegistryFactory 的方法实现
        @Override
        public Registry createRegistry(URL url) {
            // 创建注册实例,注册的url为 zookeeper://xxx:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-api-consumer&dubbo=XXX&interface=org.apache.dubbo.registry.RegistryService&pid=13740&timestamp=1640015273548
            return new ZookeeperRegistry(url, zookeeperTransporter);
        }
    // 构造器
        public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
            super(url);
            if (url.isAnyHost()) {
                throw new IllegalStateException("registry address == null");
            }
            String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
            if (!group.startsWith(PATH_SEPARATOR)) {
                group = PATH_SEPARATOR + group;
            }
            this.root = group;
            zkClient = zookeeperTransporter.connect(url);// 连接zk server端
            zkClient.addStateListener(state -> {
                if (state == StateListener.RECONNECTED) {
                    try {
                        recover(); // 故障恢复
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            });
        }
    // FailbackRegistry( ZookeeperRegistry的父类 )
        @Override
        protected void recover() throws Exception {
            // register
            Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());// 获取已经注册的URL
            if (!recoverRegistered.isEmpty()) {
                if (logger.isInfoEnabled()) {
                    logger.info("Recover register url " + recoverRegistered);
                }
                for (URL url : recoverRegistered) {
                    addFailedRegistered(url);// 添加到失败的列表中,并通过时间轮(HashedWheelTimer)定时重试
                }
            }
            // subscribe
            Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed()); // 获取订阅的URL跟对应的通知状态变更的监听器
            if (!recoverSubscribed.isEmpty()) {
                if (logger.isInfoEnabled()) {
                    logger.info("Recover subscribe url " + recoverSubscribed.keySet());
                }
                for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                    URL url = entry.getKey();
                    for (NotifyListener listener : entry.getValue()) {
                        addFailedSubscribed(url, listener);// 添加到失败订阅列表中,并通过时间轮(HashedWheelTimer)定时重试
                    }
                }
            }
        }
    

    ④ 调用 org.apache.dubbo.registry.integration.RegistryProtocol#doRefer方法,通过集群扩展点Cluster对本地注册目录进行合并:

    
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            // 创建本地注册目录(RegistryDirectory本身就是一个org.apache.dubbo.registry.NotifyListener),当监听到服务端地址变化时,更新本地url信息
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
            URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
            if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
                // 设置消费的URL
                directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
                // 调用注册实例的注册方法,将消费的URL注册到本地
                registry.register(directory.getRegisteredConsumerUrl());
            }
            // 构建路由链 (根据优先级依次为 MockRouterFactory、TagRouterFactor、AppRouterFactory、ServiceRouterFactory)
            directory.buildRouterChain(subscribeUrl);
    
            // 订阅URL(consumer://127.0..1/com.alibaba.mp.alipay.AlipayInfoAbility?application=dubbo-demo-api-consumer&category=providers,configurators,routers&dubbo=2.0.2&interface=com.alibaba.mp.alipay.AlipayInfoAbility&lazy=false&methods=getAlipayInfo&pid=11856&side=consumer&sticky=false&timestamp=1640020348831)
            directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                    PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    
            // 合并成一个virtual的Invoker
            Invoker invoker = cluster.join(directory);
            ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
            return invoker;
        }
    

    ⑤ 注册消费的URL、构建调用路由链、订阅要消费的URL(RegistryDirectory中)

    // FailbackRegistry 重写了AbstractRegistry的register方法,增加将消费者URL注册到服务端的的逻辑
        @Override
        public void register(URL url) {
            // 缓存到本地
            super.register(url);
            removeFailedRegistered(url);
            removeFailedUnregistered(url);
            try {
                // Sending a registration request to the server side
                doRegister(url); // 模板方法,向服务端发送注册请求
            } catch (Exception e) {
                Throwable t = e;
                ...
            }
        }
    // ZookeeperRegistry
        @Override
        public void doRegister(URL url) {
            try {
                // 在zk上创建提供方URL的临时节点(提供方目录是临时的)
                zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
            } catch (Throwable e) {
                ...
            }
        }
    
    // 构建路由链,用于后续从集群中选择路由时使用
        public void buildRouterChain(URL url) {
            this.setRouterChain(RouterChain.buildChain(url));
        }
    
    // RouterChain
        public static <T> RouterChain<T> buildChain(URL url) {
            return new RouterChain<>(url);
        }
    
        private RouterChain(URL url) {
    
            // 自然序获取RouterFactory所有的激活扩展点实现,(MockRouterFactory、TagRouterFactor、AppRouterFactory、ServiceRouterFactory)
            List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
                    .getActivateExtension(url, (String[]) null);
            // 获取路由集合
            List<Router> routers = extensionFactories.stream()
                    .map(factory -> factory.getRouter(url))
                    .collect(Collectors.toList());
    
            initWithRouters(routers);
        }
    // RegistryDirectory 的订阅方法
        public void subscribe(URL url) {
            // 缓存到本地注册目录中
            setConsumerUrl(url);
            // 添加 节点变更监听器用于通知服务上下线
            CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
            // 通知 configurator/router 变更
            serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
            // 开始订阅消费URL,每个对应一个注册目录,并注册与远端服务子节点之间的映射关系,当zk节点发生变化时会回调当前监听器更新本地消费URL
            registry.subscribe(url, this);
        }
    
    // FailbackRegistry,listener为 RegistryDirectory
        @Override
        public void subscribe(URL url, NotifyListener listener) {
            super.subscribe(url, listener);
            removeFailedSubscribed(url, listener);
            try {
                // Sending a subscription request to the server side
                doSubscribe(url, listener); // 模板方法,调用子类(ZookeeperRegistry)的订阅方法
            } catch (Exception e) {
                Throwable t = e;
                ...
            }
        }
    // ZookeeperRegistry 的模板实现方法
        @Override
        public void doSubscribe(final URL url, final NotifyListener listener) {
            try {
                if (ANY_VALUE.equals(url.getServiceInterface())) {
                    ...
                } else {
                    List<URL> urls = new ArrayList<>();
                    // toCategoriesPath:
                    // 0 = "/dubbo/com.alibaba.mp.alipay.AlipayInfoAbility/providers"
                    // 1 = "/dubbo/com.alibaba.mp.alipay.AlipayInfoAbility/configurators"
                    // 2 = "/dubbo/com.alibaba.mp.alipay.AlipayInfoAbility/routers"
                    for (String path : toCategoriesPath(url)) {
                        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                        if (listeners == null) {
                            zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                            listeners = zkListeners.get(url);
                        }
                        ChildListener zkListener = listeners.get(listener);
                        if (zkListener == null) {
                            zkListener = listeners.get(listener);
                            // 创建zk子节点监听器,用于监听节点变化,然后调用notify()方法变更本地目录
                            listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        }
                        // 创建持久化节点
                        zkClient.create(path, false);
                        List<String> children = zkClient.addChildListener(path, zkListener);
                        if (children != null) {
                            urls.addAll(toUrlsWithEmpty(url, path, children));
                        }
                    }
                    // 通知RegistryDirectory监听器,修改本地URL
                    notify(url, listener, urls);
                }
            } catch (Throwable e) {
                ...
            }
        }
    // FailbackRegistry
        @Override
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            try {
                doNotify(url, listener, urls); // 模板方法,ZookeeperRegistry 实现通知变更
            } catch (Exception t) {
                ...
            }
        }
    // AbstractRegistry
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            if ((CollectionUtils.isEmpty(urls))
                    && !ANY_VALUE.equals(url.getServiceInterface())) {
                logger.warn("Ignore empty notify urls for subscribe url " + url);
                return;
            }
            if (logger.isInfoEnabled()) {
                logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
            }
            // keep every provider's category.
            Map<String, List<URL>> result = new HashMap<>();
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                    List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                    categoryList.add(u);
                }
            }
            if (result.size() == 0) {
                return;
            }
            // 将提供方的URL放入到已通知的对象映射中
            Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                categoryNotified.put(category, categoryList);
                // 调用监听器通知变更,listener = RegistryDirectory
                listener.notify(categoryList);
                // We will update our cache file after each notification.
                // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
                saveProperties(url);
            }
        }
    // RegistryDirectory,更新本地对应的目录
        @Override
        public synchronized void notify(List<URL> urls) {
            // 根据 configurators、routers、providers进行分组
            Map<String, List<URL>> categoryUrls = urls.stream()
                    .filter(Objects::nonNull)
                    .filter(this::isValidCategory)
                    .filter(this::isNotCompatibleFor26x)
                    .collect(Collectors.groupingBy(url -> {
                        if (UrlUtils.isConfigurator(url)) {
                            return CONFIGURATORS_CATEGORY;
                        } else if (UrlUtils.isRoute(url)) {
                            return ROUTERS_CATEGORY;
                        } else if (UrlUtils.isProvider(url)) {
                            return PROVIDERS_CATEGORY;
                        }
                        return "";
                    }));
            // 若监听到category有变更,则取代之前的值
            List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
            this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
            // 若监听到有新得路由,则添加到路由链中
            List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
            toRouters(routerURLs).ifPresent(this::addRouters);
    
            // providers
            List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
            // 刷新+覆盖
            refreshOverrideAndInvoker(providerURLs);
        }
    
        private void refreshOverrideAndInvoker(List<URL> urls) {
            // mock zookeeper://xxx?mock=return null
            overrideDirectoryUrl();
            // 刷新本地缓存的Invokers
            refreshInvoker(urls);
        }
    
    private void refreshInvoker(List<URL> invokerUrls) {
            Assert.notNull(invokerUrls, "invokerUrls should not be null");
    
            if (invokerUrls.size() == 1
                    && invokerUrls.get(0) != null
                    && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
                this.forbidden = true; // Forbid to access
                this.invokers = Collections.emptyList();
                routerChain.setInvokers(this.invokers);
                destroyAllInvokers(); // Close all invokers
            } else {
                this.forbidden = false; // Allow to access
                Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
                if (invokerUrls == Collections.<URL>emptyList()) {
                    invokerUrls = new ArrayList<>();
                }
                if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                    invokerUrls.addAll(this.cachedInvokerUrls);
                } else {
                    this.cachedInvokerUrls = new HashSet<>();
                    this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
                }
                if (invokerUrls.isEmpty()) {
                    return;
                }
                Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
    
                /**
                 * If the calculation is wrong, it is not processed.
                 *
                 * 1. The protocol configured by the client is inconsistent with the protocol of the server.
                 *    eg: consumer protocol = dubbo, provider only has other protocol services(rest).
                 * 2. The registration center is not robust and pushes illegal specification data.
                 *
                 */
                if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
                    logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                            .toString()));
                    return;
                }
    
                List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
                // pre-route and build cache, notice that route cache should build on original Invoker list.
                // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
                routerChain.setInvokers(newInvokers);
                // 覆盖本地缓存,调用的时候就是从此成员变量中选取的
                this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
                this.urlInvokerMap = newUrlInvokerMap;
    
                try {
                    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
                } catch (Exception e) {
                    logger.warn("destroyUnusedInvokers error. ", e);
                }
            }
        }
    
        private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
            Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
            if (urls == null || urls.isEmpty()) {
                return newUrlInvokerMap;
            }
            Set<String> keys = new HashSet<>();
            String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
            for (URL providerUrl : urls) {
                // If protocol is configured at the reference side, only the matching protocol is selected
                if (queryProtocols != null && queryProtocols.length() > 0) {
                    boolean accept = false;
                    String[] acceptProtocols = queryProtocols.split(",");
                    for (String acceptProtocol : acceptProtocols) {
                        if (providerUrl.getProtocol().equals(acceptProtocol)) {
                            accept = true;
                            break;
                        }
                    }
                    if (!accept) {
                        continue;
                    }
                }
                if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                    continue;
                }
                // protocol = dubbo
                if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                    logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                            " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                            " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                            ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                    continue;
                }
                
                URL url = mergeUrl(providerUrl);
    
                String key = url.toFullString(); // The parameter urls are sorted
                if (keys.contains(key)) { // Repeated url
                    continue;
                }
                keys.add(key);
                // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
                Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
                Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
                if (invoker == null) { // Not in the cache, refer again
                    try {
                        boolean enabled = true;
                        if (url.hasParameter(DISABLED_KEY)) {
                            enabled = !url.getParameter(DISABLED_KEY, false);
                        } else {
                            enabled = url.getParameter(ENABLED_KEY, true);
                        }
                        // 默认情况下,enabled = true
                        if (enabled) {
                            // 根据当前key创建一个委派Invoker:InvokerDelegate。
                            // 重点:而此处url的protocol = dubbo,那么 protocol.refer(serviceType, url)会执行 Dubboprotocol的refer方法
                            // 由于此处的 protocol引用是自适应扩展点,所以还是会经过ProtocolFilterWrapper跟ProtocolListenerWrapper进行包装:ProtocolFilterWrapper.refer -> ProtocolListenerWrapper.refer -> Dubboprotocol.refer
                            invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                        }
                    } catch (Throwable t) {
                        logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                    }
                    if (invoker != null) { // Put new invoker in cache
                        newUrlInvokerMap.put(key, invoker);
                    }
                } else {
                    newUrlInvokerMap.put(key, invoker);
                }
            }
            keys.clear();
            return newUrlInvokerMap;
        }
    // ProtocolFilterWrapper 的refer方法
        @Override
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {// 协议是dubbo,此处不成立
                return protocol.refer(type, url);
            }
            // 调用 ProtocolListenerWrapper 的refer方法先生成一个Invoker
            // 将生成的Invoker与Filter扩展点构建消费端的Invoker链,REFERENCE_FILTER_KEY = reference.filter,
            // 经过返回 ProtocolFilterWrapper 之后,生成的是 CallbackRegistrationInvoker
            return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
        }
    // 构建Filter链
        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            // 此处的 invoker是ProtocolListenerWrapper.refer执行之后的Invoker,即 ListenerInvokerWrapper
            Invoker<T> last = invoker;
            // 获取消费者端的所有满足条件的激活扩展点,此处会根据排序字段升序排列
            // ConsumerContextFilter -> FutureFilter -> MonitorFilter
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    
            if (!filters.isEmpty()) {
                // 循环从后往前组成Invoker链,为了调用时按照设定的顺序执行
                // 循环顺序:MonitorFilter -> FutureFilter -> ConsumerContextFilter
                // 请求处理顺序:ConsumerContextFilter -> FutureFilter -> MonitorFilter
                for (int i = filters.size() - 1; i >= 0; i--) {
                    final Filter filter = filters.get(i);
                    final Invoker<T> next = last;
                    last = new Invoker<T>() {
                        ...
                        ...
                        @Override
                        public Result invoke(Invocation invocation) throws RpcException {
                            Result asyncResult;
                            try {
                                asyncResult = filter.invoke(next, invocation);
                            } catch (Exception e) {
                                // onError callback
                                if (filter instanceof ListenableFilter) {
                                    Filter.Listener listener = ((ListenableFilter) filter).listener();
                                    if (listener != null) {
                                        // 钩子方法
                                        listener.onError(e, invoker, invocation);
                                    }
                                }
                                throw e;
                            }
                            return asyncResult;
                        }
    
                        @Override
                        public void destroy() {
                            invoker.destroy();
                        }
    
                        @Override
                        public String toString() {
                            return invoker.toString();
                        }
                    };
                }
            }
        // 创建一个回调的 Invoker,如果定义的Filter是 ListenableFilter的实现类
            // 那么调用返回时,会回调Filter.Listener的 onResponse(正常返回) 和 onError方法(异常情况)
            return new CallbackRegistrationInvoker<>(last, filters);
        }
    
    // ProtocolListenerWrapper的refer方法
        @Override
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {// protocol = dubbo,此处不成立
                return protocol.refer(type, url);
            }
            // 此处的 protocol是真实的 DubboProtocol
            // 调用 DubboProtocol的refer 之后生成 DubboInvoker
            // 获取 InvokerListener的激活扩展点实现,若是存在,则调用扩展实现的 referred(Invoker<?> invoker)方法,(类似Spring中的BeanPostProcessor)
            // 返回 ListenerInvokerWrapper
            return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
                    Collections.unmodifiableList(
                            ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                    .getActivateExtension(url, INVOKER_LISTENER_KEY)));
        }
    
    
    // AbstractProtocol (DubboProtocol的父类)
        @Override
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            // protocolBindingRefer 方法是个抽象方法,让其子类去实现
            return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
        }
    
    
    // DubboProtocol
        @Override
        public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
            optimizeSerialization(url);
    
            // create rpc invoker.
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
    
            return invoker;
        }
    
        private ExchangeClient[] getClients(URL url) {
            
            // whether to share connection
            boolean useShareConnect = false;
    
            int connections = url.getParameter(CONNECTIONS_KEY, 0);
            List<ReferenceCountExchangeClient> shareClients = null;
            // if not configured, connection is shared, otherwise, one connection for one service
            // 没有配置 connections 参数的话,默认使用共享连接(一个服务公用一个连接) 
            if (connections == 0) {
                useShareConnect = true;
    
                /**
                 * The xml configuration should have a higher priority than properties.
                 */
                String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
                connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                        DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
                
                // 获取共享客户端连接
                shareClients = getSharedClient(url, connections);
            }
            // 根据连接数创建请求客户端数组(ReferenceCountExchangeClient)
            ExchangeClient[] clients = new ExchangeClient[connections];
            for (int i = 0; i < clients.length; i++) {
                // 使用共享连接
                if (useShareConnect) {
                    clients[i] = shareClients.get(i); // shareconnections 优先级高于 connections
    
                } else {
                    clients[i] = initClient(url); // 初始化客户端连接 HeaderExchangeClient
                }
            }
    
            return clients;
        }
    
        private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
            // 检查完缓存中没有的话,也会调用此方法去创建客户端连接
            ExchangeClient exchangeClient = initClient(url);
    
            return new ReferenceCountExchangeClient(exchangeClient);
        }
    
        private ExchangeClient initClient(URL url) {
    
            // client type setting.
            String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
            // 设置默认的编解码为 codec = dubbo(DubboCodec)
            url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
            // enable heartbeat by default(1分钟)
            url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
    
            // BIO is not allowed since it has severe performance issue.
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                throw new RpcException("Unsupported client type: " + str + "," +
                        " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
            }
    
            ExchangeClient client;
            try {
                // connection should be lazy
                if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                    client = new LazyConnectExchangeClient(url, requestHandler);
    
                } else {
                    // 通过交换层 Exchangers工具类 生成客户端
                    client = Exchangers.connect(url, requestHandler);
                }
    
            } catch (RemotingException e) {
                throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
            }
    
            return client;
        }
    
    // 中间的代码就不一一粘贴了,下面来看一下创建NettyClient时关键性的一步:
    
        /**
         * The constructor of NettyClient.
         * It wil init and start netty.
         */
        public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
            // 调用父类构造器
            super(url, wrapChannelHandler(url, handler));
        }
    
    // AbstractClient 的构造器,有几处模板方法:doOpen()、connect()、close()
        public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
    
            needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    
            try {
                // 打开连接通道
                doOpen();
            } catch (Throwable t) {
                close();
                ...
            }
            try {
                // connect.
                connect();
                ...
                }
            } catch (RemotingException t) {
                if (url.getParameter(Constants.CHECK_KEY, true)) {
                    close(); // 异常关闭连接
                    throw t;
                } else {
                    ...
                }
            } catch (Throwable t) {
                close(); // 异常关闭连接
                ...
            }
    
            ...
        }
    
    // NettyClient(注意此处说的这个类都是Dubbo自己的,Netty里面可没有叫这玩意的)抽象实现
        @Override
        protected void doOpen() throws Throwable {
            // 创建一个统一出入站处理器(为啥是出入? 因为实现了 io.netty.channel.ChannelDuplexHandler)
            final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
            // 创建客户端启动类
            bootstrap = new Bootstrap();
            // nioEventLoopGroup 是一个线程数为 CPU+1的 NioEventLoopGroup(事件循环组,可以当成线程池)
            bootstrap.group(nioEventLoopGroup)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()) 
                    // io.netty.channel.socket.nio.NioSocketChannel中会缓存一个 SocketChannel 用于与服务端进行通信
                    .channel(NioSocketChannel.class); 
    
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
            bootstrap.handler(new ChannelInitializer() {
    
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
                    // 自定义编解码器(粘包拆包)
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                    ch.pipeline()
                        // 可以参考 io.netty.channel.ChannelPipeline 中的注释说明
                        // 读:decoder -> client-idle-handler -> handler
                        // 写:handler -> client-idle-handler -> encoder
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                            .addLast("handler", nettyClientHandler);
                    String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
                    if(socksProxyHost != null) {
                        int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                        Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                        ch.pipeline().addFirst(socks5ProxyHandler);
                    }
                }
            });
        }
    
    // NettyClient 抽象实现
        @Override
        protected void doConnect() throws Throwable {
            long start = System.currentTimeMillis();
            // 通过调用netty 封装好的客户端与服务端进行连接,此连接会立马返回一个 ChannelFuture(DefaultChannelPromise)。
            // 类似于 JUC 中的 Future
            ChannelFuture future = bootstrap.connect(getConnectAddress());
            try {
                // 根据超时时间不中断获取连接结果
                boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);
                // 连接成功
                if (ret && future.isSuccess()) {
                    Channel newChannel = future.channel();
                    try {
                        // Close old channel
                        // copy reference
                        Channel oldChannel = NettyClient.this.channel;
                        if (oldChannel != null) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                                }
                                oldChannel.close();
                            } finally {
                                NettyChannel.removeChannelIfDisconnected(oldChannel);
                            }
                        }
                    } finally {
                        if (NettyClient.this.isClosed()) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                                }
                                newChannel.close();
                            } finally {
                                NettyClient.this.channel = null;
                                NettyChannel.removeChannelIfDisconnected(newChannel);
                            }
                        } else {
                            NettyClient.this.channel = newChannel;
                        }
                    }
                } else if (future.cause() != null) {
                    ...
            } finally {
                // just add new valid channel to NettyChannel's cache
                if (!isConnected()) {
                    //future.cancel(true);
                }
            }
        }
    
    

    关键流程

    1. Invoker链创建过程:
      ① DubboInvoker -> 真正发送请求的Invoker
      ② AsyncToSyncInvoker(DubboInvoker ** ) ->
      ③ ListenerInvokerWrapper( AsyncToSyncInvoker ) ->
      ④ CallbackRegistrationInvoker( ListenerInvokerWrapper ) -> Filter扩展点钩子回调
      ⑤ InvokerDelegate(
      CallbackRegistrationInvoker) ->
      ⑥ FailoverClusterInvoker( RegistryDirectory(
      InvokerDelegates) ) -> RegistryDirectory 中存放调用时的Invokes
      ⑦ MockClusterInvoker (
      RegistryDirectory,FailoverClusterInvoker**) 暴漏给接口拦截的入口Invoker
    • PS:上面是创建顺序,调用时自然是相反的顺序。那么如何方便记忆呢 ?
      Ⅰ 最后一个Invoker,实际发出网络请求的Invoker:DubboInvoker,铁打不变 !!!
      Ⅱ 由于 DubboInvoker 是异步返回,若是客户端指定同步调用(默认同步),则需要在AsyncToSyncInvoker(最靠近DubboInvoker)
      Ⅲ CallbackRegistrationInvoker,对返回结果的第一层处理。通过定义的Filter类型决定是否调用 onResponse方法 或者 onError方法
      Ⅳ Mock优先
      Ⅴ 高可用高于任何其他的执行Invoker( 故障转移:FailoverClusterInvoker
    1. Netty客户端创建过程:
      org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler -> 真正处理消息的收发处理器
      HeaderExchangeHandler(requestHandler) ->
      DecodeHandler(HeaderExchangeHandler) -> 自定义编解码处理器(拆包、粘包)
      AllChannelHandler(DecodeHandler,URL) -> 请求分发处理器,默认所有请求都转发
      HeartbeatHandler(AllChannelHandler) ->
      MultiMessageHandler (HeartbeatHandler) -> 消息类型是 MultiMessage,不过一般的请求都是 Request类型
      NettyClient(Url,MultiMessageHandler) -> 封装了一系列打开、链接通道逻辑
      HeaderExchangeClient(NettyClient,true) ->
      ReferenceCountExchangeClient(HeaderExchangeClient) 共享连接才会创建 ReferenceCountExchangeClient,暴漏给客户端使用的Client

    8这一步是通过如下两行代码:

    // 设置cluster = registryaware( RegistryAwareCluster )
    URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
    // 通过集群的自适应扩展点将多个Invokers( MockClusterInvoker 集合)合并成一个Invoker(RegistryAwareClusterInvoker是用来做集群路由,选择一个实际调用的Invoker),此处RegistryAwareClusterInvoker是用来做集群路由,选择一个实际调用的Invoker
    invoker =  CLUSTER.join(new StaticDirectory(u, invokers))
    

    到这里,大家千万不要被 "满眼的Invoker" 给绕晕了,仔细分析一下Dubbo的执行链与模块分层是可以理清楚的
    接着上面 执行链,此处会先包装一个StaticDirectory,然后 :
    MockClusterWrapper(MockClusterInvoker )-> RegistryAwareCluster(RegistryAwareClusterInvoker, 最终返回的Invoker)-> StaticDirectory (URL , List<MockClusterInvoker>)
    StaticDirectory:通过前面设置的Router链来执行路由选择,会选择最后一个Router所路由到的List<Invoker>集合上
    LoadBalance:负载均衡器(默认random,随机),从前面选择到的List<Invoker>集合上,根据负载均衡算法选择一个
    路由跟负载均衡都是调用invoke方法时才会用到,我们初始化过程分析完之后,调用过程自然就清晰了

    9步骤有以下核心子流程:
    ① 接着就开始创建代理引用了: 通过调用 ProxyFactory扩展点的适配类的getProxy方法获取代理对象:

    // ProxyFactory$Adaptive.getProxy(invoker) -> 找到默认的扩展点名称(javassist)-> JavassistProxyFactory(实际是父类 AbstractProxyFactory的 getProxy方法)
    return (T) PROXY_FACTORY.getProxy(invoker)
    
    // AbstractProxyFactory
        @Override
        public <T> T getProxy(Invoker<T> invoker) throws RpcException {
            // 非泛化方式获取接口代理对象
            return getProxy(invoker, false);
        }
        
    // JavassistProxyFactory
        @Override
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            // 此处的创建代理的方式类似于 JDK动态代理,不过生成的class是借助与 javassist 字节码工具自定义的
            // 而且生成的代理类继承了org.apache.dubbo.common.bytecode.Proxy
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        }
    

    ② 若不指定 interfaces这个key的话,生成的代理类会同时实现interfaceEchoService(用于回声测试)接口,具体代码 Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)) 。其中 Proxy.getProxy(interfaces) 会为接口生成一个代理类,代理继承了 org.apache.dubbo.common.bytecode.Proxy 抽象类并且实现了 interfaces生成的代理类如下

    public class com.alibaba.dubbo.common.bytecode.proxy0  extends com.alibaba.dubbo.common.bytecode.Proxy implements com.alibaba.dubbo.common.bytecode.ClassGenerator$DC, com.alibaba.dubbo.rpc.service.EchoService, com.alibaba.mp.alipay.AlipayInfoAbility {
    
    
          public static java.lang.reflect.Method[] methods;
          private java.lang.reflect.InvocationHandler handler;
    
    
          public (java.lang.reflect.InvocationHandler arg0){ 
    
              handler = $1;
          }      
    
          // 代理的接口方法
          public com.alibaba.soa.common.response.SoaResponse getAlipayInfo(GetAlipayInfoRequest arg0){
    
                Object[] args = new Object[1]; 
    
                args[0] = ($w)$1; 
                // 调用handler的目标方法,此处是 InvokerInvocationHandler
                Object ret = handler.invoke(this, methods[0], args); 
    
                return (com.alibaba.soa.common.response.SoaResponse)ret;
          }
    
    
          // 默认都会实现 EchoSerice接口,用于回声测试
          public java.lang.Object $echo(java.lang.Object arg0) {
    
                Object[] args = new Object[1];
    
                args[0] = ($w)$1;
    
                Object ret = handler.invoke(this, methods[3], args);
    
                return (java.lang.Object)ret;
          }
    
          // 创建代理对象
          public Object newInstance(java.lang.reflect.InvocationHandler h) {
    
                return new com.alibaba.dubbo.common.bytecode.proxy0($1); 
          }
    

    ③ 看了上述动态生成代理类的过程,再来看 Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)) 这个操作就会很清晰了。拿上面的实现方法来看,当调用ReferenceConcig.get()获取的对象就是Proxy0。调用接口的getAlipayInfo方法,实际上调用的是 Proxy0getAlipayInfo方法,此代理中,会调用handler的 invoke方法,再来看一下 InvokerInvocationHandler中的实现逻辑。
    ④ 正常情况下,我们使用JDK动态代理的时候才会实现java.lang.reflect.InvocationHandler,此处完全可以不实现InvocationHandler。但是由于代理的本质是相同的,作者为了代码逻辑复用,统一处理了。(PS: 最后这句是我猜的 😁

    public class InvokerInvocationHandler implements InvocationHandler {
    
        private final Invoker<?> invoker; // RegistryAwareClusterInvoker
    
        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }
    
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
    
            // 此处终于开始调用了。。。
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    
    }
    
    • 到此,客户端的初始化Invoker过程已经完毕,后面就是真正开始调用的过程,下一篇我们来分析客户端的调用过程与Netty底层源码解读。
    1. 文章要是勘误或者知识点说的不正确,欢迎评论与交流, 希望学习的道路不孤独。由于文章是作者通过阅读源码获得的知识,难免会有疏忽!
    2. 要是感觉文章对你有所帮助,不妨点个关注,或者移驾看一下作者的其他文集,也都是干活多多哦,文章也在全力更新中。
    3. 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处!

    相关文章

      网友评论

        本文标题:Dubbo客户端初始化过程

        本文链接:https://www.haomeiwen.com/subject/pfyznhtx.html