路途虽遥远,将来更美好
微信公号号:九点半的马拉
在传统模式下,Dubbo消费端需要调用某一远程服务器端的方法时,消费端也需要额外导入服务类接口API,Dubbo也由此实现了面向接口代理的高性能RPC调用。
但是当服务消费端没有服务接口或方法参数类型时,无法使用上述的方式进行服务调用,针对该场景,Dubbo使用泛化调用方法进行服务调用。
Dubbo在进行泛化调用时,将相关信息封装到Map对象中,并利用GenericService
接口处理。
举个例子
服务器端配置:
<bean id="helloserviceimpl" class="org.apache.dubbo.samples.generic.call.impl.HelloServiceImpl"/>
<dubbo:service interface="org.apache.dubbo.samples.generic.call.api.HelloService" ref="helloserviceimpl"/>
服务器端服务具体实现类:
public class HelloServiceImpl implements HelloService {
@Override
public CompletableFuture<String> sayHelloAsync(String name) {
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
future.complete("sayHelloAsync: " + name);
}).start();
return future;
}
消费端配置:
public class GenericCallConsumer {
private static GenericService genericService;
public static void main(String[] args) throws Exception {
ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setName("generic-call-consumer");
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress("zookeeper://127.0.0.1:2181");
ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setInterface("org.apache.dubbo.samples.generic.call.api.HelloService");
applicationConfig.setRegistry(registryConfig);
referenceConfig.setApplication(applicationConfig);
// 开启泛化
referenceConfig.setGeneric(true);
referenceConfig.setAsync(true);
referenceConfig.setTimeout(7000);
genericService = referenceConfig.get();
invokeAsyncSayHelloAsync();
public static void invokeAsyncSayHelloAsync() throws Exception {
CompletableFuture<Object> future = genericService.$invokeAsync("sayHelloAsync",
new String[]{"java.lang.String"}, new Object[]{"world"});
CountDownLatch latch = new CountDownLatch(1);
future.whenComplete((value, t) -> {
System.err.println("invokeAsyncSayHelloAsync(whenComplete): " + value);
latch.countDown();
});
latch.await();
}
原理分析
消费端通过一个代理对象进行服务调用,
1)执行InvokerInvocationHandler#invoke
方法,
2)调用MockClusterInvoker#invoke
方法
3)调用AbstractCluster$InterceptorInvokerNode#invoke
方法
这里新增了一个ClusterInterceptor
,与Filter
不同,它在一个特定的地址或invoker被选择之前的较外层执行逻辑,在服务发现之前拦截请求。
3.1)调用ConsumerContextClusterInterceptor#before
方法。
从RpcContext设置invocation,并设置localAddress和invoker(默认FailoverClusterInvoker),清除RpcContext内部的SERVER_LOCAL
上下文内容。
3.2)调用ClusterInterceptor#intercept
方法
默认调用FailoverClusterInvoker#doinvoke
方法。
在该方法中从RegistryDirectory中获取invoker列表,然后获取负载均衡LoadBalance(默认RandomLoadBalance), 选择一个invoker,进行服务调用。
4)调用InvokerWrapper#invoke
方法,之后执行一个Filter链
4.1)调用ConsumerContextFilter#invoke
方法,
设置RpcContext中的invoker变量问当前invoker(默认是ProtocolFilterWrapper),设置invocation。
从RpcContext中获取timeout-countdown变量,如果存在,则转化为TimeCountDown
对象,判断该调用是否超时,如果超时,则返回一个AsyncRpcResult对象,记录一个异常。
4.2)调用FutureFilter#invoke
方法和MonitorFilter#invoke
方法。
4.3)调用GenericImplFilter#invoke
方法,这里是泛化调用在客户端的主要核心步骤。
4.3.1)从url中获取generic
字段,调用方法不是$invoke
,也不是$invokeAysnc
时:
4.3.1.1)重新创建一个RpcInvocation,在attributes
变量中添加GENERIC_IMPL_MARKER
值,设置为true,其中:attributes
变量参数类型为Map<Object,Object>
,并且该变量只在调用者端,不会出现在线路上。
4.3.1.2)获取调用的方法名,调用的参数类型和参数值,对参数类型进行解析修改,效果如下:
java.lang.Object[][].class => "java.lang.Object[][]"
4.3.1.3)如果泛化调用方式为bean
方式,遍历参数值,并序列化为JavaBeanDescriptor
类型数据;
如果是其他调用方式,深入对象,将复杂类型转化为简单类型。
4.3.1.4)如果方法返回类型是CompletableFuture
,则设置方法名为$invokeAysnc
;其他情况设置方法名为$invoke
4.3.1.5)将参数类型设置为new Class<?>[]{String.class, String[].class, Object[].class};,这样转化为传统的泛化调用方式,并将参数值设置为类似new Object[]{methodName, types, args}的格式。
4.3.2)当调用方法为$invoke
或者$invokeAysnc
,并且方法参数变量数量为3个时,首先获取泛化参数,然后判断泛化调用方式:
4.3.2.1)如果是nativejava方式,判断参数是否为byte[]
类型;如果不是,则说明参数传递异常。
4.3.2.2)如果是bean方式,则判断参数是否为JavaBeanDescriptor
类型;如果不是,则说明参数传递异常。
4.3.3)在RpcInvocation中的attachment中设置是否泛化调用。
4.4) 调用invoker#invoke
方法
当远程调用返回结果时,会触发onResponse方法。
从url中获取generic
参数值,从invocation中获取方法名,方法参数类型,参数值GENERIC_IMPL
是否存在。
如果参数值GENERIC_IMPL
存在,并且为true:
从invoker中获取接口类型,当方法不是$invoke
也不是$invokeAysnc
,并且接口父类型为GenericService
时,从invoker中的interface参数中获取真实的interface,并转化为Class类型。
之后,是所有不同调用方式的统一处理。
获取调用的方法Method,如果调用方式是Bean方式:
判断appReponse的value是否为JavaBeanDescriptor
类型,如果是,将该value进行反序列化,重新赋值;如果不是,则抛出异常。
如果是其他调用方式,则使用PojoUtils工具类进行反序列化。
5)调用AsyncToSyncInvoker#invoke
方法
6)调用DubboInvoker#invoke
方法,发起远程调用。
服务端收到请求后,在最终调用AbstractProxyInvoker#invoke
方法之前,会先执行一个过滤器链,和上述的消费端的类似,其中会经过一个GenericFilter,该类是服务端实现泛化调用功能的重要步骤。
在GenericFilter中,首先判断方法名是否为$invoke
或者$invokeAsync
,由此来判定是否为泛化调用,如果方法名不是这两个,则直接调用下一个invoker;如果是,则执行下面的逻辑。
获取参数名称、参数类型、参数值,通过反射获取调用方法,根据不同的调用方式进行反序列化,获取实际调用方法的相关信息,然后将RpcInvocation中的相关信息进行替换:
code3.pngDubbo2.7下的超时机制
在上述中的ConsumerContextFilter#invoke
中涉及到了超时情况的处理,使用了TimeoutCountDown类,是2020.5.1日提交的信息。
Dubbo2.6版本中,在HeaderExchangeChannel中进行远程调用前,会创建一个DefaultFuture对象,里面有一个静态代码块,创建一个线程,执行RemotingInvocationTimeoutScan
任务,轮询FUTURES集合,通过DefaultFuture记录的开始时间与当前时间进行计算,判断是否超时,如果超时,则直接创建一个超时的Response,并将该DefaultFuture从FUTURES集合中移除。
当服务端在一定时间内执行完逻辑后,会发送给客户端,在此之前,客户端通过定时任务已经将相关信息从FUTURES集合中移除,所以这次服务端发送过来的信息在FUTURES集合中查找不到,所以不做处理,服务端的这次发送显得有些多余,对于客户端来说是无用的。
所以在2020.5.1日,提交了上述代码来解决这一问题。
code4.png在DubboInvoker#doInvoke
方法中进行远程远程调用前,会计算timeout。
首先从RpcContext中获取timeout-countdown
变量值
如果为空:
----》 从url中通过timeout参数获取超时时间,默认是1000,
----》从url中获取enable-timeout-countdown
参数值,默认是false,通过该参数开启新的超时机制(使用了上述的TimeoutCountDown)
----》如果开启了,在attachments
变量里添加_TO变量,值为计算后的timeout
如果不为空:
将其转化为 TimeoutCountDown对象,计算剩余的有效时间,将其设置为新的timeout,并将其添加到_TO变量,值为计算后的timeout。
那TimeoutCountDown对象在什么时候被创建的呢?
在服务端的ContextFilter。
从RpcInvocation中获取“_TO”变量的值,如果不为-1,则在RpcContext中创建一个TimeoutCountDown。
code7.png在后续的TimeoutFilter中,从RpcContext中获取TimeoutCountDown,如果超时了,则清空处理的结果。
code8.png在消费端的ConsumerContextFilter中,在进行远程调用前,同样从RpcContext中获取TimeOutCountDown,当过期时,直接返回一个异常,而不再进行远程调用。
code10.png疑问点
但是最后有一个疑问,即当超时后,服务端仍然会发送给客户端,虽然结果已经被清空,(可能自己的理解问题)。
code11.png下面的这个建议感觉挺好的,但是在Dubbo在没有发现类似的机制。
code12.png
网友评论