美文网首页
dubbo消费者bean初始化

dubbo消费者bean初始化

作者: 7d972d5e05e8 | 来源:发表于2019-10-30 20:20 被阅读0次

    先上dubbo初始化调用栈:


    dubbo初始化1.png

    一、消费者bean初始化1----spring入口

    这节主要介绍dubbo的消费者和spring结合时,初始化的过程。
    这里默认使用@Reference注解注入dubbo的bean。
    可以看到,Spring bean在初始化的时候,需要在populateBean方法进行注入属性,这里注入就是dubbo bean属性,这里都是由spring管理的。但是在factory.getObject()这里,factory就是dubbo的ReferenceBean,它重写了getObject方法。

    二、消费者bean初始化2----RegistryProtocol执行

    这里只介绍init()方法中的 ref = createProxy(map)方法。进去之后,核心方法为invoker = refprotocol.refer(interfaceClass, urls.get(0))。重点来了refprotocol是什么?如下:

    private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    

    发现refprotocol是dubbo SPI的一个自适应拓展类。该类是由Javassist生成的代理类。在调用refer方法的时候,会去调用ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(“key”),然后这个时候的key一定是registry,所以会走到RegistryProtocol.refer。但是dubbo SPI机制,会自动把wapper封装上去。如下:

     private T createExtension(String name) {
            Class<?> clazz = getExtensionClasses().get(name);
            if (clazz == null) {
                throw findException(name);
            }
            try {
                T instance = (T) EXTENSION_INSTANCES.get(clazz);
                if (instance == null) {
                    EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                    instance = (T) EXTENSION_INSTANCES.get(clazz);
                }
                injectExtension(instance);
                Set<Class<?>> wrapperClasses = cachedWrapperClasses;
                if (wrapperClasses != null && wrapperClasses.size() > 0) {
                    for (Class<?> wrapperClass : wrapperClasses) {
                        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                    }
                }
                return instance;
            } catch (Throwable t) {
                throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                        type + ")  could not be instantiated: " + t.getMessage(), t);
            }
        }
    

    详细见dubbo的SPI详解,可知RegistryProtocol会被两个默认的wapper封装:ProtocolFilterWrapper和ProtocolListenerWrapper(多个wapper之间的顺序,目前好像没有设置,直接从Set取封装的)。这个是dubbo的spi,对protocol接口默认配置。所以下一步一定会调用到ProtocolListenerWrapper上,代码如下:

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url);
            }
            return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
                    Collections.unmodifiableList(
                            ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                    .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
        }
    

    该接口对registry特殊处理,不进行ListenerInvokerWrapper封装,直接调用下一层的protocol,下一层就是ProtocolFilterWrapper,代码如下:

    @Override
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url); // 服务消费者注册并引用服务
            }
            return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY,
                    Constants.CONSUMER); // 消费者引用服务的拦截器
        }
    

    该接口也对registry特殊处理,不进行buildInvokerChain封装,直接调用下一层的protocol,下一层就是真正的RegistryProtocol了,代码如下:

    // 引用远程服务
        @Override
        @SuppressWarnings("unchecked")
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY))
                    .removeParameter(Constants.REGISTRY_KEY);
            // 基于注册中心构建调用执行体的本地动态代理
            Registry registry = registryFactory.getRegistry(url);
            if (RegistryService.class.equals(type)) {
                return proxyFactory.getInvoker((T) registry, type, url);
            }
    
            // refer=application%3Dxxx%26interface%3Dcom.xxx.XxxService%26methods%3Dxxx%26register.ip%3Dxxx%26remote.timestamp%3Dxxx
            // group="a,b" or group="*"
            Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
            String group = qs.get(Constants.GROUP_KEY);
            if (group != null && group.length() > 0) {
                if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                        || "*".equals(group)) {
                    return doRefer(getMergeableCluster(), registry, type, url);
                }
            }
            return doRefer(cluster, registry, type, url);
        }
    

    到这里只是执行了RegistryProtocol的refer方法,并没有初始化真正的Invoker。看下最上面的一行代码,url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)),这里把url的protocol参数从registry替换成了dubbo。为下面的正在初始化做准备,下面会讲到。

    三、消费者bean初始化3----directory.subscribe()

    上面介绍了RegistryProtocol的执行,真正初始化Invoker的地方在,doRefer方法,如下:

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            // 注册目录
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // REFER_KEY的所有属性
            Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
            URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY),
                    0, type.getName(), parameters);
            if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                    && url.getParameter(Constants.REGISTER_KEY, true)) {
                // 将服务消费者订阅配置注册到服务注册中心
                registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                        Constants.CHECK_KEY, String.valueOf(false)));
            }
    
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                    Constants.PROVIDERS_CATEGORY
                            + "," + Constants.CONFIGURATORS_CATEGORY
                            + "," + Constants.ROUTERS_CATEGORY
                            + "," + Constants.CELLS_CATEGORY));
    
            Invoker<T> invoker = cluster.join(directory);
            ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
            return invoker;
        }
    

    这里的核心方法,directory.subscribe就是订阅zk,并且初始化Invoker。一直跟踪到如下图所示:


    dubbo3.png

    这里可以看到toInvokers方法,其中初始化Invoker的代码如下:

    if (enabled) {
         invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
           }
    

    到这里我们终于又看到了protocol.refer方法了。这里的protocol还是SPI自适应拓展实现类,正在执行的protocol需要根据url上的参数决定,这时的protocol=dubbo。但是由于Wapper类会自动包装在DubboProtocol上,所以这里还是先执行了ProtocolListenerWrapper了,然后执行ProtocolFilterWrapper。我们再把他们的代码看一遍:

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url);
            }
            return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
                    Collections.unmodifiableList(
                            ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                    .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
        }
    
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url); // 服务消费者注册并引用服务
            }
            return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY,
                    Constants.CONSUMER); // 消费者引用服务的拦截器
        }
    

    这个时候是dubbo协议了,不会被register协议拦截了。所以,会走到各自方法下面的逻辑。
    ProtocolListenerWrapper会用ListenerInvokerWrapper包装下面的Invoker。下面的Invoker会通过buildInvokerChain构造一个filterChain。看下buildInvokerChain源码:

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
            // 拦截器列表
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class)
                    .getActivateExtension(invoker.getUrl(), key, group);
            if (filters.size() > 0) {
                for (int i = filters.size() - 1; i >= 0; i--) {
              //从最后一个filter开始封装,它会被封装到最底层。
                    final Filter filter = filters.get(i);
                    final Invoker<T> next = last;
                    last = new Invoker<T>() {
                        @Override
                        public Class<T> getInterface() {
                            return invoker.getInterface();
                        }
    
                        @Override
                        public URL getUrl() {
                            return invoker.getUrl();
                        }
    
                        @Override
                        public boolean isAvailable() {
                            return invoker.isAvailable();
                        }
    
                        @Override
                        public Result invoke(Invocation invocation) throws RpcException {
                            // 进行调用拦截
                            return filter.invoke(next, invocation);
                        }
    
                        @Override
                        public void destroy() {
                            invoker.destroy();
                        }
    
                        @Override
                        public String toString() {
                            return invoker.toString();
                        }
                    };
                }
            }
            return last;
        }
    

    通过SPI的getActivateExtension方法,获取dubbo默认激活的filter。然后一层一层filter封装。可以看到,封装的时候从列表的最后一个filter开始封装,封装到最底层,最接近DubboInvoker。DubboInvoker是真正的一个执行调用的地方,它里面应该包含了一个网络client,用来把dubbo调用请求,通过网络的方式发送到提供者那里。

    四、消费者bean初始化4----DubboInvoker的初始化

    DubboInvoker的初始化,主要是用来初始化ExchangeClient的。代码如下:

       DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    
    
        public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers) {
            super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
            this.clients = clients;
            // get version.
            this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");
            this.invokers = invokers;
        }
    

    核心方法在getClients方法上,如下:

    private ExchangeClient[] getClients(URL url) {
            //是否共享连接
            boolean service_share_connect = false;
            int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
            //如果connections不配置,则共享连接,否则每服务每连接
            if (connections == 0) {
                service_share_connect = true;
                connections = 1;
            }
    
            ExchangeClient[] clients = new ExchangeClient[connections];
            for (int i = 0; i < clients.length; i++) {
                if (service_share_connect) {
                    clients[i] = getSharedClient(url);
                } else {
                    clients[i] = initClient(url);
                }
            }
            return clients;
        }
    

    核心方法是initClient(url),这里就不向下展开了。总的来说,下面的代码就是通过url上的参数值,根据SPI获取相应的Exchanger,Transporter,然后初始化NettyClient。初始化好后,基本上tcp连接就建立了。到这里一个真正的Invoker,它包含了各种封装的Invoker。
    下面简单看下一个dubbo消费者bean的调用链路:


    image.png

    可以看到从getByCouponCode的业务方法,到调用到真正Invoker的DubboInvoker,中间经过了很多封装类型的Invoker操作。

    所以说,Invoker是dubbo核心领域模型。一切都是围绕着Invoker来的,这也符合dubbo的设计原则:微核心+插件式。

    1. 微核心:Invoker,Exporter,Invocation,这些都是dubbo抽象出来的核心领域模型,一切高级特性都是围绕着它们来。比如:负载均衡,过滤器等特性
    2. 插件式:通过SPI方法,提供插件式的拓展性。比如:protocol,serialize等协议,都是可替换的。

    肯定有错误的地方,欢迎指正和讨论

    相关文章

      网友评论

          本文标题:dubbo消费者bean初始化

          本文链接:https://www.haomeiwen.com/subject/trravctx.html