接口调用
在示例代码中
HelloService demoService = bootstrap.getCache().get(reference);
String message = demoService.sayHello("dubbo");
这里第一步就是从缓存中获取创建的代理对象,就是上一篇Dubbo系列--消费者服务引用《五》提及的代理对象
然后调用代理对象的接口方法
InvocationHandler
回顾下代理对象的接口方法实现
public java.lang.String sayHello(java.lang.String arg0) {
Object[] args = new Object[1];
// $w是javassist中的类型转换,基础类型转为包装类型,其它类型会忽略这个
args[0] = ($w)$1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String)ret;
}
可以看到它调用的是InvocationHandler
的invoke
方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
...
return InvocationUtil.invoke(invoker, rpcInvocation);
}
public static Object invoke(Invoker<?> invoker, RpcInvocation rpcInvocation) throws Throwable {
...
return invoker.invoke(rpcInvocation).recreate();
...
}
而这个invoke
方法实际调用的是上一篇Dubbo系列--消费者服务引用《五》中创建的Invoke
对象的invoke
方法
在回顾下我们创建的代理对象
![](https://img.haomeiwen.com/i12861705/f06a762174d5b010.png)
可以看到它引用的是MigrationInvoker
对象,而MigrationInvoker
对象又同时持有两个invoker
-
invoker: 对应接口级别的Invoker对象
-
serviceDiscoveryInvoker:对应应用级别的Invoker对象
上一篇中我们也提及了默认的currentAvailableInvoker
指向的是应用级别的Invoker对象
Invoker
我们进一步分析下MigrationInvoker
的invoker
方法
- MigrationInvoker:同时持有两个
invoker
,分别对应接口级别的Invoker对象和应用级别的Invoker对象
@Override
public Result invoke(Invocation invocation) throws RpcException {
if (currentAvailableInvoker != null) {
...
return decideInvoker().invoke(invocation);
...
}
}
这里需要先决定使用哪一个Invoker对象,因为默认的currentAvailableInvoker
指向的是应用级别的Invoker对象,所以只要应用级别的Invoker对象可用,就会优先使用它(只要Netty客户端连接还存活就是可用)
- ScopeClusterInvoker:增加了
Injvm
协议的判断,远程调用则什么都没干
@Override
public Result invoke(Invocation invocation) throws RpcException {
...
return invoker.invoke(invocation);
}
- MockClusterInvoker:判断是否是测试接口,非测试接口调用什么都不干
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result;
String value = getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (ConfigUtils.isEmpty(value)) {
//no mock
result = this.invoker.invoke(invocation);
}
- ClusterFilterInvoker:由于之前创建ClusterInvoker的时候
(ClusterInvoker<T>) cluster.join(directory, true);
第二个参数指定为了true, 因此会创建Invoker的过滤器链,所以这里会走到ClusterFilterInvoker
,它就是开始Invoker的过滤器链
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filterInvoker.invoke(invocation);
}
- FilterChainBuilder$CallbackRegistrationInvoker:异步接收返回结果
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(nextNode, invocation);
}
- FilterChainBuilder$CopyOfFilterChainNode:是一个接口内部类,它把所有的Invoker过滤器都串起来
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(nextNode, invocation);
}
它会依次经过
- ConsumerContextFilter:设置RpcContext
- ConsumerClassLoaderFilter:设置当前线程的类加载器
- ContextHolderParametersSelectedTransferFilter:
- ObservationSenderFilter:
- FutureFilter:
- MetricsClusterFilter:
- MonitorFilter:
- RouterSnapshotFilter:
这些过滤器都是针对特性定制的,一般情况下是啥也没干,经过这些过滤器后,才会到达FailoverClusterInvoker
,由于它没有重写invoke
方法,因此会直接使用父类AbstractClusterInvoker
中的
@Override
public Result invoke(final Invocation invocation) throws RpcException {
...
List<Invoker<T>> invokers = list(invocation);
...
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
...
return doInvoke(invocation, invokers, loadbalance);
...
}
这里主要包括三步
- 首先会从
RegistryDirectory
中获取可用的Invoker,对应上一篇文章里记录的
image.png
@Override
public List<Invoker<T>> doList(SingleRouterChain<T> singleRouterChain,
BitList<Invoker<T>> invokers, Invocation invocation) {
...
List<Invoker<T>> result = singleRouterChain.route(getConsumerUrl(), invokers, invocation);
return result == null ? BitList.emptyList() : result;
...
}
可用看到RegistryDirectory
是从RouterChain
中获取的Invoker,这里的RouterChain
对应的也是上一篇文章里保存在RegistryDirectory
中的
![](https://img.haomeiwen.com/i12861705/063c5681a291f654.png)
默认使用的是当前的RouterChain
,对应的是mainChain
public List<Invoker<T>> simpleRoute(URL url, BitList<Invoker<T>> availableInvokers, Invocation invocation) {
BitList<Invoker<T>> resultInvokers = availableInvokers.clone();
// 1. route state router
resultInvokers = headStateRouter.route(resultInvokers, url, invocation, false, null);
...
if (routers.isEmpty()) {
return resultInvokers;
}
...
}
最终会进入路由链里找Invoker
首先进入的是StateRouter
链,这个链通过抽象类AbstractStateRouter
中的方法串起来
@Override
public final BitList<Invoker<T>> route(BitList<Invoker<T>> invokers, URL url, Invocation invocation, boolean needToPrintMessage, Holder<RouterSnapshotNode<T>> nodeHolder) throws RpcException {
...
BitList<Invoker<T>> routeResult;
routeResult = doRoute(invokers, url, invocation, needToPrintMessage, nodeHolder, messageHolder);
if (routeResult != invokers) {
routeResult = invokers.and(routeResult);
}
// check if router support call continue route by itself
if (!supportContinueRoute()) {
// use current node's result as next node's parameter
if (!shouldFailFast || !routeResult.isEmpty()) {
routeResult = continueRoute(routeResult, url, invocation, needToPrintMessage, nodeHolder);
}
}
...
return routeResult;
}
它会调用子类的doRoute
方法寻找Invoker,如果子类返回的Invoker和当前的Invoker不同,则把子类的Invoker添加到当前的Invoker中
这里当前的Invoker对应的就是RegistryDirectory
中记录的Invoker
然后通过continueRoute
继续寻找下一个子类,它会依次经过
-
MockInvokersSelector
-
MeshRuleRouter
-
TagStateRouter
-
XdsRouter
-
ListenableStateRouter
-
AppScriptStateRouter
-
TailStateRouter
经过所有StateRouter
的子类链处理后,得到所有的Invoker,一般情况下得到的Invoker仍然是只有RegistryDirectory
中记录的Invoker
然后会进入Router
链,由于默认情况下Router
链为空,所以会直接返回
最终返回的仍然是RegistryDirectory
中记录的Invoker,但是经过了StateRouter
链和Router
链的处理
- 第二步是初始化负载均衡
也是通过SPI方式加载LoadBalance
的实现类,接口定义上指定了默认的实现类是random对应的RandomLoadBalance
@SPI("random")
public interface LoadBalance {
}
- 第三步是进入
FailoverClusterInvoker
的doInvoker
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 重试次数,默认是3次
int len = calculateInvokeTimes(methodName);
...
for (int i = 0; i < len; i++) {
...
// 从众多Invoker中挑选一个,通过负载均衡策略来挑选
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getServiceContext().setInvokers((List) invoked);
boolean success = false;
try {
Result result = invokeWithContext(invoker, invocation);
...
success = true;
return result;
}...
这里通过重试的方式来执行Invoker,默认情况下重试3次
然后通过负载均衡策略挑选一个Invoker执行,由于我们这里只有一个Invoker,所有会直接返回,还没到负载均衡去挑选的那一步
最后就是通过这个Invoker去执行了,按照上一篇文章中,我们的Invoker是通过Wrapper包装的DubboInvoker
![](https://img.haomeiwen.com/i12861705/447f46ea4651868d.png)
- ReferenceCountInvokerWrapper:这个Invoker没干啥
@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
...
return invoker.invoke(invocation);
...
}
然后又要经过过滤器链
-
MetricsFilter
-
ListenerInvokerWrapper
最终达到DubboInvoker
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
...
List<? extends ExchangeClient> exchangeClients = clientsProvider.getClients();
if (exchangeClients.size() == 1) {
currentClient = exchangeClients.get(0);
} else {
currentClient = exchangeClients.get(index.getAndIncrement() % exchangeClients.size());
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
Request request = new Request();
if (payload != null) {
request.setPayload(payload);
}
request.setData(inv);
request.setVersion(Version.getProtocolVersion());
if (isOneway) {
...
} else {
request.setTwoWay(true);
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(request, timeout, executor).thenApply(AppResponse.class::cast);
...
return result;
}
}...
}
可用看到这里就是取出之前创建的Netty
客户端,然后判断是否oneWay(即不需要返回结果的)
然后通过Netty客户端把请求发送出去,请求包装了Invocation对象
拿到返回结果后在进行解析
至此才完成了一次远程调用
网友评论