美文网首页dubbo
Dubbo消费者初始化流程源码解析

Dubbo消费者初始化流程源码解析

作者: 海涛_meteor | 来源:发表于2018-08-04 23:36 被阅读0次

    前言

    消费者初始化也就是对<dubbo:reference>中的内容进行解析和初始化,根据Dubbo的官方文档描述,其对应的配置类为com.alibaba.dubbo.config.ReferenceConfig,但它的入口在哪里呢?

    由于Dubbo和Spring是高度整合的,因此Spring中实现InitializingBean的方式是一个很好的初始化方式,找了一下确实找到了一个类ReferenceBean,让我们来看一下它的定义

    public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean 
    

    我们看到了InitializingBean加上ReferenceConfig,几乎可以肯定这就是我们要找的入口,那么开始吧

    源码分析

    说到InitializingBean第一反应肯定是找afterPropertiesSet()方法,来看一下

       @SuppressWarnings({ "unchecked"})
        public void afterPropertiesSet() throws Exception {
            if (getConsumer() == null) {
                Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null  : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);
                if (consumerConfigMap != null && consumerConfigMap.size() > 0) {
                    ConsumerConfig consumerConfig = null;
                    for (ConsumerConfig config : consumerConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (consumerConfig != null) {
                                throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);
                            }
                            consumerConfig = config;
                        }
                    }
                    if (consumerConfig != null) {
                        setConsumer(consumerConfig);
                    }
                }
            }
            if (getApplication() == null
                    && (getConsumer() == null || getConsumer().getApplication() == null)) {
                Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
                if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
                    ApplicationConfig applicationConfig = null;
                    for (ApplicationConfig config : applicationConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (applicationConfig != null) {
                                throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
                            }
                            applicationConfig = config;
                        }
                    }
                    if (applicationConfig != null) {
                        setApplication(applicationConfig);
                    }
                }
            }
            if (getModule() == null
                    && (getConsumer() == null || getConsumer().getModule() == null)) {
                Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
                if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
                    ModuleConfig moduleConfig = null;
                    for (ModuleConfig config : moduleConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (moduleConfig != null) {
                                throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
                            }
                            moduleConfig = config;
                        }
                    }
                    if (moduleConfig != null) {
                        setModule(moduleConfig);
                    }
                }
            }
            if ((getRegistries() == null || getRegistries().size() == 0)
                    && (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().size() == 0)
                    && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().size() == 0)) {
                Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
                if (registryConfigMap != null && registryConfigMap.size() > 0) {
                    List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
                    for (RegistryConfig config : registryConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            registryConfigs.add(config);
                        }
                    }
                    if (registryConfigs != null && registryConfigs.size() > 0) {
                        super.setRegistries(registryConfigs);
                    }
                }
            }
            if (getMonitor() == null
                    && (getConsumer() == null || getConsumer().getMonitor() == null)
                    && (getApplication() == null || getApplication().getMonitor() == null)) {
                Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
                if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
                    MonitorConfig monitorConfig = null;
                    for (MonitorConfig config : monitorConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (monitorConfig != null) {
                                throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
                            }
                            monitorConfig = config;
                        }
                    }
                    if (monitorConfig != null) {
                        setMonitor(monitorConfig);
                    }
                }
            }
            Boolean b = isInit();
            if (b == null && getConsumer() != null) {
                b = getConsumer().isInit();
            }
            if (b != null && b.booleanValue()) {
                getObject();
            }
        }
    
    

    前面一大片的代码都是在做赋值,分别调用了setConsumersetApplicationsetModulesetRegistriessetMonitor这几个方法,全部都是AbstractInterfaceConfig类中的set方法,最核心的还是最后的getObject()方法

        public Object getObject() throws Exception {
            return get();
        }
    

    这里终于调用到了父类ReferenceConfigget()方法,继续来看

    public synchronized T get() {
            if (destroyed){
                throw new IllegalStateException("Already destroyed!");
            }
            if (ref == null) {
                init();
            }
            return ref;
        }
    

    这里的ref定义如下

    // 接口代理类引用
        private transient volatile T ref;
    

    具体来看看init()方法

        private void init() {
            
            // 获取消费者全局配置
            checkDefault();
            appendProperties(this);
            if (getGeneric() == null && getConsumer() != null) {
                setGeneric(getConsumer().getGeneric());
            }
            if (ProtocolUtils.isGeneric(getGeneric())) {
                interfaceClass = GenericService.class;
            } else {
                try {
                    interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                            .getContextClassLoader());
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException(e.getMessage(), e);
                }
                checkInterfaceAndMethods(interfaceClass, methods);
            }
            String resolve = System.getProperty(interfaceName);
            String resolveFile = null;
            if (resolve == null || resolve.length() == 0) {
                resolveFile = System.getProperty("dubbo.resolve.file");
                if (resolveFile == null || resolveFile.length() == 0) {
                    File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
                    if (userResolveFile.exists()) {
                        resolveFile = userResolveFile.getAbsolutePath();
                    }
                }
                if (resolveFile != null && resolveFile.length() > 0) {
                    Properties properties = new Properties();
                    FileInputStream fis = null;
                    try {
                        fis = new FileInputStream(new File(resolveFile));
                        properties.load(fis);
                    } catch (IOException e) {
                        throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
                    } finally {
                        try {
                            if(null != fis) fis.close();
                        } catch (IOException e) {
                            logger.warn(e.getMessage(), e);
                        }
                    }
                    resolve = properties.getProperty(interfaceName);
                }
            }
            /**
             *   略去部分代码
             */
             
            checkApplication();
            checkStubAndMock(interfaceClass);
            Map<String, String> map = new HashMap<String, String>();
            Map<Object, Object> attributes = new HashMap<Object, Object>();
            map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
            map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
            map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
            if (ConfigUtils.getPid() > 0) {
                map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
            }
            if (! isGeneric()) {
                String revision = Version.getVersion(interfaceClass, version);
                if (revision != null && revision.length() > 0) {
                    map.put("revision", revision);
                }
    
                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
                if(methods.length == 0) {
                    logger.warn("NO method found in service interface " + interfaceClass.getName());
                    map.put("methods", Constants.ANY_VALUE);
                }
                else {
                    map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
                }
            }
            map.put(Constants.INTERFACE_KEY, interfaceName);
            appendParameters(map, application);
            appendParameters(map, module);
            appendParameters(map, consumer, Constants.DEFAULT_KEY);
            appendParameters(map, this);
            String prifix = StringUtils.getServiceKey(map);
            if (methods != null && methods.size() > 0) {
                for (MethodConfig method : methods) {
                    appendParameters(map, method, method.getName());
                    String retryKey = method.getName() + ".retry";
                    if (map.containsKey(retryKey)) {
                        String retryValue = map.remove(retryKey);
                        if ("false".equals(retryValue)) {
                            map.put(method.getName() + ".retries", "0");
                        }
                    }
                    appendAttributes(attributes, method, prifix + "." + method.getName());
                    checkAndConvertImplicitConfig(method, map, attributes);
                }
            }
            //attributes通过系统context进行存储.
            StaticContext.getSystemContext().putAll(attributes);
            ref = createProxy(map);
        }
    

    首先看一下checkDefault()这个方法

        private void checkDefault() {
            if (consumer == null) {
                consumer = new ConsumerConfig();
            }
            appendProperties(consumer);
        }
    

    这里生成了一个ConsumerConfig对象,根据官方文档说明这个类对应了<dubbo:consumer>这个标签中的配置项,顺带提一句该标签是<dubbo:reference>标签的缺省值,因此会覆盖<dubbo:reference>中的设置。
    接着看一下appendProperties(AbstractConfig config)方法做了些什么

        protected static void appendProperties(AbstractConfig config) {
            if (config == null) {
                return;
            }
            String prefix = "dubbo." + getTagName(config.getClass()) + ".";
            Method[] methods = config.getClass().getMethods();
            for (Method method : methods) {
                    String name = method.getName();
                    if (name.length() > 3 && name.startsWith("set") && Modifier.isPublic(method.getModifiers()) 
                            && method.getParameterTypes().length == 1 && isPrimitive(method.getParameterTypes()[0])) {
                        String property = StringUtils.camelToSplitName(name.substring(3, 4).toLowerCase() + name.substring(4), "-");
    
                        String value = null;
                        if (config.getId() != null && config.getId().length() > 0) {
                            String pn = prefix + config.getId() + "." + property;
                            value = System.getProperty(pn);
                            if(! StringUtils.isBlank(value)) {
                                logger.info("Use System Property " + pn + " to config dubbo");
                            }
                        }
                        if (value == null || value.length() == 0) {
                            String pn = prefix + property;
                            value = System.getProperty(pn);
                            if(! StringUtils.isBlank(value)) {
                                logger.info("Use System Property " + pn + " to config dubbo");
                            }
                        }
                        if (value == null || value.length() == 0) {
                            Method getter;
                            try {
                                getter = config.getClass().getMethod("get" + name.substring(3), new Class<?>[0]);
                            } catch (NoSuchMethodException e) {
                                try {
                                    getter = config.getClass().getMethod("is" + name.substring(3), new Class<?>[0]);
                                } catch (NoSuchMethodException e2) {
                                    getter = null;
                                }
                            }
                            if (getter != null) {
                                if (getter.invoke(config, new Object[0]) == null) {
                                    if (config.getId() != null && config.getId().length() > 0) {
                                        value = ConfigUtils.getProperty(prefix + config.getId() + "." + property);
                                    }
                                    if (value == null || value.length() == 0) {
                                        value = ConfigUtils.getProperty(prefix + property);
                                    }
                                    if (value == null || value.length() == 0) {
                                        String legacyKey = legacyProperties.get(prefix + property);
                                        if (legacyKey != null && legacyKey.length() > 0) {
                                            value = convertLegacyValue(legacyKey, ConfigUtils.getProperty(legacyKey));
                                        }
                                    }
                                    
                                }
                            }
                        }
                        if (value != null && value.length() > 0) {
                            method.invoke(config, new Object[] {convertPrimitive(method.getParameterTypes()[0], value)});
                        }
                    }
    
            }
        }
    

    看着比较长,其实主要做的事情就是执行入参config中的set方法来初始化传入的对象,这里就不深入分析了。

    继续往下看其中最重要的操作ref = createProxy(map),用于生成代理对象,具体来看一下其实现

        @SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
        private T createProxy(Map<String, String> map) {
    
            /**
             *   略去部分代码
             */
             
                if (urls.size() == 1) {
                    invoker = refprotocol.refer(interfaceClass, urls.get(0));
                } else {
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    for (URL url : urls) {
                        invokers.add(refprotocol.refer(interfaceClass, url));
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // 用了最后一个registry url
                        }
                    }
                    if (registryURL != null) { // 有 注册中心协议的URL
                        // 对有注册中心的Cluster 只用 AvailableCluster
                        URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
                        invoker = cluster.join(new StaticDirectory(u, invokers));
                    }  else { // 不是 注册中心的URL
                        invoker = cluster.join(new StaticDirectory(invokers));
                    }
                }
            }
    
            Boolean c = check;
            if (c == null && consumer != null) {
                c = consumer.isCheck();
            }
            if (c == null) {
                c = true; // default true
            }
            if (c && ! invoker.isAvailable()) {
                throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
            }
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
            }
            // 创建服务代理
            return (T) proxyFactory.getProxy(invoker);
        }
    

    这个方法里面最值得关注的地方有三个,分别是

    1. refprotocol.refer(interfaceClass, url)
    2. cluster.join(new StaticDirectory(u, invokers))
    3. proxyFactory.getProxy(invoker)

    这三个对象很重要,来看看其定义

    private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    
    private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
        
    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
    

    这三个对象的生成使用的都是Dubbo的SPI机制,具体可以参考我之前发的讲解Dubbo中SPI的那篇文章,以proxyFactory为例,它的本质是ProxyFactory的一个代理类proxyFactory$Adpative@Adaptive标注方法的具体实现由真正的实现类完成。

    @SPI("javassist")
    public interface ProxyFactory {
    
        @Adaptive({Constants.PROXY_KEY})
        <T> T getProxy(Invoker<T> invoker) throws RpcException;
    
        @Adaptive({Constants.PROXY_KEY})
        <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
    
    }
    

    stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
    jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory
    javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory
    来看看JavassistProxyFactory类的源码

    META-INF/dubbo/internal/com.alibaba.dubbo.rpc.ProxyFactory中的内容如上所示,可以看到javassistProxyFactory接口的默认选择,对应的实现类就是JavassistProxyFactorydubboProtocol接口的默认选项,对应实现类是DubboProtocolfailoverCluster接口的默认选项,对应实现类是FailoverCluster

    了解了三个对象的实现后,首先看refprotocol.refer(interfaceClass, url方法,其具体实现由url中对象的protocol属性决定

        for (URL url : urls) {
            invokers.add(refprotocol.refer(interfaceClass, url));
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                registryURL = url; // 用了最后一个registry url
            }
        }
    

    有上述代码看出refprotocol的实现由url决定,此处url又由loadRegistries(false)方法获得,其protocol属性为registryloadRegistries(false)方法获取了所有<dubbo:registry>注册的信息,根据其返回的类型List<URL>,我们也可以发现Dubbo是可以配置多个注册中心的。

    registry对应实现类为RegistryProtocol,根据上一篇关于SPI文章的讲解,此处的refprotocol是一个代理类,其refer方法的调用链为

    Protocol$Adpative -> ProtocolListenerWrapper -> ProtocolFilterWrapper -> RegistryProtocol

    来看一下RegistryProtocolrefer方法

    @SuppressWarnings("unchecked")
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
            Registry registry = registryFactory.getRegistry(url);
            if (RegistryService.class.equals(type)) {
                return proxyFactory.getInvoker((T) registry, type, url);
            }
    
            // group="a,b" or group="*"
            Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
            String group = qs.get(Constants.GROUP_KEY);
            if (group != null && group.length() > 0 ) {
                if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
                        || "*".equals( group ) ) {
                    return doRefer( getMergeableCluster(), registry, type, url );
                }
            }
            return doRefer(cluster, registry, type, url);
        }
    

    首先registryFactory.getRegistry(url)用于注册注册中心,这里的getRegistry方法属于AbstractRegistryFactory类,来看一下具体实现

        public Registry getRegistry(URL url) {
            url = url.setPath(RegistryService.class.getName())
                    .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                    .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
            String key = url.toServiceString();
            // 锁定注册中心获取过程,保证注册中心单一实例
            LOCK.lock();
            try {
                Registry registry = REGISTRIES.get(key);
                if (registry != null) {
                    return registry;
                }
                registry = createRegistry(url);
                if (registry == null) {
                    throw new IllegalStateException("Can not create registry " + url);
                }
                REGISTRIES.put(key, registry);
                return registry;
            } finally {
                // 释放锁
                LOCK.unlock();
            }
        }
    

    这里通过加锁的方式往REGISTRIES这个map里添加createRegistry(url)方法生成的registry对象,createRegistry(url)方法的具体实现同样由SPI方式生成的代理对象执行,我们直接看zookeeper情况下在ZookeeperRegistryFactory这个类中的实现

        private ZookeeperTransporter zookeeperTransporter
    
        public Registry createRegistry(URL url) {
            return new ZookeeperRegistry(url, zookeeperTransporter);
        }
    

    这里的zookeeperTransporter是一个ZookeeperTransporter类型的对象,该对象同样是由SPI机制管理的,默认实现方式是zkclient的,对应实现类是ZkclientZookeeperTransporter

    接着看一下new ZookeeperRegistry(url, zookeeperTransporter)在做什么

        public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
            super(url);
            if (url.isAnyHost()) {
                throw new IllegalStateException("registry address == null");
            }
            String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
            if (! group.startsWith(Constants.PATH_SEPARATOR)) {
                group = Constants.PATH_SEPARATOR + group;
            }
            this.root = group;
            zkClient = zookeeperTransporter.connect(url);//zk上创建节点
            zkClient.addStateListener(new StateListener() {//监听该节点
                public void stateChanged(int state) {
                    if (state == RECONNECTED) {
                        try {
                            recover();
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            });
        }
    

    这里ZookeeperRegistry的构造方法调用了父类FailbackRegistry的构造方法,然后做了两件事,1.在zookeeper上创建一个ZkClient节点,2.监听该节点的RECONNECTED事,下面来看一看FailbackRegistry的构造方法

        public FailbackRegistry(URL url) {
            super(url);
            int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
            this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
                public void run() {
                    // 检测并连接注册中心
                    try {
                        retry();
                    } catch (Throwable t) { // 防御性容错
                        logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                    }
                }
            }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
        }
    

    这里又调用了父类AbstractRegistry的构造方法,并启用了一个线程池来定时重建与注册中心的连接,从这个类的名字FailbackRegistry就可以看出来它的作用。接着来看AbstractRegistry的构造方法

        public AbstractRegistry(URL url) {
            setUrl(url);
            // 启动文件保存定时器
            syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
            String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache");
            File file = null;
            if (ConfigUtils.isNotEmpty(filename)) {
                file = new File(filename);
                if(! file.exists() && file.getParentFile() != null && ! file.getParentFile().exists()){
                    if(! file.getParentFile().mkdirs()){
                        throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                    }
                }
            }
            this.file = file;
            loadProperties();
            notify(url.getBackupUrls());
        }
    

    这里首先从System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache"这个用户本地目录加载缓存的文件到properties中,接着调用了notify(List<URL> urls)方法

        protected void notify(List<URL> urls) {
            if(urls == null || urls.isEmpty()) return;
            
            for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
                URL url = entry.getKey();
                
                if(! UrlUtils.isMatch(url, urls.get(0))) {
                    continue;
                }
                
                Set<NotifyListener> listeners = entry.getValue();
                if (listeners != null) {
                    for (NotifyListener listener : listeners) {
                        try {
                            notify(url, listener, filterEmpty(url, urls));
                        } catch (Throwable t) {
                            logger.error("Failed to notify registry event, urls: " +  urls + ", cause: " + t.getMessage(), t);
                        }
                    }
                }
            }
        }
    

    这里用到了观察者模式,当服务变更时,通过getSubscribed()方法获取订阅者列表,并通知所有的订阅者更新。

    回到RegistryProtocol类的refer(Class<T> type, URL url)方法,最后返回的是doRefer(cluster, registry, type, url),来看一下里面做了什么

        private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
            if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                    && url.getParameter(Constants.REGISTER_KEY, true)) {
                registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                        Constants.CHECK_KEY, String.valueOf(false)));
            }
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                    Constants.PROVIDERS_CATEGORY 
                    + "," + Constants.CONFIGURATORS_CATEGORY 
                    + "," + Constants.ROUTERS_CATEGORY));
            return cluster.join(directory);
        }
    

    这里首先调用了register方法在注册中心进行了订阅,并在注册中心的写入了消费者的信息。这里调用层级较深,就不再深入展开。

    接着看一下subscribe方法,

        public void subscribe(URL url) {
            setConsumerUrl(url);
            registry.subscribe(url, this);
        }
    

    调用了ZookeeperRegistrysubscribe方法,也就是FailbackRegistrysubscribe方法

        @Override
        public void subscribe(URL url, NotifyListener listener) {
            super.subscribe(url, listener);
            removeFailedSubscribed(url, listener);
            try {
                // 向服务器端发送订阅请求
                doSubscribe(url, listener);
            } catch (Exception e) {
                Throwable t = e;
    
                List<URL> urls = getCacheUrls(url);
                if (urls != null && urls.size() > 0) {
                    notify(url, listener, urls);
                    logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
                } else {
                    // 如果开启了启动时检测,则直接抛出异常
                    boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                            && url.getParameter(Constants.CHECK_KEY, true);
                    boolean skipFailback = t instanceof SkipFailbackWrapperException;
                    if (check || skipFailback) {
                        if(skipFailback) {
                            t = t.getCause();
                        }
                        throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                    } else {
                        logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                    }
                }
    
                // 将失败的订阅请求记录到失败列表,定时重试
                addFailedSubscribed(url, listener);
            }
        }
    

    可以看到首先会调用父类AbstractRegistrysubscribe方法添加监听器,然后通过doSubscribe(url, listener)向服务端发送订阅请求,该方法在ZookeeperRegistry类中实现,来重点看一下这个方法

        protected void doSubscribe(final URL url, final NotifyListener listener) {
            try {
                if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                    String root = toRootPath();
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                for (String child : currentChilds) {
                                    child = URL.decode(child);
                                    if (! anyServices.contains(child)) {
                                        anyServices.add(child);
                                        subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, 
                                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                                    }
                                }
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(root, false);
                    List<String> services = zkClient.addChildListener(root, zkListener);
                    if (services != null && services.size() > 0) {
                        for (String service : services) {
                            service = URL.decode(service);
                            anyServices.add(service);
                            subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, 
                                    Constants.CHECK_KEY, String.valueOf(false)), listener);
                        }
                    }
                } else {
                    List<URL> urls = new ArrayList<URL>();
                    for (String path : toCategoriesPath(url)) {
                        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                        if (listeners == null) {
                            zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                            listeners = zkListeners.get(url);
                        }
                        ChildListener zkListener = listeners.get(listener);
                        if (zkListener == null) {
                            listeners.putIfAbsent(listener, new ChildListener() {
                                public void childChanged(String parentPath, List<String> currentChilds) {
                                    ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                                }
                            });
                            zkListener = listeners.get(listener);
                        }
                    //创建下述节点
                    // /dubbo/dubbo.common.hello.service.HelloService/providers/
                    // /dubbo/dubbo.common.hello.service.HelloService/configurators/
                    // /dubbo/dubbo.common.hello.service.HelloService/routers/
                        zkClient.create(path, false);
                        List<String> children = zkClient.addChildListener(path, zkListener);
                        if (children != null) {
                            urls.addAll(toUrlsWithEmpty(url, path, children));
                        }
                    }
                    notify(url, listener, urls);
                }
            } catch (Throwable e) {
                throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    

    方法很长,主要做的事情是通过zkClient.create(path, false)在zookeeper中创建监听的节点,然后调用notify(url, listener, urls)方法,其具体实现在FailbackRegistry

        @Override
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            try {
                doNotify(url, listener, urls);
            } catch (Exception t) {
                // 将失败的通知请求记录到失败列表,定时重试
                Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
                if (listeners == null) {
                    failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
                    listeners = failedNotified.get(url);
                }
                listeners.put(listener, urls);
                logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }
    

    跟进doNotify方法

        protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
            super.notify(url, listener, urls);
        }
    

    调用了父类AbstractRegistrynotify方法

        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            if ((urls == null || urls.size() == 0) 
                    && ! Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                logger.warn("Ignore empty notify urls for subscribe url " + url);
                return;
            }
            if (logger.isInfoEnabled()) {
                logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
            }
            Map<String, List<URL>> result = new HashMap<String, List<URL>>();
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                    List<URL> categoryList = result.get(category);
                    if (categoryList == null) {
                        categoryList = new ArrayList<URL>();
                        result.put(category, categoryList);
                    }
                    categoryList.add(u);
                }
            }
            if (result.size() == 0) {
                return;
            }
            Map<String, List<URL>> categoryNotified = notified.get(url);
            if (categoryNotified == null) {
                notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
                categoryNotified = notified.get(url);
            }
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                categoryNotified.put(category, categoryList);
                saveProperties(url);
                listener.notify(categoryList);
            }
        }
    

    最后真正的实现在listener.notify(categoryList)中,跟到RegistryDirectory的notify(List<URL> urls)方法中

        public synchronized void notify(List<URL> urls) {
            List<URL> invokerUrls = new ArrayList<URL>();
            List<URL> routerUrls = new ArrayList<URL>();
            List<URL> configuratorUrls = new ArrayList<URL>();
            for (URL url : urls) {
                String protocol = url.getProtocol();
                String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                if (Constants.ROUTERS_CATEGORY.equals(category) 
                        || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                    routerUrls.add(url);
                } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) 
                        || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                    configuratorUrls.add(url);
                } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                    invokerUrls.add(url);
                } else {
                    logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
                }
            }
            // configurators 
            if (configuratorUrls != null && configuratorUrls.size() >0 ){
                this.configurators = toConfigurators(configuratorUrls);
            }
            // routers
            if (routerUrls != null && routerUrls.size() >0 ){
                List<Router> routers = toRouters(routerUrls);
                if(routers != null){ // null - do nothing
                    setRouters(routers);
                }
            }
            List<Configurator> localConfigurators = this.configurators; // local reference
            // 合并override参数
            this.overrideDirectoryUrl = directoryUrl;
            if (localConfigurators != null && localConfigurators.size() > 0) {
                for (Configurator configurator : localConfigurators) {
                    this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
                }
            }
            // providers
            refreshInvoker(invokerUrls);
        }
    

    这里做的事情主要是更新configuratorsrouters,然后调用refreshInvoker(invokerUrls),这个方法用于更新服务提供者相关的invoker,因此非常重要

        private void refreshInvoker(List<URL> invokerUrls){
            if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                    && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
                this.forbidden = true; // 禁止访问
                this.methodInvokerMap = null; // 置空列表
                destroyAllInvokers(); // 关闭所有Invoker
            } else {
                this.forbidden = false; // 允许访问
                Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
                if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){
                    invokerUrls.addAll(this.cachedInvokerUrls);
                } else {
                    this.cachedInvokerUrls = new HashSet<URL>();
                    this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比
                }
                if (invokerUrls.size() ==0 ){
                    return;
                }
                Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 将URL列表转成Invoker列表
                Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
                // state change
                //如果计算错误,则不进行处理.
                if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
                    logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
                    return ;
                }
                this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
                this.urlInvokerMap = newUrlInvokerMap;
                try{
                    destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 关闭未使用的Invoker
                }catch (Exception e) {
                    logger.warn("destroyUnusedInvokers error. ", e);
                }
            }
        }
    

    这里的核心是toInvokers(invokerUrls)方法将URL列表转成Invoker列表

        private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
            Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
            if(urls == null || urls.size() == 0){
                return newUrlInvokerMap;
            }
            Set<String> keys = new HashSet<String>();
            String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
            for (URL providerUrl : urls) {
                //如果reference端配置了protocol,则只选择匹配的protocol
                if (queryProtocols != null && queryProtocols.length() >0) {
                    boolean accept = false;
                    String[] acceptProtocols = queryProtocols.split(",");
                    for (String acceptProtocol : acceptProtocols) {
                        if (providerUrl.getProtocol().equals(acceptProtocol)) {
                            accept = true;
                            break;
                        }
                    }
                    if (!accept) {
                        continue;
                    }
                }
                if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                    continue;
                }
                if (! ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                    logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost() 
                            + ", supported protocol: "+ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                    continue;
                }
                URL url = mergeUrl(providerUrl);
                
                String key = url.toFullString(); // URL参数是排序的
                if (keys.contains(key)) { // 重复URL
                    continue;
                }
                keys.add(key);
                // 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer
                Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
                Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
                if (invoker == null) { // 缓存中没有,重新refer
                    try {
                        boolean enabled = true;
                        if (url.hasParameter(Constants.DISABLED_KEY)) {
                            enabled = ! url.getParameter(Constants.DISABLED_KEY, false);
                        } else {
                            enabled = url.getParameter(Constants.ENABLED_KEY, true);
                        }
                        if (enabled) {
                            invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
                        }
                    } catch (Throwable t) {
                        logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t);
                    }
                    if (invoker != null) { // 将新的引用放入缓存
                        newUrlInvokerMap.put(key, invoker);
                    }
                }else {
                    newUrlInvokerMap.put(key, invoker);
                }
            }
            keys.clear();
            return newUrlInvokerMap;
        }
    

    这里返回的是Map<String, Invoker<T>>的map对象,由invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl)方法创建

    这里的protocol同样由SPI机制创建,最终该refer方法会由DubboProtocol实现,来看一下

        public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    
            // modified by lishen
            optimizeSerialization(url);
    
            // create rpc invoker.
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
            return invoker;
        }
    

    可以看到创建的是一个DubboInvoker的对象,这个对象中是存有和服务提供方的连接的,由getClients(url)方法得到

        private ExchangeClient[] getClients(URL url){
            //是否共享连接
            boolean service_share_connect = false;
            int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
            //如果connections不配置,则共享连接,否则每服务每连接
            if (connections == 0){
                service_share_connect = true;
                connections = 1;
            }
            
            ExchangeClient[] clients = new ExchangeClient[connections];
            for (int i = 0; i < clients.length; i++) {
                if (service_share_connect){
                    clients[i] = getSharedClient(url);
                } else {
                    clients[i] = initClient(url);
                }
            }
            return clients;
        }
    

    返回值是ExchangeClient[]类型的,初始化由initClient(url)方法创建,默认的底层连接实现是通过Netty的,为了控制篇幅这里就不深入具体连接的细节了,有机会单开一篇讲一下。

    继续跳转回ReferenceConfigcreateProxy方法,到此refprotocol.refer(interfaceClass, urls.get(0))就解析完了接着看后续的代码

        if (registryURL != null) { // 有 注册中心协议的URL
            // 对有注册中心的Cluster 只用 AvailableCluster
            URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
            invoker = cluster.join(new StaticDirectory(u, invokers));
        }  else { // 不是 注册中心的URL
            invoker = cluster.join(new StaticDirectory(invokers));
        }
    

    这里的cluster对象之前也提过了,同样是SPI机制管理的,从其选项failoverfailfastfailback等基本就可以推测出来这个对象和接口的负载均衡机制有关,以默认的FailoverCluster类为例,来看一下join方法

        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            return new FailoverClusterInvoker<T>(directory);
        }
    

    没有做什么特别得事情,就是根据invokers合并生成一个FailoverClusterInvoker类型的invoker对象给消费者。

    最后来看proxyFactory.getProxy(invoker)方法,前面已经提过了proxyFactory对象的具体实现在JavassistProxyFactory中,来看一下这个类

    public class JavassistProxyFactory extends AbstractProxyFactory {
    
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        }
    
        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // TODO Wrapper类不能正确处理带$的类名
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName, 
                                          Class<?>[] parameterTypes, 
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    
    }
    

    在这里我们并没有发现getProxy(Invoker<T> invoker)的方法,那么往父类去找,果然在AbstractProxyFactory类中找到了,我们来看一下

    public abstract class AbstractProxyFactory implements ProxyFactory {
    
        public <T> T getProxy(Invoker<T> invoker) throws RpcException {
            Class<?>[] interfaces = null;
            String config = invoker.getUrl().getParameter("interfaces");
            if (config != null && config.length() > 0) {
                String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
                if (types != null && types.length > 0) {
                    interfaces = new Class<?>[types.length + 2];
                    interfaces[0] = invoker.getInterface();
                    interfaces[1] = EchoService.class;
                    for (int i = 0; i < types.length; i ++) {
                        interfaces[i + 1] = ReflectUtils.forName(types[i]);
                    }
                }
            }
            if (interfaces == null) {
                interfaces = new Class<?>[] {invoker.getInterface(), EchoService.class};
            }
            return getProxy(invoker, interfaces);
        }
        
        public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
    
    }
    

    从这里可以看到单参的getProxy方法中调用了双参的getProxy方法,该方法的具体实现又要根据子类来定,这又是一个经典的模板模式实现。

    回过头看JavassistProxyFactory中的getProxy方法,这里涉及到了Javassist字节码技术,所以光看这个类的getProxy方法可能会有些看不懂,那么我们来看看另一个同样继承了AbstractProxyFactory类的工厂类JdkProxyFactory

    public class JdkProxyFactory extends AbstractProxyFactory {
    
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
        }
    
        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName, 
                                          Class<?>[] parameterTypes, 
                                          Object[] arguments) throws Throwable {
                    Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                    return method.invoke(proxy, arguments);
                }
            };
        }
    
    }
    

    看到这个类的getProxy方法就很熟悉了,这就是标准的jdk的动态代理的实现,也就是说该方法返回的对象被InvokerInvocationHandler类代理了。由此我们推断,JavassistProxyFactory中的getProxy也是起到了动态代理的效果。也就是说我们对invoker的调用都会被代理类InvokerInvocationHandler所代理。

    总结

    消费者初始化的流程非常长,但其实概括一下一共是做了三件事:

    • 监听注册中心
    • 连接服务提供者端进行服务引用
    • 创建服务代理并返回

    当然因为篇幅缘故很多细节没有写到位,不过大致的脉络应该是都没有落下了,这次的写作让我又一次对框架的开发者肃然起敬。

    相关文章

      网友评论

        本文标题:Dubbo消费者初始化流程源码解析

        本文链接:https://www.haomeiwen.com/subject/ecevvftx.html