美文网首页
Dubbo Filter 原理分析

Dubbo Filter 原理分析

作者: 王兴岭 | 来源:发表于2020-06-10 15:45 被阅读0次

    Dubbo Version: 2.7.7

    Dubbo 中 Consumer 端和 Provider 端的 Filter 链都是在 ProtocolFilterWrapper 类中构建的
    ProtocolFilterWrapper 实现了 Protocol 接口, 核心方法是 export 和 refer

    Protocol

    /**
     * Protocol extension interface (abridged excerpt).
     * Only {@code export} and {@code refer} are shown; the {@code ...} lines mark
     * members that the article elided.
     * NOTE(review): {@code @SPI("dubbo")} presumably names the default extension -- confirm
     * against the Dubbo SPI documentation.
     */
    @SPI("dubbo")
    public interface Protocol {
    
        ...
        ...
        /**
         * Export service for remote invocation
         *
         * @param invoker the invoker to export
         * @param <T>     the service type
         * @return the exporter produced for the given invoker
         * @throws RpcException declared by the signature; the failure conditions are not visible here
         */
        // NOTE(review): @Adaptive presumably selects the implementation at runtime -- confirm.
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    
        /**
         * Refer a remote service
         *
         * @param type the service interface class
         * @param url  the URL describing the remote service
         * @param <T>  the service type
         * @return an invoker for the referred service
         * @throws RpcException declared by the signature; the failure conditions are not visible here
         */
        @Adaptive
        <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    
        ...
        ...
    
    }
    

    其中 export 用于暴露服务供远程调用, refer 用于引用远程服务(返回一个可用于发起调用的 Invoker)

    ProtocolFilterWrapper

    /**
     * Protocol decorator that wraps invokers in a chain of {@link Filter}s (abridged excerpt;
     * the {@code ...} lines near the end mark members the article elided).
     * <p>
     * {@code export} and {@code refer} delegate straight to the wrapped protocol for registry
     * URLs; otherwise they route the invoker through {@code buildInvokerChain} first.
     */
    public class ProtocolFilterWrapper implements Protocol {
    
        /** The wrapped protocol that all calls are eventually delegated to. */
        private final Protocol protocol;
    
        /**
         * @param protocol the protocol to wrap
         * @throws IllegalArgumentException if {@code protocol} is {@code null}
         */
        public ProtocolFilterWrapper(Protocol protocol) {
            if (protocol == null) {
                throw new IllegalArgumentException("protocol == null");
            }
            this.protocol = protocol;
        }
    
        /**
         * Wraps {@code invoker} in one anonymous {@link Invoker} per activated filter.
         * <p>
         * The filter list is walked from the last element to the first, so the returned
         * invoker wraps {@code filters.get(0)} and each filter receives, as {@code next}, the
         * invoker built for the filter after it; the last filter's {@code next} is the
         * original {@code invoker}. When no filter is activated, {@code invoker} is returned as is.
         *
         * @param invoker the invoker at the end of the chain
         * @param key     passed to {@code getActivateExtension} together with the invoker URL
         * @param group   passed to {@code getActivateExtension} to select the filter group
         * @return the head of the chain, or {@code invoker} itself when there are no filters
         */
        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    
            if (!filters.isEmpty()) {
                // Iterate in reverse so that the first filter in the list ends up outermost.
                for (int i = filters.size() - 1; i >= 0; i--) {
                    final Filter filter = filters.get(i);
                    // Capture the chain built so far; this filter will forward to it.
                    final Invoker<T> next = last;
                    last = new Invoker<T>() {
    
                        // Metadata and lifecycle methods all delegate to the original invoker,
                        // not to `next`.
                        @Override
                        public Class<T> getInterface() {
                            return invoker.getInterface();
                        }
    
                        @Override
                        public URL getUrl() {
                            return invoker.getUrl();
                        }
    
                        @Override
                        public boolean isAvailable() {
                            return invoker.isAvailable();
                        }
    
                        /**
                         * Runs this node's filter against the rest of the chain and notifies the
                         * filter's listener of the outcome.
                         */
                        @Override
                        public Result invoke(Invocation invocation) throws RpcException {
                            Result asyncResult;
                            try {
                                asyncResult = filter.invoke(next, invocation);
                            } catch (Exception e) {
                                // The filter (or something downstream) threw synchronously:
                                // report it to the listener, then rethrow unchanged.
                                if (filter instanceof ListenableFilter) {
                                    ListenableFilter listenableFilter = ((ListenableFilter) filter);
                                    try {
                                        Filter.Listener listener = listenableFilter.listener(invocation);
                                        if (listener != null) {
                                            listener.onError(e, invoker, invocation);
                                        }
                                    } finally {
                                        // Always drop the per-invocation listener, even if onError throws.
                                        listenableFilter.removeListener(invocation);
                                    }
                                } else if (filter instanceof Filter.Listener) {
                                    // The filter itself acts as the listener.
                                    Filter.Listener listener = (Filter.Listener) filter;
                                    listener.onError(e, invoker, invocation);
                                }
                                throw e;
                            } finally {
                                // Intentionally empty in the quoted source.
    
                            }
                            // No synchronous failure: register a callback that dispatches to
                            // onResponse when t == null and to onError otherwise.
                            // NOTE(review): presumably invoked once the result completes -- confirm
                            // against Result#whenCompleteWithContext.
                            return asyncResult.whenCompleteWithContext((r, t) -> {
                                if (filter instanceof ListenableFilter) {
                                    ListenableFilter listenableFilter = ((ListenableFilter) filter);
                                    Filter.Listener listener = listenableFilter.listener(invocation);
                                    try {
                                        if (listener != null) {
                                            if (t == null) {
                                                listener.onResponse(r, invoker, invocation);
                                            } else {
                                                listener.onError(t, invoker, invocation);
                                            }
                                        }
                                    } finally {
                                        // Always drop the per-invocation listener after the callback.
                                        listenableFilter.removeListener(invocation);
                                    }
                                } else if (filter instanceof Filter.Listener) {
                                    Filter.Listener listener = (Filter.Listener) filter;
                                    if (t == null) {
                                        listener.onResponse(r, invoker, invocation);
                                    } else {
                                        listener.onError(t, invoker, invocation);
                                    }
                                }
                            });
                        }
    
                        @Override
                        public void destroy() {
                            invoker.destroy();
                        }
    
                        @Override
                        public String toString() {
                            return invoker.toString();
                        }
                    };
                }
            }
    
            return last;
        }
    
        /** Delegates to the wrapped protocol. */
        @Override
        public int getDefaultPort() {
            return protocol.getDefaultPort();
        }
    
        /**
         * Exports {@code invoker} through the wrapped protocol. Registry URLs are passed through
         * untouched; any other invoker is first wrapped with the filters selected by
         * {@code SERVICE_FILTER_KEY} and {@code CommonConstants.PROVIDER}.
         */
        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            if (UrlUtils.isRegistry(invoker.getUrl())) {
                return protocol.export(invoker);
            }
            return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
        }
    
        /**
         * Refers a service through the wrapped protocol. Registry URLs are passed through
         * untouched; otherwise the referred invoker is wrapped with the filters selected by
         * {@code REFERENCE_FILTER_KEY} and {@code CommonConstants.CONSUMER}.
         */
        @Override
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (UrlUtils.isRegistry(url)) {
                return protocol.refer(type, url);
            }
            return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
        }
        ...
        ...
    
    }
    

    在export, refer方法中都会调用buildInvokerChain方法构建InvokerChain,接下来重点分析refer方法的buildInvokerChain的构建过程
    buildInvokerChain方法中先通过ExtensionLoader根据参数 url: invoker.getUrl(), key: reference.filter, group: consumer 获取Filters, 然后倒序遍历这些Filter构建InvokerChain

        /**
         * Abridged copy of {@code buildInvokerChain}; the {@code ...} lines mark code the
         * article removed (including the success-path return inside {@code invoke}).
         * Each activated filter gets its own anonymous invoker whose {@code invoke} calls
         * {@code filter.invoke(next, invocation)}, where {@code next} is the chain built so far.
         */
        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (!filters.isEmpty()) {
                // Walk the list backwards so that filters.get(0) ends up as the outermost invoker.
                for (int i = filters.size() - 1; i >= 0; i--) {
                    final Filter filter = filters.get(i);
                    // Capture the chain built so far; this filter forwards to it.
                    final Invoker<T> next = last;
                    last = new Invoker<T>() {
                          ...
                          ...
                        @Override
                        public Result invoke(Invocation invocation) throws RpcException {
                            Result asyncResult;
                            try {
                                // Hand the rest of the chain to this node's filter.
                                asyncResult = filter.invoke(next, invocation);
                            } catch (Exception e) {
                                ...
                                ...
                                throw e;
                            } 
                        }
                        ...
                        ...
                    };
                }
            }
    
            // Head of the chain, or the original invoker when no filter is activated.
            return last;
        }
    

    Filter invoke代码

    /**
     * Abridged {@code Filter.invoke}: after the filter's own logic (elided as {@code ...}),
     * it forwards the invocation to the supplied invoker and returns that invoker's result.
     *
     * @param invoker    the invoker to forward to
     * @param invocation the invocation being processed
     * @return the result of {@code invoker.invoke(invocation)}
     * @throws RpcException declared by the signature; the failure conditions are not visible here
     */
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        ...
        ...
        return invoker.invoke(invocation);
    }
    

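    我把无关代码去掉了, 这样可以把重心放在InvokerChain的构建上. 遍历Filter构建Invoker的流程如下(注意: 下面的编号按构建的先后顺序排列, 而不是filters列表的下标; 由于循环是倒序遍历, 最先构建的Invoker0包装的是列表中最后一个Filter, 最终返回的last包装的是列表中第一个Filter, 因此实际调用时Filter按列表顺序依次执行)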

    Invoker0 -> invoke(invocation) -> filter0.invoke(invoker, invocation)
    Invoker1 -> invoke(invocation) -> filter1.invoke(invoker0, invocation)
    Invoker2 -> invoke(invocation) -> filter2.invoke(invoker1, invocation)
    Invoker3 -> invoke(invocation) -> filter3.invoke(invoker2, invocation)
    ...
    last = Invokern -> invoke(invocation) -> filtern.invoke(invoker(n-1), invocation)
    

    可以看到遍历Filter构建的Invoker构成了一个Chain: 当调用 Invokern 的 invoke 方法时会调用 filtern 的 invoke 方法, 然后 filtern 又会调用 Invoker(n-1) 的 invoke 方法, 沿着 Invoker Chain 一直到调用最初传入的 invoker 的 invoke 方法

    相关文章

      网友评论

          本文标题:Dubbo Filter 原理分析

          本文链接:https://www.haomeiwen.com/subject/wrcptktx.html