美文网首页
Dubbo服务引用源码解析

Dubbo服务引用源码解析

作者: 奈何缘浅wyj | 来源:发表于2021-01-10 17:38 被阅读0次

    在Dubbo中有两种引用方式:第一种是服务直连,第二种是基于注册中心进行引用。服务直连一般用在测试的场景下,线上更多的是基于注册中心的方式。

    ​ 服务的引用分为饿汉式和懒汉式,饿汉即调用ReferenceBean的afterPropertiesSet方法时引用;懒汉即ReferenceBean对应的服务被注入到其他类时引用,也就是用到了这个服务才会引用。Dubbo默认是懒汉式引用。

    ​ 同样的,可以在DubboNamespaceHandler里面找到服务引用的入口,即ReferenceBean类。入口方法为getObject:

    public Object getObject() throws Exception {
        return get();
    }
    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }
    
    

    ​ 以上两个方法比较简单,如果ref不为空就进入初始化方法。但是这里有个bug,如果Dubbo源码在2.6.5以下,debug调试时,ref不为空,也就不会进入到init方法了。解决方法是:断点直接打在init方法那一行。或者在IDEA的设置里面做如下设置:

    0.处理配置

    ​ 进入到init方法,这个方法做的工作比较多,总的来说就是从各种地方获取配置信息,详细解释如下:

    private void init() {
        //避免重复初始化
        if (initialized) {
            return;
        }
        initialized = true;
        //检查接口名是否合法
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
        }
        // get consumer's global configuration
        //略。。。
        //从系统变量中获取与接口名对应的属性值
        String resolve = System.getProperty(interfaceName);
        String resolveFile = null;
        //如果没有获取到,就去加载
        if (resolve == null || resolve.length() == 0) {
            //从系统变量中获取获取解析文件路径
            resolveFile = System.getProperty("dubbo.resolve.file");
            if (resolveFile == null || resolveFile.length() == 0) {
                //如果没获取到,就从指定位置加载
                File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
                if (userResolveFile.exists()) {
                    //获取绝对路径
                    resolveFile = userResolveFile.getAbsolutePath();
                }
            }
            if (resolveFile != null && resolveFile.length() > 0) {
                Properties properties = new Properties();
                FileInputStream fis = null;
                try {
                    fis = new FileInputStream(new File(resolveFile));
                    //从文件中加载配置
                    properties.load(fis);
                } catch (IOException e) {
                    throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
                } finally {
                    try {
                        if (null != fis) fis.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
                //获取与接口对应的配置
                resolve = properties.getProperty(interfaceName);
            }
        }
        if (resolve != null && resolve.length() > 0) {
            url = resolve;
            //打印日志,略。。。
        }
        //-------------------------------------------------------------------------------------//
        //获取application、registries、monitor信息,如果没有就为null
        //略。。。
        //检查application合法性
        checkApplication();
        checkStubAndMock(interfaceClass);
        //-------------------------------------------------------------------------------------//
        //添加side、协议版本、时间戳、进程号等信息到map中
        Map<String, String> map = new HashMap<String, String>();
        Map<Object, Object> attributes = new HashMap<Object, Object>();
        map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        //非泛化服务(什么是泛化,最后解析)
        if (!isGeneric()) {
            //获取版本
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }
            //获取方法列表,并添加到map中
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            } else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        //添加前面获取的各种信息到map中
        map.put(Constants.INTERFACE_KEY, interfaceName);
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, consumer, Constants.DEFAULT_KEY);
        appendParameters(map, this);
        //-------------------------------------------------------------------------------------//
        //遍历MethodConfig,处理事件通知配置
        //略。。。
        //-------------------------------------------------------------------------------------//
        //获取消费者IP地址
        String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
        if (hostToRegistry == null || hostToRegistry.length() == 0) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
        //存储属性值到系统上下文中,即前面获取的各种属性值
        StaticContext.getSystemContext().putAll(attributes);
        //创建代理对象
        ref = createProxy(map);
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }
    

    ​ 初始化配置工作大概就这么多,其中省略了一些代码,可以通过IDEA调试过一遍。下面的方法才是关键。

    1.服务引用

    ​ 我们进入到createProxy方法中,其字面意思是创建代理对象,该方法还会调用其他方法构建以及合并Invoker实例。

    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
        if (isInjvm() == null) {
            //如果url不为空,不做本地引用
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
                //根据url的协议、scope、injvm等参数检查是否需要本地引用
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                // by default, reference local service if there is
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
            }
        } else {
            isJvmRefer = isInjvm().booleanValue();
        }
        //本地引用
        if (isJvmRefer) {
            //生成本地引用url,协议为injvm
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            //构建InjvmInvoker实例
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        //远程引用
        } else {
            //如果url不为空,说明可能是直连服务(即我们在调用时写明了Provider的地址)
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            //设置接口全限定名为url路径
                            url = url.setPath(interfaceName);
                        }
                        //检查url协议是否为registry,如果是,代表用户是想使用指定的注册中心,而非直连服务
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else {
                //加载注册中心url
                List<URL> us = loadRegistries(false);
                if (us != null && us.size() > 0) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        //添加refer参数到url中,并把url添加到urls中
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                //如果没有注册中心,就抛出异常
                if (urls == null || urls.size() == 0) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }
            //如果只有一个注册中心,或者是服务直连
            if (urls.size() == 1) {
                //构建Invoker实例
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                //多个注册中心或者多个服务提供者
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                //获取所有的Invoker实例
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
        //略。。。
        //返回代理对象
        return (T) proxyFactory.getProxy(invoker);
    }
    

    ​ createProxy方法的主要作用就是先区分是本地引用还是远程引用,接着区分是直连还是注册中心、是多注册中心(服务)还是单注册中心。最终都会构建出Invoker实例,并返回生成的代理类。Invoker是Dubbo的核心,在消费者这边,Invoker用于执行远程调用,其是由Protocol实现类构建而来。Protocol实现类有很多,主要由协议区分,分析服务暴露时我们也看到过。关于消费者这边的Invoker创建,为了不影响文章的流畅性,放到最后再分析。

    2.创建代理

    ​ Invoker创建完毕后,接下来就是为服务接口生成代理对象。我们找到getProxy的实现方法,在AbstractProxyFactory抽象类中,这个类中的getProxy方法主要是获取接口列表,最后还会调用一个getProxy的抽象方法,这个抽象方法的实现方法在JavassistProxyFactory类中,如下:

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        //通过newInstance创建代理对象
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
    

    ​ 虽然这个Proxy并非Java中的,而是Dubbo自己封装的,但是依旧是基于Java反射实现的。InvokerInvocationHandler则完全属于Java动态代理的内容了,主要是实现invoke方法,从而实现调用目标方法。可以在调用目标方法前后添加一些额外的方法,这也是AOP的原理。

    疑问解析

    • 消费者的Invoker是怎么创建的?

      • 从上面的源码分析中的createProxy方法可以发现,消费者的Invoker是由refer方法构建。不同的协议会进入不同的Protocol实现类中,调用对应的refer方法。debug源码可以发现,url的协议是registry,我们进入到对应的实现类。

        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            //设置协议头,此时url=zookeeper://192.168.174.128:2181/com.alibaba.dubbo.registry.RegistryService......
            url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
            //获取注册中心
            Registry registry = registryFactory.getRegistry(url);
            if (RegistryService.class.equals(type)) {
                return proxyFactory.getInvoker((T) registry, type, url);
            }
        
            //将url上的参数转化为Map
            Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
            //获取group配置
            String group = qs.get(Constants.GROUP_KEY);
            if (group != null && group.length() > 0) {
                if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                        || "*".equals(group)) {
                    return doRefer(getMergeableCluster(), registry, type, url);
                }
            }
            //继续执行服务引用逻辑
            return doRefer(cluster, registry, type, url);
        }
        

        接着进入doRefer方法:

        private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            //创建RegistryDirectory实例
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            //设置注册中心和协议
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
            //生成消费者URL
            URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
            //注册消费者服务,存在consumer节点
            if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                    && url.getParameter(Constants.REGISTER_KEY, true)) {
                registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                        Constants.CHECK_KEY, String.valueOf(false)));
            }
            //订阅providers、configuration、routers等节点数据
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                    Constants.PROVIDERS_CATEGORY
                            + "," + Constants.CONFIGURATORS_CATEGORY
                            + "," + Constants.ROUTERS_CATEGORY));
            //注册中心可能存在多个服务提供者,需要合并为一个Invoker
            Invoker invoker = cluster.join(directory);
            ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
            return invoker;
        }
        

        最后,我们启动消费者,打开Zookeeper客户端看一下consumer节点下是否有消费者的信息:

        image
      • 需要注意一点,要让消费者保持运行,不然看不到。

      • 看到这里其实有点疑惑,没看到RegistryProtocol启动网络连接,为什么就和zk通上信了?其实在上面那个方法中,还调用了DubboProtocol的refer方法,启动Server就在DubboProtocol中。我们接着分析一下,进入DubboProtocol的refer方法:

      • public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
            optimizeSerialization(url);
            //创建DubboInvoker
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
            return invoker;
        }
        
      • 这个方法比较简单,关键在于getClients。它会获取通信客户端实例,类型为ExchangeClient,但是ExchangeClient并不具备通信能力,其底层默认是Netty客户端。我们看一下getClients方法:

      • private ExchangeClient[] getClients(URL url) {
            // whether to share connection
            boolean service_share_connect = false;
            int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
            // if not configured, connection is shared, otherwise, one connection for one service
            if (connections == 0) {
                service_share_connect = true;
                connections = 1;
            }
        
            ExchangeClient[] clients = new ExchangeClient[connections];
            for (int i = 0; i < clients.length; i++) {
                if (service_share_connect) {
                    //获取共享客户端
                    clients[i] = getSharedClient(url);
                } else {
                    //初始化新的客户端
                    clients[i] = initClient(url);
                }
            }
            return clients;
        }
        
      • getSharedClient方法也会调用initClient方法,我们直接分析一下initClient方法:

      • private ExchangeClient initClient(URL url) {
            // 获取客户端类型,默认为Netty
            String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
            String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
            boolean compatible = (version != null && version.startsWith("1.0."));
            //添加编解码和心跳包参数到url中
            url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        
            // BIO is not allowed since it has severe performance issue.
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                throw new RpcException("Unsupported client type: " + str + "," +
                        " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
            }
        
            ExchangeClient client;
            try {
                // connection should be lazy
                if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                    client = new LazyConnectExchangeClient(url, requestHandler);
                } else {
                    //创建普通的ExchangeClient实例
                    client = Exchangers.connect(url, requestHandler);
                }
            } catch (RemotingException e) {
                throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
            }
            return client;
        }
        
      • 接下来就比较简单了,可以自己debug一层层跟进connect方法,就会看到NettyClient的构造方法。

    相关文章

      网友评论

          本文标题:Dubbo服务引用源码解析

          本文链接:https://www.haomeiwen.com/subject/euqlaktx.html