Dubbo Version: 2.7.7
在Dubbo中Consumer
, Provider
的Filter都是在ProtocolFilterWrapper
类中构建的
ProtocolFilterWrapper 继承 Protocol, 核心方法是export
,refer
Protocol
@SPI("dubbo")
public interface Protocol {
...
...
/**
* Export service for remote invocation
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* Refer a remote service
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
...
...
}
其中export
是暴露服务供远程调用, refer
是查询远程服务并调用
ProtocolFilterWrapper
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
try {
Filter.Listener listener = listenableFilter.listener(invocation);
if (listener != null) {
listener.onError(e, invoker, invocation);
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
listener.onError(e, invoker, invocation);
}
throw e;
} finally {
}
return asyncResult.whenCompleteWithContext((r, t) -> {
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
});
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
@Override
public int getDefaultPort() {
return protocol.getDefaultPort();
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (UrlUtils.isRegistry(url)) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}
...
...
}
在export, refer方法中都会调用buildInvokerChain
方法构建InvokerChain,接下来重点分析refer
方法的buildInvokerChain
的构建过程
在buildInvokerChain
方法中先通过ExtensionLoader根据参数url:url
, key: reference.filter
,group: consumer
获取Filters
,然后遍历Filter构建InvokerChain
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
...
...
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
...
...
throw e;
}
}
...
...
};
}
}
return last;
}
Filter invoke代码
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
...
...
return invoker.invoke(invocation);
}
我把无关代码去掉了,这样可以把重心放在InvokerChain
的构建上.遍历Filter构建Invoker,流程如下
Invoker0 -> invoke(invocation) -> filter0.invoke(invoker, invocation)
Invoker1 -> invoke(invocation) -> filter1.invoke(invoker0, invocation)
Invoker2 -> invoke(invocation) -> filter2.invoke(invoker1, invocation)
Invoker3 -> invoke(invocation) -> filter3.invoke(invoker2, invocation)
...
last = Invokern -> invoke(invocation) -> filtern.invoke(invoker(n-1), invocation)
可以看到遍历Filter
构建Invoker
构成了一个Chain
,当调用Invokern
的invoke
方法会调用filtern
的invoke
方法,然后filtern
又会调用Invoker(n-1)
的invoke
方法,沿着Invoker Chain
一直到调用Invoker
的invoke
方法
网友评论