Dubbo在服务调用时支持同步调用和异步调用等方式。
在Dubbo2.6版本及之前的版本在实现异步调用时存在一定的缺点,实际上是一种假异步。
下面列举一个异步案例。
// 此方法应该返回Foo,但异步后会立刻返回NULL
fooService.findFoo(fooId);
// 立刻得到当前调用的Future实例,当发生新的调用时这个东西将会被覆盖
Future<Foo> fooFuture = RpcContext.getContext().getFuture();
// 调用另一个服务的方法
barService.findBar(barId);
// 立刻得到当前调用的Future
Future<Bar> barFuture = RpcContext.getContext().getFuture();
// 此时,两个服务的方法在并发执行
// 等待第一个调用完成,线程会进入Sleep状态,当调用完成后被唤醒。
Foo foo = fooFuture.get();
// 同上
Bar bar = barFuture.get();
// 假如第一个调用需要等待5秒,第二个等待6秒,则整个调用过程完成的时间是6秒。
当调用服务方法后,Dubbo会创建一个DefaultFuture,并将该Future存放到RpcContext中,在用户线程中,如果用户想获取调用结果时,会从RpcContext中获取该Future,并调用get方法,但是如果此时该服务仍没有处理完毕,则会出现阻塞,直到结果返回或调用超时为止。发生阻塞时,该方法的后续步骤则得不到执行。对于异步来说,这显然是不合理的。理想中的异步是如果服务没有处理好,会继续执行用户线程的后续方法,不会阻塞等待。
从Dubbo2.7开始,Dubbo的异步调用开始以CompletableFuture为基础进行实现。
DubboInvoker是一个执行体,通过它可以发起远程调用。
在Dubbo2.6的远程调用中,部分代码如下所示(只保留了部分代码):
DubboInvoker类
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
//忽略部分代码
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
//忽略部分代码
//单向调用,无返回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
// 异步调用
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
// 同步调用
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
}
在Dubbo2.6版本及之前的版本中,不管同步调用还是异步调用,都会调用HeaderExchangeClient.request
方法,返回一个DefaultFuture
对象,不同的点是:异步调用会将该future存放到RpcContext中,并先返回一个空的RpcResult结果。而同步调用不会将该future存放到RpcContext中,而是直接调用该future的get方法,阻塞等待调用结果。
HeaderExchangeChannel类
public ResponseFuture request(Object request, int timeout) throws RemotingException {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
channel.send(req);
//忽略了部分代码
return future;
}
DefaultFuture类(忽略了部分代码)
public Object get(int timeout) throws RemotingException {
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
return returnFromResponse();
}
当服务端处理完信息后,HeaderExchangeHandler
会处理发送过来的Response
,根据requestId获取对应的DefaultFuture
对象,最终调用doReceived方法对结果赋值。利用AQS的条件锁机制,唤醒阻塞线程。
DefaultFuture类
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
在Dubbo2.7版本中,对异步调用进行了改良,使用了CompletableFuture。
Dubbo2.7异步调用的一个样例:
// 此调用会立即返回null
asyncService.sayHello("world");
// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
CompletableFuture<String> helloFuture = RpcContext.getContext().getCompletableFuture();
// 为Future添加回调
helloFuture.whenComplete((retValue, exception) -> {
if (exception == null) {
System.out.println(retValue);
} else {
exception.printStackTrace();
}
});
同样是DubboInvoker发起远程调用,在doInvoke方法中进行了改进:
DubboInvoker2.7.9版本
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
//单向调用
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
//同步调用和异步调用
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj); FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
}
在Dubbo2.7版本中,DubboInvolnvoker对同步调用和异步调用进行了统一处理,封装成CompletableFuture,并以 AsyncRpcResult返回。
Dubbo2.7版本下HeaderExchangeChannel.request方法与2.6版本相差不大,只是DeafultFuture对象有一点不同,即后续版本继承了 CompletableFuture类。
对于同步调用和异步调用的处理交给AsyncToSyncInvoker
类处理。
public Result invoke(Invocation invocation) throws RpcException {
// 调用DubboInvoker等Invoker返回的调用结果
Result asyncResult = invoker.invoke(invocation);
try {
// 如果是同步调用
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
// 不能使用CompletableFuture#get()方法,否则性能会出现严重下降。
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
}
//忽略了部分代码
return asyncResult;
}
不同与Dubbo2.6版本,Dubbo2.7在处理服务端返回结果时放弃了AQS的条件锁机制,改用CompletableFuture类的complete方法去实现。
DefaultFuture类
private void doReceived(Response res) {
//忽略部分代码
if (res.getStatus() == Response.OK) {
// 对CompletableFuture赋值结果
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
//忽略部分代码
}
对于上述的Result接口,有两个实现对象,我们在这里进行简单对比分析下。
AsyncRpcResult
此类表示未完成的RPC调用,它将保留此调用的一些上下文信息,例如RpcContext和Invocation,因此,当调用完成并且返回结果时,它可以确保与调用时相同地恢复所有上下文, 是在调用任何回调之前进行的。
当Result实现CompletionStage时,AsyncRpcResult允许您轻松构建异步过滤器链,其状态将完全由基础RPC调用的状态驱动。
AsyncRpcResult不包含任何具体值(由CompletableFuture带来的基础值除外),请将其视为状态传输节点。#getValue()
和#getException()
都是从Result接口继承的,主要实现它们出于兼容性考虑。 因为许多旧式Filter实现很可能直接调用getValue。
AppResponse
Duboo3.0.0中引入了AsyncRpcResult来替换RpcResult,并且RpcResult被替换为AppResponse:AsyncRpcResult是在调用链中实际传递的对象,
AppResponse仅代表业务结果。
AsyncRpcResult是表示未完成的RPC调用的未来,而AppResponse是此调用的实际返回类型。
从理论上讲,AppResponse不必实现Result接口,这主要是出于兼容性目的。
在Dubbo服务暴露中,ProtocolFilterWrapper会构建拦截器链Filter,在调用实际的DubboInvoker之前,会先调用一些构造的Filter,比如ExecuteLimitFilter,限制每个服务中每个方法的最大并发数。下面是Dubbo2.6构建拦截器器链的逻辑:
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
ast = new Invoker<T>() {
//忽略部分代码
@Override
public Result invoke(Invocation invocation) {
return filter.invoke(next, invocation);
}
};
}
}
return last;
}
但是在Dubbo2.6版本进行异步调用中,会出现一些问题,因为Dubbo2.6在进行异步调用时,会先返回一个空的RpcResult对象,当某些Filter需要对返回的结果进行处理时,显然在该情景下无法处理结果。Dubbo2.7对这种情况进行了改进。
Dubbo2.7构建拦截器链的逻辑如下所示:
ProtocolFilterWrapper类
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (Filter filter : filters) {
last = new FilterNode<T>(invoker, last, filter);
}
}
return last;
}
然后解释下在FilterNode中的invoke方法:
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
asyncResult = filter.invoke(next, invocation);
//忽略部分代码
return asyncResult.whenCompleteWithContext((r, t) -> {
//忽略部分代码
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
});
}
当异步调用时,以AsyncRpcResult对象传递,通过CompletableFuture#whenComplete实现异步下的逻辑处理。
public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn){
// 是CompletableFuture类
this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
beforeContext.accept(v, t);
fn.accept(v, t);
afterContext.accept(v, t);
});
return this;
}
Dubbo异步分析到这里就结束了,感谢大家的阅读。
网友评论