Dubbo Provider Export Process Analysis

Author: 晴天哥_王志 | Published: 2019-10-17 09:42

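    Introduction

     This article analyzes the ServiceImpl -> Invoker -> Exporter path in Dubbo service export and invocation, and aims to explain how a ServiceImpl is turned into an Exporter.

     The Netty forwarding logic is fairly complex in its own right, so it is covered in separate articles. Here we focus only on how objects are converted during service export, plus part of the invocation path.

     The core process to analyze is shown in the figure below. It consists of two parts: export and invocation.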

    [Figure: breakdown of service export and invocation]
    Service export

    From service to invoker

    public class ServiceConfig<T> extends AbstractServiceConfig {
    
        private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
            
            // non-essential code omitted
    
            String scope = url.getParameter(SCOPE_KEY);
    
            if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
    
                if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                    if (CollectionUtils.isNotEmpty(registryURLs)) {
                        for (URL registryURL : registryURLs) {
    
                            // non-essential code omitted
    
                            Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, 
                                                 registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                            Exporter<?> exporter = protocol.export(wrapperInvoker);
                            exporters.add(exporter);
                        }
                    } 
                }
            }
            this.urls.add(url);
        }
    }
    
    • The invoker is created by Invoker<?> invoker = PROXY_FACTORY.getInvoker(...).
    • The exporter is created by Exporter<?> exporter = protocol.export(wrapperInvoker).
    • These two steps, invoker creation and exporter creation, are what we care about.
    private static final ProxyFactory PROXY_FACTORY = 
                          ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
    
    
    
    public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
    
        public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0,
            java.lang.Class arg1, org.apache.dubbo.common.URL arg2)
            throws org.apache.dubbo.rpc.RpcException {
            if (arg2 == null) {
                throw new IllegalArgumentException("url == null");
            }
    
            org.apache.dubbo.common.URL url = arg2;
            String extName = url.getParameter("proxy", "javassist");
    
            if (extName == null) {
                throw new IllegalStateException(
                    "Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" +
                    url.toString() + ") use keys([proxy])");
            }
            // For "javassist", getExtension() returns a StubProxyFactoryWrapper
            // that wraps the JavassistProxyFactory object.
            org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) 
                          ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class)
                         .getExtension(extName);
            // Calls StubProxyFactoryWrapper.getInvoker()
            return extension.getInvoker(arg0, arg1, arg2);
        }
    }
    
    • PROXY_FACTORY actually refers to a ProxyFactory$Adaptive object.
    • PROXY_FACTORY.getInvoker() therefore calls ProxyFactory$Adaptive.getInvoker().
    • ProxyFactory$Adaptive.getInvoker() calls ExtensionLoader.getExtensionLoader().getExtension("javassist") to look up the extension at runtime.
    • The extension returned for javassist is a StubProxyFactoryWrapper, which wraps a JavassistProxyFactory object.
    • extension.getInvoker() thus runs StubProxyFactoryWrapper.getInvoker().
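    The dispatch pattern described above can be sketched with the standard library alone. This is a simplified illustration, not Dubbo's generated code: ProxyFactoryLike, EXTENSIONS and adaptiveGetInvoker are hypothetical names, and a plain Map stands in for both the URL parameters and the ExtensionLoader registry.

```java
import java.util.HashMap;
import java.util.Map;

final class AdaptiveDispatchSketch {

    /** Hypothetical stand-in for an extension interface such as ProxyFactory. */
    interface ProxyFactoryLike {
        String getInvoker(String serviceName, Map<String, String> urlParams);
    }

    /** Hypothetical stand-in for the name -> instance lookup done by ExtensionLoader. */
    static final Map<String, ProxyFactoryLike> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("javassist", (service, url) -> "javassist:" + service);
        EXTENSIONS.put("jdk", (service, url) -> "jdk:" + service);
    }

    /** Same shape as the adaptive method: pick a name from the URL, then delegate. */
    static String adaptiveGetInvoker(String serviceName, Map<String, String> urlParams) {
        if (urlParams == null) {
            throw new IllegalArgumentException("url == null");
        }
        // Equivalent of url.getParameter("proxy", "javassist").
        String extName = urlParams.getOrDefault("proxy", "javassist");
        ProxyFactoryLike extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.getInvoker(serviceName, urlParams);
    }

    public static void main(String[] args) {
        Map<String, String> url = new HashMap<>();
        System.out.println(adaptiveGetInvoker("DemoService", url)); // javassist:DemoService
        url.put("proxy", "jdk");
        System.out.println(adaptiveGetInvoker("DemoService", url)); // jdk:DemoService
    }
}
```

    The adaptive object holds no business logic of its own. It only decides, per call, which named extension handles the request, which is why the default name ("javassist") matters whenever the URL does not set the proxy parameter.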
    public class StubProxyFactoryWrapper implements ProxyFactory {
    
        // Actually a JavassistProxyFactory object.
        private final ProxyFactory proxyFactory;
        private Protocol protocol;
    
        // The proxyFactory argument is a JavassistProxyFactory object.
        public StubProxyFactoryWrapper(ProxyFactory proxyFactory) {
            this.proxyFactory = proxyFactory;
        }
    
        @Override
        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
            // Delegates to JavassistProxyFactory.getInvoker()
            return proxyFactory.getInvoker(proxy, type, url);
        }
    
        private <T> Exporter<T> export(T instance, Class<T> type, URL url) {
            return protocol.export(proxyFactory.getInvoker(instance, type, url));
        }
    
    }
    
    • StubProxyFactoryWrapper wraps a JavassistProxyFactory object.
    • The call is ultimately handled by JavassistProxyFactory.getInvoker().
    • Next, look at JavassistProxyFactory.getInvoker().
    public class JavassistProxyFactory extends AbstractProxyFactory {
    
        @Override
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        }
    
        @Override
        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    
    }
    
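    • The core steps of JavassistProxyFactory.getInvoker() are creating a Wrapper object and returning an Invoker of type AbstractProxyInvoker.
    • AbstractProxyInvoker adds one layer around the wrapper: its doInvoke() calls wrapper.invokeMethod().
    • The Wrapper class is generated dynamically. Next, look at the code inside it.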
    package org.apache.dubbo.common.bytecode;
    
    import java.lang.reflect.InvocationTargetException;
    import java.util.Map;
    import org.apache.dubbo.common.bytecode.ClassGenerator;
    import org.apache.dubbo.common.bytecode.NoSuchMethodException;
    import org.apache.dubbo.common.bytecode.NoSuchPropertyException;
    import org.apache.dubbo.common.bytecode.Wrapper;
    import org.apache.dubbo.demo.provider.DemoServiceImpl;
    
    public class Wrapper1 extends Wrapper implements ClassGenerator.DC {
        
        // The core logic is the actual call into the ServiceImpl
        public Object invokeMethod(Object object, String string, 
                    Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
            DemoServiceImpl demoServiceImpl;
            try {
                demoServiceImpl = (DemoServiceImpl)object;
            }
            catch (Throwable throwable) {
                throw new IllegalArgumentException(throwable);
            }
            try {
                // Call the real method on the ServiceImpl
                if ("sayHello".equals(string) && arrclass.length == 1) {
                    return demoServiceImpl.sayHello((String)arrobject[0]);
                }
            }
            catch (Throwable throwable) {
                throw new InvocationTargetException(throwable);
            }
            throw new NoSuchMethodException(new StringBuffer()
            .append("Not found method \"").append(string)
            .append("\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());
        }
    }
    
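    • The complete Wrapper code can be found in the article "Dubbo之ProxyFactory解析" (Dubbo ProxyFactory analysis). Only the core invokeMethod() method is shown here.
    • invokeMethod() calls the ServiceImpl method directly, sayHello() in this example.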
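    To make the idea concrete without bytecode generation, here is a hand-written equivalent of what the generated wrapper does: dispatch on the method name with plain if-checks and call the target directly, with no reflection on the call path. DemoService and DemoServiceImpl are local stand-ins defined in the sketch, and IllegalArgumentException replaces Dubbo's own NoSuchMethodException to keep the example dependency-free.

```java
final class WrapperDispatchSketch {

    interface DemoService {
        String sayHello(String name);
    }

    static final class DemoServiceImpl implements DemoService {
        @Override
        public String sayHello(String name) {
            return "Hello " + name;
        }
    }

    /** Hand-written counterpart of a generated invokeMethod (illustrative only). */
    static Object invokeMethod(Object target, String methodName,
                               Class<?>[] parameterTypes, Object[] arguments) {
        // Cast once, then dispatch by name and parameter count.
        DemoServiceImpl impl = (DemoServiceImpl) target;
        if ("sayHello".equals(methodName) && parameterTypes.length == 1) {
            return impl.sayHello((String) arguments[0]);
        }
        throw new IllegalArgumentException(
                "Not found method \"" + methodName + "\" in class DemoServiceImpl.");
    }

    public static void main(String[] args) {
        Object result = invokeMethod(new DemoServiceImpl(), "sayHello",
                new Class<?>[] {String.class}, new Object[] {"dubbo"});
        System.out.println(result); // Hello dubbo
    }
}
```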
        public T getExtension(String name) {
            if (StringUtils.isEmpty(name)) {
                throw new IllegalArgumentException("Extension name == null");
            }
            if ("true".equals(name)) {
                return getDefaultExtension();
            }
            final Holder<Object> holder = getOrCreateHolder(name);
            Object instance = holder.get();
            if (instance == null) {
                synchronized (holder) {
                    instance = holder.get();
                    if (instance == null) {
                        instance = createExtension(name);
                        holder.set(instance);
                    }
                }
            }
            return (T) instance;
        }
    
    
    
    
       private T createExtension(String name) {
            Class<?> clazz = getExtensionClasses().get(name);
            if (clazz == null) {
                throw findException(name);
            }
            try {
                T instance = (T) EXTENSION_INSTANCES.get(clazz);
                if (instance == null) {
                    EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                    instance = (T) EXTENSION_INSTANCES.get(clazz);
                }
                injectExtension(instance);
                Set<Class<?>> wrapperClasses = cachedWrapperClasses;
                if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                    // Key point: if wrapper classes exist, wrap the instance one layer per class
                    for (Class<?> wrapperClass : wrapperClasses) {
                        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                    }
                }
                return instance;
            } catch (Throwable t) {
                throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                        type + ") couldn't be instantiated: " + t.getMessage(), t);
            }
        }
    
    • This part explains why getExtension() inside ProxyFactory$Adaptive returns a StubProxyFactoryWrapper object.
    • getExtension() => createExtension() checks whether any wrapper classes exist and, if so, wraps the JavassistProxyFactory instance with them.
    • For JavassistProxyFactory, StubProxyFactoryWrapper is that wrapper class.
    • getExtension() therefore returns a StubProxyFactoryWrapper object that wraps the JavassistProxyFactory object.
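    The wrapping step can be sketched as follows. This is a minimal illustration under assumptions: ProxyFactoryLike, PlainFactory and StubLikeWrapper are hypothetical classes, and createExtension here only mirrors the "instantiate, then pass through each wrapper's one-argument constructor" part of the real method (no caching and no injection).

```java
final class WrapperDecorationSketch {

    /** Hypothetical extension interface. */
    interface ProxyFactoryLike {
        String describe();
    }

    /** Plays the role of the concrete extension (for example JavassistProxyFactory). */
    public static final class PlainFactory implements ProxyFactoryLike {
        public PlainFactory() {
        }

        @Override
        public String describe() {
            return "PlainFactory";
        }
    }

    /** Plays the role of a wrapper class: its constructor takes the extension type. */
    public static final class StubLikeWrapper implements ProxyFactoryLike {
        private final ProxyFactoryLike delegate;

        public StubLikeWrapper(ProxyFactoryLike delegate) {
            this.delegate = delegate;
        }

        @Override
        public String describe() {
            return "StubLikeWrapper(" + delegate.describe() + ")";
        }
    }

    /** Instantiate the extension, then wrap it once per wrapper class. */
    static ProxyFactoryLike createExtension(Class<?> clazz, Class<?>... wrapperClasses) {
        try {
            ProxyFactoryLike instance =
                    (ProxyFactoryLike) clazz.getDeclaredConstructor().newInstance();
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = (ProxyFactoryLike) wrapperClass
                        .getConstructor(ProxyFactoryLike.class)
                        .newInstance(instance);
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Extension couldn't be instantiated: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        ProxyFactoryLike ext = createExtension(PlainFactory.class, StubLikeWrapper.class);
        System.out.println(ext.describe()); // StubLikeWrapper(PlainFactory)
    }
}
```

    The caller asks for the plain extension but receives the outermost wrapper, which is the effect described for StubProxyFactoryWrapper and JavassistProxyFactory.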

    Invoker creation summary

    • ProxyFactory implements the whole ServiceImpl -> Wrapper -> AbstractProxyInvoker path.
    • The generated ProxyFactory$Adaptive object encapsulates how the concrete ProxyFactory is looked up.
    • ExtensionLoader.getExtension() for ProxyFactory returns a StubProxyFactoryWrapper.
    • StubProxyFactoryWrapper wraps a JavassistProxyFactory object.
    • JavassistProxyFactory creates the AbstractProxyInvoker object.
    • The AbstractProxyInvoker object holds the ServiceImpl object and delegates to a Wrapper object.
    • The Wrapper object calls the ServiceImpl methods.
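    The layering can be tried out end to end with a small sketch. Note the swap in technique: instead of a Javassist-generated Wrapper, this version uses java.lang.reflect.Method to reach the implementation, which keeps the example self-contained. The Invoker interface and getInvoker method below are simplified, hypothetical shapes and not Dubbo's real signatures.

```java
import java.lang.reflect.Method;

final class InvokerLayeringSketch {

    public interface DemoService {
        String sayHello(String name);
    }

    public static final class DemoServiceImpl implements DemoService {
        @Override
        public String sayHello(String name) {
            return "Hello " + name;
        }
    }

    /** Simplified invoker: the caller only knows a method name, types and arguments. */
    interface Invoker {
        Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments);
    }

    /** ServiceImpl -> Invoker, using reflection where Dubbo would use a generated Wrapper. */
    static Invoker getInvoker(final Object serviceImpl) {
        return (methodName, parameterTypes, arguments) -> {
            try {
                Method method = serviceImpl.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(serviceImpl, arguments);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Invocation failed: " + methodName, e);
            }
        };
    }

    public static void main(String[] args) {
        Invoker invoker = getInvoker(new DemoServiceImpl());
        Object result = invoker.invoke("sayHello",
                new Class<?>[] {String.class}, new Object[] {"dubbo"});
        System.out.println(result); // Hello dubbo
    }
}
```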

    From invoker to exporter

    • The protocol object in protocol.export() is a Protocol$Adaptive object.
    private static final Protocol protocol = 
         ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    
    
    public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
         // For now, focus only on the export() method.
        public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0)
            throws org.apache.dubbo.rpc.RpcException {
            if (arg0 == null) throw new IllegalArgumentException(
                   "org.apache.dubbo.rpc.Invoker argument == null");
            
            if (arg0.getUrl() == null) throw new IllegalArgumentException(
                   "org.apache.dubbo.rpc.Invoker argument getUrl() == null");
    
            org.apache.dubbo.common.URL url = arg0.getUrl();
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) throw new IllegalStateException(
                   "Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" 
                    + url.toString() + ") use keys([protocol])");
            
            org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader
                  .getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
            
            return extension.export(arg0);
        }   
    }
    
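    • The core of export() in Protocol$Adaptive is obtaining the extension object via ExtensionLoader.getExtensionLoader().getExtension(extName).
    • Under the dubbo protocol, the extension object is DubboProtocol.
    • We focus on DubboProtocol.export(). Note that DubboProtocol is wrapped on the outside by a layer that adds Filter objects.
    • That wrapper class is ProtocolFilterWrapper.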
    public class ProtocolFilterWrapper implements Protocol {
    
        private final Protocol protocol;
    
        public ProtocolFilterWrapper(Protocol protocol) {
            if (protocol == null) {
                throw new IllegalArgumentException("protocol == null");
            }
            this.protocol = protocol;
        }
    
        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (!filters.isEmpty()) {
                for (int i = filters.size() - 1; i >= 0; i--) {
                    final Filter filter = filters.get(i);
                    final Invoker<T> next = last;
                    last = new Invoker<T>() {
    
                        @Override
                        public Class<T> getInterface() {
                            return invoker.getInterface();
                        }
    
                        @Override
                        public Result invoke(Invocation invocation) throws RpcException {
                            return filter.invoke(next, invocation);
                        }
    
                        // getUrl(), isAvailable() and destroy() simply delegate to invoker (omitted here)
                    };
                }
            }
            return last;
        }
    
        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
                return protocol.export(invoker);
            }
            // protocol = DubboProtocol
            return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
        }
    }
    
    • ProtocolFilterWrapper is responsible for building the filter chain.
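    The backward loop in buildInvokerChain is easier to follow on a toy version. The sketch below uses simplified, hypothetical Invoker and Filter interfaces that pass a String instead of an Invocation. The loop itself has the same structure: iterate the filters from last to first so that the first filter ends up outermost.

```java
import java.util.Arrays;
import java.util.List;

final class FilterChainSketch {

    /** Simplified invoker: takes a request string and returns a response string. */
    interface Invoker {
        String invoke(String request);
    }

    /** Simplified filter: may act before or after handing the request to the next invoker. */
    interface Filter {
        String invoke(Invoker next, String request);
    }

    /** Wraps the target invoker so that filters.get(0) runs first. */
    static Invoker buildInvokerChain(Invoker invoker, List<Filter> filters) {
        Invoker last = invoker;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    static String demo() {
        Invoker target = request -> "handled(" + request + ")";
        Filter a = (next, request) -> "A[" + next.invoke(request) + "]";
        Filter b = (next, request) -> "B[" + next.invoke(request) + "]";
        return buildInvokerChain(target, Arrays.asList(a, b)).invoke("req");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // A[B[handled(req)]]
    }
}
```

    Because the chain is built before protocol.export() is called, the exporter ends up holding the outermost filter invoker, so every incoming call passes through all filters before reaching the real invoker.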
    public class DubboProtocol extends AbstractProtocol {
    
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
    
            // Core logic: build the key under which the invoker (the real executor) is stored
            String key = serviceKey(url);
            // Create the exporter object and store it in exporterMap.
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
    
            //export an stub service for dispatching event
            Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
            Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
            if (isStubSupportEvent && !isCallbackservice) {
                String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
                if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                                "], has set stubproxy support event ,but no stub methods founded."));
                    }
    
                } else {
                    stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
                }
            }
    
            openServer(url);
            optimizeSerialization(url);
    
            return exporter;
        }
    }
    
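    • The key point is the key generated for the invoker: key = serviceKey(url).
    • The invoker is wrapped in a DubboExporter object, which is stored in exporterMap.
    • When a call arrives, the key is used to look up the DubboExporter in exporterMap, and its invoker is then executed.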
        public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
            StringBuilder buf = new StringBuilder();
            if (StringUtils.isNotEmpty(serviceGroup)) {
                buf.append(serviceGroup);
                buf.append("/");
            }
            buf.append(serviceName);
            if (serviceVersion != null && serviceVersion.length() > 0 && !"0.0.0".equals(serviceVersion)) {
                buf.append(":");
                buf.append(serviceVersion);
            }
            buf.append(":");
            buf.append(port);
            return buf.toString();
        }
    
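    • The serviceKey format is serviceGroup/serviceName:serviceVersion:port (the group is omitted when empty, and the version is omitted when empty or 0.0.0), so interfaces in different groups and with different versions can coexist.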
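    A few worked examples make the key format easier to check. The method below repeats the logic of serviceKey() using only the standard library (StringUtils.isNotEmpty is replaced by an explicit null and empty check). The service name, group and port are made-up sample values.

```java
final class ServiceKeySketch {

    /** Same logic as the serviceKey() method shown above, without Dubbo's StringUtils. */
    static String serviceKey(int port, String serviceName,
                             String serviceVersion, String serviceGroup) {
        StringBuilder buf = new StringBuilder();
        if (serviceGroup != null && !serviceGroup.isEmpty()) {
            buf.append(serviceGroup).append('/');
        }
        buf.append(serviceName);
        if (serviceVersion != null && !serviceVersion.isEmpty()
                && !"0.0.0".equals(serviceVersion)) {
            buf.append(':').append(serviceVersion);
        }
        buf.append(':').append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        String name = "org.apache.dubbo.demo.DemoService";
        // groupA/org.apache.dubbo.demo.DemoService:1.0.0:20880
        System.out.println(serviceKey(20880, name, "1.0.0", "groupA"));
        // org.apache.dubbo.demo.DemoService:2.0.0:20880
        System.out.println(serviceKey(20880, name, "2.0.0", null));
        // org.apache.dubbo.demo.DemoService:20880
        System.out.println(serviceKey(20880, name, "0.0.0", ""));
    }
}
```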
    public class DubboProtocol extends AbstractProtocol {
    
        private void openServer(URL url) {
            // find server.
            String key = url.getAddress();
            //client can export a service which's only for server to invoke
            boolean isServer = url.getParameter(IS_SERVER_KEY, true);
            if (isServer) {
                ExchangeServer server = serverMap.get(key);
                if (server == null) {
                    synchronized (this) {
                        server = serverMap.get(key);
                        if (server == null) {
                            serverMap.put(key, createServer(url));
                        }
                    }
                } else {
                    // server supports reset, use together with override
                    server.reset(url);
                }
            }
        }
    }
    
    
    
    public class DubboProtocol extends AbstractProtocol {
    
       private ExchangeServer createServer(URL url) {
            url = URLBuilder.from(url)
                    // send readonly event when server closes, it's enabled by default
                    .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                    // enable heartbeat by default
                    .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                    .addParameter(CODEC_KEY, DubboCodec.NAME)
                    .build();
            String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
    
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                throw new RpcException("Unsupported server type: " + str + ", url: " + url);
            }
    
            ExchangeServer server;
            try {
                // For now, the thing to focus on is requestHandler
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }
    
            str = url.getParameter(CLIENT_KEY);
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }
    
            return server;
        }
    }
    
    • While creating the server, createServer() calls Exchangers.bind(url, requestHandler).
    • The object to focus on is requestHandler, in particular how its getInvoker() call looks up the invoker.
    public class DubboProtocol extends AbstractProtocol {
    
        Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
            boolean isCallBackServiceInvoke = false;
            boolean isStubServiceInvoke = false;
            int port = channel.getLocalAddress().getPort();
            String path = inv.getAttachments().get(PATH_KEY);
    
            // if it's callback service on client side
            isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(STUB_EVENT_KEY));
            if (isStubServiceInvoke) {
                port = channel.getRemoteAddress().getPort();
            }
    
            //callback
            isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
            if (isCallBackServiceInvoke) {
                path += "." + inv.getAttachments().get(CALLBACK_SERVICE_KEY);
                inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
            }
            
            // Build the serviceKey and use it to fetch the exporter from exporterMap.
            String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
            DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    
            if (exporter == null) {
                throw new RemotingException(channel, "Not found exported service: " + serviceKey);
            }
    
            return exporter.getInvoker();
        }
    
    
        private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    
            @Override
            public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    
                if (!(message instanceof Invocation)) {
                    throw new RemotingException(channel, "Unsupported request: "
                            + (message == null ? null : (message.getClass().getName() + ": " + message))
                            + ", channel: consumer: " + channel.getRemoteAddress()
                            + " --> provider: " + channel.getLocalAddress());
                }
    
                Invocation inv = (Invocation) message;
                // Look up the invoker
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || !methodsStr.contains(",")) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                + " not found in callback service interface ,invoke will be ignored."
                                + " please update the api interface. url is:"
                                + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                Result result = invoker.invoke(inv);
                return result.completionFuture().thenApply(Function.identity());
            }
        };
    }
    
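    • The key is that getInvoker() builds the serviceKey and uses it to fetch the exporter from exporterMap. The exporter's invoker is then executed.
    • exporterMap holds all exporters: an exporter is added at export time and looked up at invocation time, which ties export and invocation together.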
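    The publish-then-lookup role of exporterMap can be sketched in a few lines. This is a minimal model under assumptions: Invoker and Exporter are simplified local types, the key is passed in as a ready-made string, and reply() is a hypothetical stand-in for the getInvoker() plus invoke() steps of requestHandler.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ExporterMapSketch {

    /** Simplified invoker taking a single string argument. */
    interface Invoker {
        String invoke(String argument);
    }

    /** Simplified exporter: just a holder for the invoker. */
    static final class Exporter {
        private final Invoker invoker;

        Exporter(Invoker invoker) {
            this.invoker = invoker;
        }

        Invoker getInvoker() {
            return invoker;
        }
    }

    private final Map<String, Exporter> exporterMap = new ConcurrentHashMap<>();

    /** Export side: register the invoker under its service key. */
    Exporter export(String serviceKey, Invoker invoker) {
        Exporter exporter = new Exporter(invoker);
        exporterMap.put(serviceKey, exporter);
        return exporter;
    }

    /** Invocation side: find the exporter by key and run its invoker. */
    String reply(String serviceKey, String argument) {
        Exporter exporter = exporterMap.get(serviceKey);
        if (exporter == null) {
            throw new IllegalStateException("Not found exported service: " + serviceKey);
        }
        return exporter.getInvoker().invoke(argument);
    }

    static String demo() {
        ExporterMapSketch protocol = new ExporterMapSketch();
        protocol.export("demo.DemoService:20880", name -> "Hello " + name);
        return protocol.reply("demo.DemoService:20880", "dubbo");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // Hello dubbo
    }
}
```

    Since both sides must compute the same key, a consumer that sends a different group, version or port than the provider exported will miss the map entry and hit the "not found" branch.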
