美文网首页
Dubbo Filter 原理分析

Dubbo Filter 原理分析

作者: 王兴岭 | 来源:发表于2020-06-10 15:45 被阅读0次

Dubbo Version: 2.7.7

DubboConsumer, ProviderFilter都是在ProtocolFilterWrapper类中构建的
ProtocolFilterWrapper 继承 Protocol, 核心方法是export ,refer

Protocol

@SPI("dubbo")
public interface Protocol {

    ...
    ...
    /**
     * Export service for remote invocation
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * Refer a remote service
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    ...
    ...

}

其中export是暴露服务供远程调用, refer是查询远程服务并调用

ProtocolFilterWrapper

public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            asyncResult = filter.invoke(next, invocation);
                        } catch (Exception e) {
                            if (filter instanceof ListenableFilter) {
                                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                                try {
                                    Filter.Listener listener = listenableFilter.listener(invocation);
                                    if (listener != null) {
                                        listener.onError(e, invoker, invocation);
                                    }
                                } finally {
                                    listenableFilter.removeListener(invocation);
                                }
                            } else if (filter instanceof Filter.Listener) {
                                Filter.Listener listener = (Filter.Listener) filter;
                                listener.onError(e, invoker, invocation);
                            }
                            throw e;
                        } finally {

                        }
                        return asyncResult.whenCompleteWithContext((r, t) -> {
                            if (filter instanceof ListenableFilter) {
                                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                                Filter.Listener listener = listenableFilter.listener(invocation);
                                try {
                                    if (listener != null) {
                                        if (t == null) {
                                            listener.onResponse(r, invoker, invocation);
                                        } else {
                                            listener.onError(t, invoker, invocation);
                                        }
                                    }
                                } finally {
                                    listenableFilter.removeListener(invocation);
                                }
                            } else if (filter instanceof Filter.Listener) {
                                Filter.Listener listener = (Filter.Listener) filter;
                                if (t == null) {
                                    listener.onResponse(r, invoker, invocation);
                                } else {
                                    listener.onError(t, invoker, invocation);
                                }
                            }
                        });
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }

        return last;
    }

    @Override
    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (UrlUtils.isRegistry(invoker.getUrl())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    }

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (UrlUtils.isRegistry(url)) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
    }
    ...
    ...

}

在export, refer方法中都会调用buildInvokerChain方法构建InvokerChain,接下来重点分析refer方法的buildInvokerChain的构建过程
buildInvokerChain方法中先通过ExtensionLoader根据参数url:url, key: reference.filter,group: consumer获取Filters,然后遍历Filter构建InvokerChain

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                      ...
                      ...
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            asyncResult = filter.invoke(next, invocation);
                        } catch (Exception e) {
                            ...
                            ...
                            throw e;
                        } 
                    }
                    ...
                    ...
                };
            }
        }

        return last;
    }

Filter invoke代码

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    ...
    ...
    return invoker.invoke(invocation);
}

我把无关代码去掉了,这样可以把重心放在InvokerChain的构建上.遍历Filter构建Invoker,流程如下

Invoker0 -> invoke(invocation) -> filter0.invoke(invoker, invocation)
Invoker1 -> invoke(invocation) -> filter1.invoke(invoker0, invocation)
Invoker2 -> invoke(invocation) -> filter2.invoke(invoker1, invocation)
Invoker3 -> invoke(invocation) -> filter3.invoke(invoker2, invocation)
...
last = Invokern -> invoke(invocation) -> filtern.invoke(invoker(n-1), invocation)

可以看到遍历Filter构建Invoker构成了一个Chain,当调用Invokerninvoke方法会调用filterninvoke方法,然后filtern又会调用Invoker(n-1)invoke方法,沿着Invoker Chain一直到调用Invokerinvoke方法

相关文章

  • Dubbo Filter 原理分析

    Dubbo Version: 2.7.7 在Dubbo中Consumer, Provider的Filter都是在P...

  • Dubbo Filter 原理

    Dubbo的Filter职责链有点绕: 在Invoker#invoke方法里调用Filter#invoke 在Fi...

  • 【Dubbo】服务发布原理

    Provider启动流程 通过dubbo的启动日志分析dubbo的服务发布原理 1. 暴露本地服务 [DUBBO]...

  • 利用dubbo扩展打印log

    原理 利用dubbo 的spi扩展机制 实现 实现Filter接口此处需要关注下注解@Activate,其中的参数...

  • dubbo源码系列之filter的前生

    为什么说dubbo的声明式缓存不好用!!! dubbo源码系列之filter的今世 dubbo的filter类似于...

  • Dubbo provider Filter链原理

    开篇  在dubbo的使用过程中会在标签中会配置filter变量,但是filter具...

  • dubbo基于xml配置解析原理

    基于dubbo-2.7.8进行分析。 1. 原理 dubbo利用Spring XML schema扩展机制,自定义...

  • Dubbo源码分析(七) 过滤器

    下面我们来分析一下Dubbo的过滤器。在服务调用之前,Dubbo会通过各种filter来对请求的数据进行过滤,现在...

  • dubbo架构原理(1)

    1、准备 在分析探索Dubbo架构原理之前,我们需要准备一下环境,用于后面我们来分析dubbo的架构。 1.1 Z...

  • Dubbo原理分析

    1.dubbo是什么? Dubbo是阿里巴巴SOA服务化治理方案的核心框架,每天为2,000+个服务提供3,00...

网友评论

      本文标题:Dubbo Filter 原理分析

      本文链接:https://www.haomeiwen.com/subject/wrcptktx.html