Consumer-side invocation process
Call chain
Key methods
InvokerInvocationHandler#invoke
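From the service-consumption (reference) analysis, the object injected for the service is InvokerInvocationHandler(MockClusterInvoker(FailoverClusterInvoker(RegistryDirectory))). When the consumer calls queryPayList, it actually calls the dynamically generated queryPayList method, and that generated method calls the invoke method of InvokerInvocationHandler.
-- Dynamically generated bytecode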
public java.lang.String queryPayList() {
Object[] args = new Object[0];
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String) ret;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
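The delegation from a generated method to a handler can be tried out in isolation. The following is a minimal sketch that uses the JDK's java.lang.reflect.Proxy instead of Dubbo's own generated class; PayService and ProxySketch are hypothetical names, and the handler only echoes the method name rather than building an RpcInvocation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical service interface, named after the method used in the text.
interface PayService {
    String queryPayList();
}

class ProxySketch {
    // Builds a proxy whose every method call is forwarded to the handler,
    // similar to how the generated queryPayList() delegates to handler.invoke(...).
    static PayService newProxy(InvocationHandler handler) {
        return (PayService) Proxy.newProxyInstance(
                PayService.class.getClassLoader(),
                new Class<?>[] {PayService.class},
                handler);
    }

    public static void main(String[] args) {
        // Stand-in for InvokerInvocationHandler: no remote call, just echo the method name.
        PayService service = newProxy(
                (proxy, method, methodArgs) -> "invoked " + method.getName());
        System.out.println(service.queryPayList());
    }
}
```

In the real handler, the body of the lambda corresponds to wrapping method and args in an RpcInvocation and passing it down the invoker chain.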
MockClusterInvoker#invoke
Mock is used for service degradation: if a non-business exception occurs, the mock method is called instead.
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")) { // no mock configured
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
if (e.isBiz()) { // business exception: rethrow as-is
throw e;
}
result = doMockInvoke(invocation, e); // fall back to the mock method
}
}
return result;
}
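The three branches above (no mock, force mock, fail-then-mock) can be reduced to a small decision function. This is only a sketch under simplifying assumptions: MockDecisionSketch and RemoteFailure are hypothetical names, the remote call and the mock are plain Supplier values, and RemoteFailure stands in for a non-business RpcException.

```java
import java.util.function.Supplier;

class MockDecisionSketch {
    // Hypothetical stand-in for a non-business RpcException (timeout, network error, ...).
    static class RemoteFailure extends RuntimeException {
        RemoteFailure(String message) {
            super(message);
        }
    }

    // Mirrors the three branches: no mock, force mock, fail-then-mock.
    static String invoke(String mockValue, Supplier<String> remote, Supplier<String> mock) {
        String value = mockValue == null ? "" : mockValue.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            return remote.get();   // no mock configured: call the provider directly
        } else if (value.startsWith("force")) {
            return mock.get();     // forced mock: the provider is never called
        }
        try {
            return remote.get();   // fail-mock: try the real call first
        } catch (RemoteFailure e) {
            return mock.get();     // degrade only when the remote call fails
        }
    }

    public static void main(String[] args) {
        Supplier<String> failing = () -> {
            throw new RemoteFailure("timeout");
        };
        System.out.println(invoke("false", () -> "real", () -> "mock"));
        System.out.println(invoke("force:return mock", failing, () -> "mock"));
        System.out.println(invoke("fail:return mock", failing, () -> "mock"));
    }
}
```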
AbstractClusterInvoker#invoke
public Result invoke(final Invocation invocation) throws RpcException {
// Bind attachments to the invocation; attachments allow extra features such as implicit parameter passing.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
// Routing: list the invokers that pass the routing rules
List<Invoker<T>> invokers = list(invocation);
// Initialize the load balancer
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
AbstractClusterInvoker#initLoadBalance
protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
// Resolved from the URL's loadbalance parameter; the default is random
if (CollectionUtils.isNotEmpty(invokers)) {
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
} else {
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
}
}
FailoverClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
String methodName = RpcUtils.getMethodName(invocation);
// Number of attempts: the configured retries plus the initial call
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) { // retry loop starts
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
copyInvokers = list(invocation);
}
// Exclude invokers that were already selected or are unavailable
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
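The retry loop can be condensed into a short stand-alone sketch. It assumes a non-empty provider list, represents providers as plain strings, and replaces load balancing with a simplified "first untried provider" rule; FailoverSketch and pick are hypothetical names, not Dubbo APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class FailoverSketch {
    // Tries up to retries + 1 providers and returns the first successful result.
    static <R> R invoke(List<String> providers, int retries, Function<String, R> call) {
        int attempts = Math.max(retries + 1, 1);
        List<String> tried = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            String target = pick(providers, tried);
            tried.add(target);
            try {
                return call.apply(target);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw new IllegalStateException(
                "Tried " + tried + ", last error: " + last.getMessage(), last);
    }

    // Simplified selection: the first provider not tried yet, else the first provider.
    static String pick(List<String> providers, List<String> tried) {
        for (String provider : providers) {
            if (!tried.contains(provider)) {
                return provider;
            }
        }
        return providers.get(0);
    }
}
```

Unlike the real doInvoke, this sketch retries on every RuntimeException; the original rethrows business exceptions immediately and only retries on other failures.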
Routing
RouterChain#route
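Routing filters the provider list down to the providers that match the configured rules. The main implementations are the condition router ConditionRouter and the script router ScriptRouter. Typical uses include read/write splitting and application isolation.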
public List<Invoker<T>> route(URL url, Invocation invocation) {
// The invokers stored by RegistryDirectory; they represent the provider-side service addresses
List<Invoker<T>> finalInvokers = invokers;
for (Router router : routers) { // apply each routing rule in turn
finalInvokers = router.route(finalInvokers, url, invocation);
}
return finalInvokers;
}
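The chaining idea, where each router narrows the list produced by the previous one, can be sketched with plain functions. Provider addresses are strings here, and hostPrefix and exclude are hypothetical rules invented for illustration; they are not Dubbo's ConditionRouter or ScriptRouter.

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

class RouterChainSketch {
    // Applies each router in order; every router filters the previous result.
    static List<String> route(List<String> invokers,
                              List<UnaryOperator<List<String>>> routers) {
        List<String> result = invokers;
        for (UnaryOperator<List<String>> router : routers) {
            result = router.apply(result);
        }
        return result;
    }

    // Hypothetical rule: keep only addresses that start with the given prefix.
    static UnaryOperator<List<String>> hostPrefix(String prefix) {
        return list -> list.stream()
                .filter(address -> address.startsWith(prefix))
                .collect(Collectors.toList());
    }

    // Hypothetical rule: drop one specific address.
    static UnaryOperator<List<String>> exclude(String excluded) {
        return list -> list.stream()
                .filter(address -> !address.equals(excluded))
                .collect(Collectors.toList());
    }
}
```

Because the routers run in sequence, their order matters only for cost, not for the result, as long as each one is a pure filter like the two above.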
Load balancing
Load balancing means selecting a suitable provider from the set of available services.
AbstractLoadBalance#select
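Load balancing follows the template method pattern: select handles the trivial cases and delegates to the abstract doSelect, which each strategy implements. The default is RandomLoadBalance.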
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) {
return invokers.get(0);
}
return doSelect(invokers, url, invocation);
}
RandomLoadBalance#doSelect
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// Every invoker has the same weight?
boolean sameWeight = true;
// the weight of every invokers
int[] weights = new int[length];
// the first invoker's weight
int firstWeight = getWeight(invokers.get(0), invocation);
weights[0] = firstWeight;
// The sum of weights
int totalWeight = firstWeight;
for (int i = 1; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// save for later use
weights[i] = weight;
// Sum
totalWeight += weight;
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
offset -= weights[i];
if (offset < 0) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
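The weighted selection can be isolated from Dubbo's types to make the arithmetic easier to follow. The sketch below works on a bare int[] of weights and returns the chosen index; WeightedRandomSketch and histogram are hypothetical names, a java.util.Random is passed in so that runs are repeatable, and the weights array is assumed to be non-empty.

```java
import java.util.Random;

class WeightedRandomSketch {
    // Returns the index chosen by weighted random selection.
    static int select(int[] weights, Random random) {
        int total = 0;
        boolean sameWeight = true;
        for (int i = 0; i < weights.length; i++) {
            total += weights[i];
            if (weights[i] != weights[0]) {
                sameWeight = false;
            }
        }
        if (total > 0 && !sameWeight) {
            // Pick a point in [0, total) and find the weight segment it falls into.
            int offset = random.nextInt(total);
            for (int i = 0; i < weights.length; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return i;
                }
            }
        }
        // All weights equal (or all zero): choose uniformly.
        return random.nextInt(weights.length);
    }

    // Counts how often each index is selected over a number of draws.
    static int[] histogram(int[] weights, int draws, long seed) {
        Random random = new Random(seed);
        int[] counts = new int[weights.length];
        for (int i = 0; i < draws; i++) {
            counts[select(weights, random)]++;
        }
        return counts;
    }
}
```

With weights {1, 3}, the second index should be chosen about three times as often as the first; an index with weight 0 is never chosen as long as some other weight is positive.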
Initialization of the callable Invoker
RegistryDirectory
RegistryDirectory has the member field private volatile Map<String, Invoker<T>> urlInvokerMap
RegistryDirectory#toInvokers
This method populates urlInvokerMap.
for (URL providerUrl : urls) {//dubbo://
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
newUrlInvokerMap.put(key, invoker);
}
DubboProtocol#protocolBindingRefer (reached through protocol.refer)
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
So the invoker is InvokerDelegate(ListenerInvokerWrapper(AsyncToSyncInvoker(DubboInvoker))).
DubboInvoker
AbstractInvoker#invoke
Puts the contextAttachments into the RpcInvocation.
DubboInvoker#doInvoke
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
//interfaceName
inv.setAttachment(PATH_KEY, getUrl().getPath());
// version
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); // timeout
if (isOneway) { // one-way call: no return value is expected
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
responseFuture.whenComplete((obj, t) -> {
if (t != null) {
asyncRpcResult.completeExceptionally(t);
} else {
asyncRpcResult.complete((AppResponse) obj);
}
});
RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
return asyncRpcResult;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
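The non-oneway branch copies the outcome of the transport-level future into a separate result object. A minimal sketch of that bridging with plain CompletableFuture follows; FutureBridgeSketch is a hypothetical name, and a String result stands in for AsyncRpcResult and AppResponse.

```java
import java.util.concurrent.CompletableFuture;

class FutureBridgeSketch {
    // Copies the outcome (value or exception) of responseFuture into a new future,
    // in the same way the whenComplete callback above completes asyncRpcResult.
    static CompletableFuture<String> bridge(CompletableFuture<Object> responseFuture) {
        CompletableFuture<String> result = new CompletableFuture<>();
        responseFuture.whenComplete((obj, t) -> {
            if (t != null) {
                result.completeExceptionally(t);      // propagate the failure
            } else {
                result.complete(String.valueOf(obj)); // propagate the response
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Object> response = new CompletableFuture<>();
        CompletableFuture<String> result = bridge(response);
        response.complete("pong"); // simulates the reply arriving from the provider
        System.out.println(result.join());
    }
}
```

The caller gets the result future back immediately; it is completed later, when the response future completes.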
currentClient.request
currentClient is a ReferenceCountExchangeClient(HeaderExchangeClient()) object. The call chain is ReferenceCountExchangeClient --> HeaderExchangeClient --> HeaderExchangeChannel, which finally calls the send method.
public void send(Object message, boolean sent) throws RemotingException {
Request request = new Request();
request.setVersion(Version.getProtocolVersion());
request.setTwoWay(false);
request.setData(message);
//NettyClient
channel.send(request, sent);
}
NettyClient's channel property is new DecodeHandler(new HeaderExchangeHandler(ExchangeHandlerAdapter)).
The call chain of channel.send is AbstractClient.send -> NettyChannel.send.
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.writeAndFlush(message);
if (sent) {
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}