美文网首页
Dubbo中 Filter 探究

Dubbo中 Filter 探究

作者: 小鬼当家_bf8d | 来源:发表于2019-10-21 23:31 被阅读0次

Filter 机制也称拦截器机制,在众多框架或者语言中很常见,可以实现登录鉴权,网关拦截、封装全局状态返回等,博主文章以下几个问题展开:

  1. Filter 的例子

  2. Dubbo中内置的Filter是怎样的?Consumer 和Provider 默认使用的 Filter 有哪些?

  3. Filter 何时初始化?

  4. Filter 何时会被调用?

  5. Filter 中 ListenableFilter 有何作用?

  6. Consumer 和Provider 如何使用Dubbo 内置Filter?

  7. 从Filter 看 自定义SPI 在哪些地方会被使用

以上问题看完文章后相信大家就可以清楚,若有疑问,关注博主公众号:六点A君,回复标题获取最新答案><

Filter 是Dubbo 的一个扩展点,可以理解为拦截作用,Dubbo 本身大多功能都基于该扩展点实现。

Filter 例子

先说说Dubbo 中Filter 的使用,以下摘抄自Dubbo官网:

  1. 用户自定义 filter 默认在内置 filter 之后。

  2. 特殊值 default,表示缺省扩展点插入的位置。比如:filter="xxx,default,yyy",表示 xxx 在缺省 filter 之前,yyy 在缺省 filter 之后。

  3. 特殊符号 -,表示剔除。比如:filter="-foo1",剔除添加缺省扩展点 foo1。比如:filter="-default",剔除添加所有缺省扩展点。

  4. providerservice 同时配置的 filter 时,累加所有 filter,而不是覆盖。比如:<dubbo:provider filter="xxx,yyy"/><dubbo:service filter="aaa,bbb" />,则 xxx,yyy,aaa,bbb 均会生效。如果要覆盖,需配置:<dubbo:service filter="-xxx,-yyy,aaa,bbb" />

Dubbo 文档:http://dubbo.apache.org/zh-cn/docs/dev/impls/filter.html

Dubbo 中使用 Filter 有两种途径,可以使用Dubbo内置的Filter,或者可以使用自定义的Filter

以自定义Filter 为例,其中带入

  1. 定义类实现 org.apache.dubbo.rpc.Filter

  2. META-INF/dubbo 下增加 增加 SPI 文件 org.apache.dubbo.rpc.Filter,填写SPI 映射,例如 traceConsumer=com.anla.rpc.filter.consumer.filter.ConsumerTraceFilter

  3. 在 Consumer 端 的 dubbo:consumerdubbo:reference 进行声明,例如 <dubbo:consumer filter="traceConsumer"/>,同理可以在 Provider 端的 dubbo:providerdubbo:service 进行声明。

当然在这一步可以使用 Dubbo内置的Filter 进行使用。也可以通过 - (减号) 进行内置Filter 删除。

具体Filter 例子可以看博主写的例子:https://github.com/anLA7856/dubbolearn/tree/master/filter

Dubbo 内置的Filter

如何找Dubbo内置Filter呢?

由于Dubbo 提供SPI 机制,所以我们可以沿着这条路往下找,即找到 `org.apache.dubbo.rpc.Filter`` 在找到其有哪些子类就可以了。

在这里插入图片描述

上面图片看起来过于眼花缭乱,下面博主整理表格或许来的更加直观(起始下面更眼花缭乱,各位看官直接过了就好,需要再来细看)

| 类名 | @Adaptive | 是否默认Consumer| 是否默认Provider|排序| 作用|

|:------:| :-------------:|:-------------:|:-------------:|:-------------:|:----------:|

|ClassLoaderFilter|@Activate(group = CommonConstants.PROVIDER, order = -30000)|否|是,默认存在|-30000|用于添加invoker的ClassLoader 到本地线程中|

|DeprecatedFilter|@Activate(group = CommonConstants.CONSUMER, value = DEPRECATED_KEY)|是,需要指定 值为 deprecated|无|无|用于提醒错误如果接口配置了deprecated,则会打印错误|

|EchoFilter|@Activate(group = CommonConstants.PROVIDER, order = -110000)|否|是,默认有|-110000|用于在服务端提供基于EchoService的功能链路相应功能|

|MetricsFilter|无|否|否|无|用于一些统计接口监控统计,提供report 功能|

|TpsLimitFilter|@Activate(group = CommonConstants.PROVIDER, value = TPS_LIMIT_RATE_KEY)|否|是,需要指定,值为tpc|无|限流作用,用于限制tps,用户可自行配置|

|AccessLogFilter|@Activate(group = PROVIDER, value = ACCESS_LOG_KEY)|否|是,需要指定,值为accesslog|无|用于打印接口log|

|ValidationFilter|@Activate(group = {CONSUMER, PROVIDER}, value = VALIDATION_KEY, order = 10000)|是,需要指定|是,需要指定|10000|在配置完validation后,在具体调用前既可以使用,用户可以基于org.apache.dubbo.validation.Validation 的 SPI方式实现新Validation |

|DubboAppContextFilter|@Activate(group = "consumer") |是,默认有|否|无|用于将当前Conusmer 的applicationName 放入attachment中|

|CacheFilter|@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY)| 是,需要指定,值为cache|是,需要指定,值为cache|无|为Dubbo的核心组件,支持 service,method,consumer or provider 四种粒度缓存|

|TokenFilter|@Activate(group = CommonConstants.PROVIDER, value = TOKEN_KEY)|否|有,需要指定,value为token|无|用于鉴权访问|

|TraceFilter|@Activate(group = CommonConstants.PROVIDER)|否|有,默认存在|无|用于链路跟踪,放入相应信息|

| SentinelDubboProviderFilter | @Activate(group = "provider") |否|仅Provder,默认有|无|为Sentinel 提供的 服务端Filter|

|GenericImplFilter|@Activate(group = CommonConstants.CONSUMER, value = GENERIC_KEY, order = 20000)|是,需要指定,值为generic |否| 20000|用于支持泛化generic 调用|

|MonitorFilter|@Activate(group = {PROVIDER, CONSUMER})|是,默认有|是,默认有|无|用于监控接口调用|

|ContextFilter|@Activate(group = PROVIDER, order = -10000)|否|是,默认有|-10000|在invoker中设置服务端的RpcContext|

|ExceptionFilter|@Activate(group = CommonConstants.PROVIDER)|否|是,默认有|无|unexpect 异常将会在provider 中error层级记录|

|CompatibleFilter|无|否|否|无|为版本兼容所有,让rpc调用返回值可以兼容旧版本的invoker 所需|

|GenericFilter|@Activate(group = CommonConstants.PROVIDER, order = -20000)| 无|有,默认有|-20000|用于实现和 generic 泛化调用相关逻辑|

|FutureFilter|@Activate(group = CommonConstants.CONSUMER) |有,默认有|无|无|属于一个事件的Filter,待研究|

|TimeoutFilter|@Activate(group = CommonConstants.PROVIDER)|无|有|无|记录 超时的 invocation,但是不阻止该invoker 运行,也就是会照常运行|

|ActiveLimitFilter| @Activate(group = CONSUMER, value = ACTIVES_KEY) | 有,需要指定|无 |无|用于限制 client 端的调用量 |

|ConsumerContextFilter|@Activate(group = CONSUMER, order = -10000)|有,默认有|无|-10000|用于设定RpcContext中一些属性,例如invoker,invocation等。|

|ExecuteLimitFilter|@Activate(group = CommonConstants.PROVIDER, value = EXECUTES_KEY)|无|有,需要指定 executes|无| 用于限制,单method 下 ,最大并行请求数限制 |

从上面分析来看,当程序没有显示指定Filter,那么Consumer 和 Provider 存在的Filter 如下:

| Provider | Consumer |

|:-------------:| :--------:|

| EchoFilter,ClassLoaderFilter, GenericFilter, ContextFilter, TraceFilter, TimeoutFilter, MonitorFilter, ExceptionFilter ,SentinelDubboProviderFilter |ConsumerContextFilter, FutureFilter,MonitorFilter, DubboAppContextFilter |

以上得出 ConsumerProvider 可以由 上面表格推到出,一方面也可以看到 @Activate 注解用途:

  1. 如果在 @Adaptive 有指定 value,则需要配置才能被加载。

  2. 如果没有 @Adaptive,则无法被加载

  3. Filter 中 排序按照 order 来的,倒序排。

  4. 最后有删除的 SentinelDubboProviderFilter 实际上不属于Dubbo 的包,它的@Adaptive 注解本应被加载,但是如果没有引入相应starter 文件,不回被加载。。只有当被用到时,才会被加载,才会 sentinel 的 SPI 文件才会被加载,才会被加入到默认的SPI 中。

  5. 所以在编写代码是,如果使用了 @Adaptive注解,满足条件后,即使不配置filter,也会生效,就是这个道理。

Filter 何时初始化

Filter 初始化是伴随着 ProtocolFilterWrapper 使用而初始化。而通过前面文章可知,ProtocolFilterWrapper 作为包装类,当有Protocol 初始化,它就会初始化,并且由链路调用一层一层下去。


    @Override

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {

        if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {

            return protocol.export(invoker);

        }

        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));

    }

    @Override

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {

        if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {

            return protocol.refer(type, url);

        }

        return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);

    }

当 Provider 暴露服务会调用 export 方法,而 Consumer 则会调用refer 方法,上述方法都做了以下同样的事:

  1. 首先初始化注册中心协议

  2. 调用 buildInvokerChain 构造Filter 链。

下面看看 buildInvokerChain


  private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {

        Invoker<T> last = invoker;

        // 获取 group 和key下所有的 Filter 的 SPI 类文件

        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

// 构建Filter 链

        if (!filters.isEmpty()) {

            for (int i = filters.size() - 1; i >= 0; i--) {

                final Filter filter = filters.get(i);

                final Invoker<T> next = last;

                last = new Invoker<T>() {

                    @Override

                    public Class<T> getInterface() {

                        return invoker.getInterface();

                    }

                    @Override

                    public URL getUrl() {

                        return invoker.getUrl();

                    }

                    @Override

                    public boolean isAvailable() {

                        return invoker.isAvailable();

                    }

                    @Override

                    public Result invoke(Invocation invocation) throws RpcException {

                        Result asyncResult;

                        try {

                        // 直接执行filter 的invoke

                            asyncResult = filter.invoke(next, invocation);

                        } catch (Exception e) {

                            // onError callback

                            if (filter instanceof ListenableFilter) {

                                Filter.Listener listener = ((ListenableFilter) filter).listener();

                                if (listener != null) {

                                    listener.onError(e, invoker, invocation);

                                }

                            }

                            throw e;

                        }

                        return asyncResult;

                    }

                    @Override

                    public void destroy() {

                        invoker.destroy();

                    }

                    @Override

                    public String toString() {

                        return invoker.toString();

                    }

                };

            }

        }

        return new CallbackRegistrationInvoker<>(last, filters);

    }

上面方法看起来也容易理解:

  1. 通过SPI 中,加载 Filter 的扩展类。具体加载规则可以看博主前面文章以及上文表格分析

  2. 将每个Filter链起来,即包装每一个Filter。这样当有请求时候,就会依次执行每一个链起来的Filter

  3. 最后返回 CallbackRegistrationInvoker 的封装对象

Filter 中有个特殊的子类:ListenableFilter


public abstract class ListenableFilter implements Filter {

    protected Listener listener = null;

    public Listener listener() {

        return listener;

    }

}

Listener则 为 Filter 中的内部接口,依附 Filter 存在:


    interface Listener {

// 当结果返回时调用

        void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);

// 当调用方出现异常时调用

        void onError(Throwable t, Invoker<?> invoker, Invocation invocation);

    }

有部分Filter是 实现自 Listener,故其有监听作用。

buildInvokerChain 中绑定 invoke 链时,则使用了 onError,当Filter 出错时,则会记录并调用 onError 方法:


                        try {

                            asyncResult = filter.invoke(next, invocation);

                        } catch (Exception e) {

                            // onError callback

                            if (filter instanceof ListenableFilter) {

                                Filter.Listener listener = ((ListenableFilter) filter).listener();

                                if (listener != null) {

                                    listener.onError(e, invoker, invocation);

                                }

                            }

                            throw e;

                        }

而再看 buildInvokerChain 返回的 CallbackRegistrationInvoker 时,在其invoke方法中可以看到,在 Filter链 中最后一个 调用 完之后,则会进行异步封装:


        @Override

        public Result invoke(Invocation invocation) throws RpcException {

        // 异步封装

            Result asyncResult = filterInvoker.invoke(invocation);

            asyncResult.thenApplyWithContext(r -> {

                for (int i = filters.size() - 1; i >= 0; i--) {

                    Filter filter = filters.get(i);

                    // 告知 ListenableFilter 的子类

                    if (filter instanceof ListenableFilter) {

                        Filter.Listener listener = ((ListenableFilter) filter).listener();

                        if (listener != null) {

                            listener.onResponse(r, filterInvoker, invocation);

                        }

                    } else {

                        filter.onResponse(r, filterInvoker, invocation);

                    }

                }

                return r;

            });

            return asyncResult;

        }

并且对 asyncResult 异步调用完结果进行监听,并且循环filter,依次调用他们的onResponse方法。

如何 定位 SPI 的使用点

本篇以Filter 这个SPI 类型 为例,并且分析了 @Adaptive 注解 对于加载SPI 文件区别。因而总结下

SPI 使用点:

  1. 查找其基接口,看何处被 ExtensionLoader 加载

  2. 分析 ExtensionLoader 中使用 什么方法加载初 其相关联的SPI文件,方法可能为:getActivateExtensiongetActivateExtensiongetExtension ...,不同方法加载出的SPI扩展类是不一样的,都是围绕 SPI配置文件@Adaptive注解,@SPI 注解进行。

  3. 获取完具体SPI扩展类之后,就是对类的操作了,这个要看具体逻辑代码了。

觉得博主写的有用,不妨关注博主公众号: 六点A君。

哈哈哈,Dubbo小吃街不迷路:

在这里插入图片描述

相关文章

  • Dubbo中 Filter 探究

    Filter 机制也称拦截器机制,在众多框架或者语言中很常见,可以实现登录鉴权,网关拦截、封装全局状态返回等,博主...

  • Dubbo Filter 原理分析

    Dubbo Version: 2.7.7 在Dubbo中Consumer, Provider的Filter都是在P...

  • Dubbo Filter详解

    Dubbo的Filter在使用的过程中是我们扩展最频繁的内容,而且Dubbo的很多特性实现也都离不开Filter的...

  • dubbo源码系列之filter的前生

    为什么说dubbo的声明式缓存不好用!!! dubbo源码系列之filter的今世 dubbo的filter类似于...

  • Dubbo provider Filter链原理

    开篇  在dubbo的使用过程中会在标签中会配置filter变量,但是filter具...

  • 怎么玩转Dubbo Filter

    在resources下新建META-INF/dubbo/com.alibaba.dubbo.rpc.Filter文...

  • Dubbo Filter

    文档整理 官方文档 http://dubbo.apache.org/zh-cn/docs/dev/impls/f...

  • dubbo中扩展点

    1. filter 2.负载算法 3.容错算法 4. jdk中spi 和 dubbo中spi扩展

  • dubbo中的Filter链原理及应用

    filter在dubbo中的应用非常广泛,它可以对服务端、消费端的调用过程进行拦截,从而对dubbo进行功能上的扩...

  • 由dubbo的TpsLimitFilter限流,说说dubbo的

    Dubbo提供了过程拦截(即Filter)功能。dubbo的大多数功能都基于此功能实现。在dubbo的服务端,提供...

网友评论

      本文标题:Dubbo中 Filter 探究

      本文链接:https://www.haomeiwen.com/subject/xmqfvctx.html