下面我们来说一下Dubbo的服务调用。我们之前说过,通过ProxyFactory获取到Invoker对象,然后执行方法调用,那么具体是怎么执行服务的调用呢,下面我们就来分析一下。
先看一下AbstractInvoker的invoke方法
public Result invoke(Invocation inv) throws RpcException {
if (destroyed.get()) {
throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ " is DESTROYED, can not be invoked any more!");
}
RpcInvocation invocation = (RpcInvocation) inv;
// 设置 `invoker` 属性
invocation.setInvoker(this);
// 添加公用的隐式传参,例如,`path` `interface` 等等,详见 RpcInvocation 类。
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}
// 添加自定义的隐式参数
Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {
invocation.addAttachmentsIfAbsent(context);
}
// 设置 `async=true` ,若为异步方法
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 执行调用
try {
return doInvoke(invocation);
} catch (InvocationTargetException e) {
Throwable te = e.getTargetException();
if (te == null) {
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return new RpcResult(te);
}
} catch (RpcException e) {
if (e.isBiz()) {
return new RpcResult(e);
} else {
throw e;
}
} catch (Throwable e) {
return new RpcResult(e);
}
}
再来看一下DubboInvoker的doInvoke方法
protected Result doInvoke(final Invocation invocation) {
RpcInvocation inv = (RpcInvocation) invocation;
// 获得方法名
final String methodName = RpcUtils.getMethodName(invocation);
// 获得 `path`( 服务名 ),`version`
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
// 获得 ExchangeClient 对象
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
// 远程调用
try {
// 获得是否异步调用
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// 获得是否单向调用
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 获得超时时间
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 单向调用
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
// 异步调用
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
// 同步调用
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
然后就是调用ExchangeClient去发送请求数据了。
服务端接受到请求,通过NettyHandler处理,调用ExchangeChannel处理,根据channel和Invocation 获取之前暴露的DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);这个key是接口全限定名:端口号,然后exporter的getInvoker方法返回具体的方法代理Invoker。给上下文RpcContext设置远程地址,开始invoke调用,然后调用方法,获取返回结果。往HeaderExchangeHandler的channel通道里写入回应 channel.send(response);在NettyChannel父类AbstractChannel父类AbstractPeer设置发送标志sent也就是超时,ChannelFuture future = channel.write(message);消费端接收到服务端的返回结果后,在HeaderExchangeHandler的received方法中处理结果handleResponse方法,调用DefaultFuture的received方法接收结果。然后调用returnFromResponse方法返回结果。
Dubbo的服务调用就分析到这里了。
网友评论