前言
在一次完整的Dubbo RPC调用过程中,需要用到两次代理。一次是在服务的调用方,代理面向的是用户代码,用户在调用接口的时候,实际调用的是接口的代理,代理实现中将用户调用转换成一次Dubbo远程调用。另外一个代理的场景是在服务提供方,代理面向的是网络通信层。当通信层接收到用户的请求后,需要调用本地进程中的接口实现,代理的作用就是根据请求中的参数找到本地实现类,并且调用它的方法。再次回顾一下之前的RPC调用图:
在服务提供端export服务的时候,首先从
ProxyFactory
获取到一个Invoker
,然后将Invoker作为入参,调用Protocol
的export方法。这样在ExchangeHandler
收到Request之后,就能够根据url找到请求对应接口的Invoker。所以,对于网络交换层来说Invoker就是它使用的代理实现。
代理实现
首先来看下代理相关的接口定义:
@SPI("javassist")
public interface ProxyFactory {
...
...
@Adaptive({PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
这个接口在服务提供端暴露服务时被调用,第一个proxy
参数就是添加@Service注解的实现类的实例。这个接口默认采用的javassist的实现。
@SPI("dubbo")
public interface Protocol {
...
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
...
}
拿到Invoker代理后,通过Protocol的export方法使用指定的协议暴露服务,如果使用Dubbo协议暴露服务,则调用的是DubboProtocol.export(Invoker)
。
Proxy实现
还是以Dubbo官方的Demo为例子,看看proxy最终生成的Invoker的代码是什么。ProxyFactory有两个实现类,一个是JDK的实现,直接使用Java的反射。
JdkProxyFactory
public class JdkProxyFactory extends AbstractProxyFactory {
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
return method.invoke(proxy, arguments);
}
};
}
}
JavassistProxyFactory
Dubbo默认采用的JavassistProxyFactory
的getInvoker代码实现如下:
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
生成的Invoker的doInvoke()方法最终调用的是Wrapper类的实现,通过Wrapper类来调用的本地实现类,这个类是使用javassist在运行期生成代码并编译。对于DemoServiceImpl
生成的代码如下:
public class org.apache.dubbo.common.bytecode.Wrapper1 extends org.apache.dubbo.common.bytecode.Wrapper {
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException{
org.apache.dubbo.demo.provider.DemoServiceImpl w;
try{
w = (org.apache.dubbo.demo.provider.DemoServiceImpl)o;
} catch(Throwable e){
throw new IllegalArgumentException(e);
}
try{
if( "sayHelloAsync".equals( n ) && p.length == 1 ) {
return w.sayHelloAsync((java.lang.String)v[0]);
}
if( "sayHello".equals( n ) && p.length == 1 ) {
return w.sayHello((java.lang.String)v[0]);
}
} catch(Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.");
}
}
逻辑比较简单,使用if-else判断方法名,然后直接调用方法。因为是直接调用所以比使用反射效率来的更高一点。
Protocol实现
上一步获取到Invoker之后,Protocol就可以使用指定的协议在端口上监听请求了,请求来了就找有没有接口的Invoker,有的话就调用Invoker返回结果。下面看下Dubbo Protocol的export()实现:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
//接口服务的唯一标识:group+接口名+version+端口
String key = serviceKey(url);
//缓存已export的服务
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//Conusmer暴露Stub服务,貌似暂时没有地方用到
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
...
}
//开启Server,监听请求
openServer(url);
//序列化优化设置,为了支持Kryo的序列化优化
optimizeSerialization(url);
return exporter;
}
上面的逻辑除了缓存之外,最主要的逻辑都在OpenServer()里面。就是开启监听端口,等待对Service的调用。openServer()操作最终调用的createServer()方法。
private ProtocolServer createServer(URL url) {
//增加默认配置
url = URLBuilder.from(url)
// 当Server close过程中,发送readonly的event
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// 默认开启心跳,Dubbo协议是长链接,需要心跳来保持连接状态
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
// 默认使用Dubbo协议的编解码器
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
//参数校验,配置的server类型是否支持,默认是netty
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
//调用Exchange层开启Server
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
//参数校验,要求的client端的协议实现,这里放在后面的原因不太理解?
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return new DubboProtocolServer(server);
}
开启Server的时候最主要是限定Codec参数为DubboCodec,因为这个是DubboProtocol。然后调用Exchanger的bind方法,同时会传入请求处理的Handler。关于暴露服务时Exchange层和Transporter层的实现,下一篇再详细说。
总结
准确的说,服务提供端的代理称之为反射层可能更贴切一点,实际上就是从网络层收到对端要请求的接口名和方法名,通过这个模块转换成对本地类的方法调用。Dubbo把它称之为代理,可能是为了更好的将Consumer和Provider的分层模型做统一。通过代理层,Dubbo将协议的支持和对类代码的调用做了隔离,将对接口的本地实现的调用抽象成Invoker,不同协议不需要关心Invoker的实现。
网友评论