Dubbo Filter详解

作者: 此鱼不得水 | 来源:发表于2018-01-08 19:06 被阅读4422次

    Dubbo的Filter在使用的过程中是我们扩展最频繁的内容,而且Dubbo的很多特性实现也都离不开Filter的工作,今天一起来看一下Filter的具体实现。

    Filter(过滤器)在很多框架中都有使用过这个概念,基本上的作用都是类似的,在请求处理前或者处理后做一些通用的逻辑,而且Filter可以有多个,支持层层嵌套。
    Dubbo的Filter概念基本上符合我们正常的预期理解,而且Dubbo官方针对Filter做了很多的原生支持,目前大致有20来个吧,包括我们熟知的RpcContext,accesslog功能都是通过filter来实现了,下面一起详细看一下Filter的实现。
    Dubbo的Filter实现入口是在ProtocolFilterWrapper,因为ProtocolFilterWrapper是Protocol的包装类,所以会在加载的Extension的时候被自动包装进来(理解这里的前提是理解Dubbo的SPI机制),然后我们看一下这个Filter链是如何构造的。

        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            //向注册中心引用服务的时候并不会进行filter调用链
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url);
            }
            return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
        }
        
        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
            //获得所有激活的Filter(已经排好序的)
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (filters.size() > 0) {
                for (int i = filters.size() - 1; i >= 0; i --) {
                    final Filter filter = filters.get(i);
                    //复制引用,构建filter调用链
                    final Invoker<T> next = last;
                    //这里只是构造一个最简化的Invoker作为调用链的载体Invoker
                    last = new Invoker<T>() {
    
                        public Class<T> getInterface() {
                            return invoker.getInterface();
                        }
    
                        public URL getUrl() {
                            return invoker.getUrl();
                        }
    
                        public boolean isAvailable() {
                            return invoker.isAvailable();
                        }
    
                        public Result invoke(Invocation invocation) throws RpcException {
                            return filter.invoke(next, invocation);
                        }
    
                        public void destroy() {
                            invoker.destroy();
                        }
    
                        @Override
                        public String toString() {
                            return invoker.toString();
                        }
                    };
                }
            }
            return last;
        }
        
    

    看到上面的内容,我们大致能明白实现是这样子的,通过获取所有可以被激活的Filter链,然后根据一定顺序构造出一个Filter的调用链,最后的调用链大致是这样子:Filter1->Filter2->Filter3->......->Invoker,这个构造Filter链的逻辑非常简单,重点就在于如何获取被激活的Filter链。

        //将key在url中对应的配置值切换成字符串信息数组
        public List<T> getActivateExtension(URL url, String key, String group) {
            String value = url.getParameter(key);
            return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
        }
        
        public List<T> getActivateExtension(URL url, String[] values, String group) {
            List<T> exts = new ArrayList<T>();
            //所有用户自己配置的filter信息(有些Filter是默认激活的,有些是配置激活的,这里这里的names就指的配置激活的filter信息)
            List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
    
            //如果这些名称里不包括去除default的标志(-default),换言之就是加载Dubbo提供的默认Filter
            if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
                //加载extension信息
                getExtensionClasses();
                for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
                    //name指的是SPI读取的配置文件的key
                    String name = entry.getKey();
                    Activate activate = entry.getValue();
                    //group主要是区分实在provider端生效还是consumer端生效
                    if (isMatchGroup(group, activate.group())) {
                        T ext = getExtension(name);
                        //这里以Filter为例:三个判断条件的含义依次是:
                        //1.用户配置的filter列表中不包含当前ext
                        //2.用户配置的filter列表中不包含当前ext的加-的key
                        //3.如果用户的配置信息(url中体现)中有可以激活的配置key并且数据不为0,false,null,N/A,也就是说有正常的使用
                        if (! names.contains(name)
                                && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
                                && isActive(activate, url)) {
                            exts.add(ext);
                        }
                    }
                }
                //根据Activate注解上的order排序
                Collections.sort(exts, ActivateComparator.COMPARATOR);
            }
            //进行到此步骤的时候Dubbo提供的原生的Filter已经被添加完毕了,下面处理用户自己扩展的Filter
            List<T> usrs = new ArrayList<T>();
            for (int i = 0; i < names.size(); i ++) {
                String name = names.get(i);
                //如果单个name不是以-开头并且所有的key里面并不包含-'name'(也就是说如果配置成了"dubbo,-dubbo"这种的可以,这个if是进不去的)
                if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                        && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
                    //可以通过default关键字替换Dubbo原生的Filter链,主要用来控制调用链顺序
                    if (Constants.DEFAULT_KEY.equals(name)) {
                        if (usrs.size() > 0) {
                            exts.addAll(0, usrs);
                            usrs.clear();
                        }
                    } else {
                        //加入用户自己定义的扩展Filter
                        T ext = getExtension(name);
                        usrs.add(ext);
                    }
                }
            }
            if (usrs.size() > 0) {
                exts.addAll(usrs);
            }
            return exts;
        }
        
    

    基本上到这里就能看到Filter链是如何被加载进来的,这里设计的非常灵活,忍不住要感叹一下:通过简单的配置‘-’可以手动剔除Dubbo原生的一定加载Filter,通过default来代替Dubbo原生的一定会加载的Filter从而来控制顺序。这些设计虽然都是很小的功能点,但是总体的感觉是十分灵活,考虑的比较周到,非常值得我这种菜鸟学习。

    知道了Filter构造的过程之后,我们就详细看几个比较重要的Filter信息。
    Filter在作用端区分的话主要是区分为consumer和provider,下面我们就以这个为区分来分别介绍一些重点的Filter。

    Cunsumer

    ConsumerContextFilter (默认触发)

        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            //在当前的RpcContext中记录本地调用的一次状态信息
            RpcContext.getContext()
                    .setInvoker(invoker)
                    .setInvocation(invocation)
                    .setLocalAddress(NetUtils.getLocalHost(), 0)
                    .setRemoteAddress(invoker.getUrl().getHost(), 
                                      invoker.getUrl().getPort());
            if (invocation instanceof RpcInvocation) {
                ((RpcInvocation)invocation).setInvoker(invoker);
            }
            try {
                return invoker.invoke(invocation);
            } finally {
                RpcContext.getContext().clearAttachments();
            }
        }
    

    其实简单来看这个Filter的话是十分简单,它又是怎么将客户端设置的隐式参数传递给服务端呢?载体就是Invocation对象,在客户端调用Invoker.invoke方法时候,会去取当前状态记录器RpcContext中的attachments属性,然后设置到RpcInvocation对象中,在RpcInvocation传递到provider的时候会通过另外一个过滤器ContextFilter将RpcInvocation对象重新设置回RpcContext中供服务端逻辑重新获取隐式参数。这就是为什么RpcContext只能记录一次请求的状态信息,因为在第二次调用的时候参数已经被新的RpcInvocation覆盖掉,第一次的请求信息对于第二次执行是不可见的。

    ActiveLimitFilter (当配置了actives并且值不为0的时候触发)

    ActiveLimitFilte主要用于限制同一个客户端对于一个服务端方法的并发调用量。(客户端限流)

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
            //主要记录每台机器针对某个方法的并发数量
            RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
            if (max > 0) {
                long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
                long start = System.currentTimeMillis();
                long remain = timeout;
                int active = count.getActive();
                if (active >= max) {
                    synchronized (count) {
                        //这个while循环是必要的,因为在一次wait结束后,可能线程调用已经结束了,腾出来consumer的空间
                        while ((active = count.getActive()) >= max) {
                            try {
                                count.wait(remain);
                            } catch (InterruptedException e) {
                            }
                            //如果wait方法被中断的话,remain这时候有可能大于0
                            //如果其中一个线程运行结束自后调用notify方法的话,也有可能remain大于0
                            long elapsed = System.currentTimeMillis() - start;
                            remain = timeout - elapsed;
                            if (remain <= 0) {
                                throw new RpcException("...");
                            }
                        }
                    }
                }
            }
            try {
                //调用开始和结束后增减并发数量
                long begin = System.currentTimeMillis();
                RpcStatus.beginCount(url, methodName);
                try {
                    Result result = invoker.invoke(invocation);
                    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                    return result;
                } catch (RuntimeException t) {
                    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                    throw t;
                }
            } finally {
                //这里很关键,因为一个调用完成后要通知正在等待执行的队列
                if(max>0){
                    synchronized (count) {
                        count.notify();
                    } 
                }
            }
        }
    

    FutureFilter

    Future主要是处理事件信息,主要有以下几个事件:

    • oninvoke 在方法调用前触发(如果调用出现异常则会直接触发onthrow方法)
    • onreturn 在方法返回会触发(如果调用出现异常则会直接触发onthrow方法)
    • onthrow 调用出现异常时候触发
        public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
            final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
            // 这里主要处理回调逻辑,主要区分三个时间:oninvoke:调用前触发,onreturn:调用后触发 onthrow:出现异常情况时候触发
            fireInvokeCallback(invoker, invocation);
            //需要在调用前配置好是否有返回值,已供invoker判断是否需要返回future.
            Result result = invoker.invoke(invocation);
            if (isAsync) {
                asyncCallback(invoker, invocation);
            } else {
                syncCallback(invoker, invocation, result);
            }
            return result;
        }
        
        private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
            final Method onInvokeMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
            final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));
            
            if (onInvokeMethod == null  &&  onInvokeInst == null ){
                return ;
            }
            if (onInvokeMethod == null  ||  onInvokeInst == null ){
                throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() +" has a onreturn callback config , but no such "+(onInvokeMethod == null ? "method" : "instance")+" found. url:"+invoker.getUrl());
            }
            //由于JDK的安全检查耗时较多.所以通过setAccessible(true)的方式关闭安全检查就可以达到提升反射速度的目的
            if (onInvokeMethod != null && ! onInvokeMethod.isAccessible()) {
                onInvokeMethod.setAccessible(true);
            }
            //从之类可以看出oninvoke的方法参数要与调用的方法参数一致
            Object[] params = invocation.getArguments();
            try {
                onInvokeMethod.invoke(onInvokeInst, params);
            } catch (InvocationTargetException e) {
                fireThrowCallback(invoker, invocation, e.getTargetException());
            } catch (Throwable e) {
                fireThrowCallback(invoker, invocation, e);
            }
        }
        
        //fireReturnCallback的逻辑与fireThrowCallback基本一样,所以不用看了
        private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
            final Method onthrowMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
            final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));
    
            if (onthrowMethod == null  &&  onthrowInst == null ){
                return ;
            }
            if (onthrowMethod == null  ||  onthrowInst == null ){
                throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() +" has a onthrow callback config , but no such "+(onthrowMethod == null ? "method" : "instance")+" found. url:"+invoker.getUrl());
            }
            if (onthrowMethod != null && ! onthrowMethod.isAccessible()) {
                onthrowMethod.setAccessible(true);
            }
            Class<?>[] rParaTypes = onthrowMethod.getParameterTypes() ;
            if (rParaTypes[0].isAssignableFrom(exception.getClass())){
                try {
                    //因为onthrow方法的参数第一个值必须为异常信息,所以这里需要构造参数列表
                    Object[] args = invocation.getArguments();
                    Object[] params;
                    
                    if (rParaTypes.length >1 ) {
                        //原调用方法只有一个参数而且这个参数是数组(单独拎出来计算的好处是这样可以少复制一个数组)
                        if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)){
                            params = new Object[2];
                            params[0] = exception;
                            params[1] = args ;
                        }else {//原调用方法有多于一个参数
                            params = new Object[args.length + 1];
                            params[0] = exception;
                            System.arraycopy(args, 0, params, 1, args.length);
                        }
                    } else {//原调用方法没有参数
                        params = new Object[] { exception };
                    }
                    onthrowMethod.invoke(onthrowInst,params);
                } catch (Throwable e) {
                    logger.error(invocation.getMethodName() +".call back method invoke error . callback method :" + onthrowMethod + ", url:"+ invoker.getUrl(), e);
                } 
            } else {
                logger.error(invocation.getMethodName() +".call back method invoke error . callback method :" + onthrowMethod + ", url:"+ invoker.getUrl(), exception);
            }
        }
        
    

    同步异步的主要处理区别就是同步调用的话,事件触发是直接调用的,没有任何逻辑;异步的话就是首先获取到调用产生的Future对象,然后复写Future的done()方法,将fireThrowCallback和fireReturnCallback逻辑引入即可。

    Provider

    ContextFilter

    ContextFilter和ConsumerContextFilter是结合使用的,之前的介绍中已经看了ConsumerContextFilter,下面再简单看一下ContextFilter,来验证刚才讲到的逻辑。

        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            Map<String, String> attachments = invocation.getAttachments();
            if (attachments != null) {
            //隐式参数重剔除一些核心消息
                attachments = new HashMap<String, String>(attachments);
                attachments.remove(Constants.PATH_KEY);
                attachments.remove(Constants.GROUP_KEY);
                attachments.remove(Constants.VERSION_KEY);
                attachments.remove(Constants.DUBBO_VERSION_KEY);
                attachments.remove(Constants.TOKEN_KEY);
                attachments.remove(Constants.TIMEOUT_KEY);
            }
            //这里又重新将invocation和attachments信息设置到RpcContext,这里设置以后provider的代码就可以获取到consumer端传递的一些隐式参数了
            RpcContext.getContext()
                    .setInvoker(invoker)
                    .setInvocation(invocation)
                    .setAttachments(attachments)
                    .setLocalAddress(invoker.getUrl().getHost(), 
                                     invoker.getUrl().getPort());
            if (invocation instanceof RpcInvocation) {
                ((RpcInvocation)invocation).setInvoker(invoker);
            }
            try {
                return invoker.invoke(invocation);
            } finally {
                RpcContext.removeContext();
            }
        }
    

    EchoFilter

    回响测试主要用来检测服务是否正常(网络状态),单纯的检测网络情况的话其实不需要执行真正的业务逻辑的,所以通过Filter验证一下即可.

        public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
            if(inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1 )
                return new RpcResult(inv.getArguments()[0]);
            return invoker.invoke(inv);
        }
    

    ExecuteLimitFilter

    服务端接口限制限流的具体执行逻辑就是在ExecuteLimitFilter中,因为服务端不需要考虑重试等待逻辑,一旦当前执行的线程数量大于指定数量,就直接返回失败了,所以实现逻辑相对于ActiveLimitFilter倒是简便了不少。

        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
            if (max > 0) {
                RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
                if (count.getActive() >= max) {
                    throw new RpcException("...");
                }
            }
            long begin = System.currentTimeMillis();
            boolean isException = false;
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
                return result;
            } catch (Throwable t) {
                isException = true;
                if(t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                }
                else {
                    throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
                }
            }
            finally {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isException);
            }
        }
    

    ExceptionFilter

    Dubbo 对于异常的处理有自己的一套规则:

    • 如果是checked异常则直接抛出
    • 如果是unchecked异常但是在接口上有声明,也会直接抛出
    • 如果异常类和接口类在同一jar包里,直接抛出
    • 如果是JDK自带的异常,直接抛出
    • 如果是Dubbo的异常,直接抛出
    • 其余的都包装成RuntimeException然后抛出(避免异常在Client出不能反序列化问题)
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            try {
                Result result = invoker.invoke(invocation);
                if (result.hasException() && GenericService.class != invoker.getInterface()) {
                    try {
                        Throwable exception = result.getException();
    
                        // 如果是checked异常,直接抛出
                        if (! (exception instanceof RuntimeException) && (exception instanceof Exception)) {
                            return result;
                        }
                        // 运行时异常,并且在方法签名上有声明,直接抛出
                        try {
                            Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                            Class<?>[] exceptionClassses = method.getExceptionTypes();
                            for (Class<?> exceptionClass : exceptionClassses) {
                                if (exception.getClass().equals(exceptionClass)) {
                                    return result;
                                }
                            }
                        } catch (NoSuchMethodException e) {
                            return result;
                        }
    
                        // 未在方法签名上定义的异常,在服务器端打印ERROR日志
                        logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                                + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                                + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
    
                        // 异常类和接口类在同一jar包里,直接抛出
                        String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                        String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                        if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)){
                            return result;
                        }
                        // 是JDK自带的异常,直接抛出
                        String className = exception.getClass().getName();
                        if (className.startsWith("java.") || className.startsWith("javax.")) {
                            return result;
                        }
                        // 是Dubbo本身的异常,直接抛出
                        if (exception instanceof RpcException) {
                            return result;
                        }
    
                        // 否则,包装成RuntimeException抛给客户端
                        return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
                    } catch (Throwable e) {
                        logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
                                + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                                + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                        return result;
                    }
                }
                return result;
            } catch (RuntimeException e) {
                logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                throw e;
            }
        }
    

    基本到这里的话把比较重要的Filter内容都有讲解到了,我们可以根据自己的需求非常轻易地扩展适合自己业务使用的Filter。

    本文最后,还是习惯性的撒花~~~~

    相关文章

      网友评论

      • lucode:你好,现在有一个场景,由于引用了别人的包,强制开启了一个过滤器,现在我想在我的程序里面外部强制禁止掉这个过滤器,请问有什么方案
        此鱼不得水:你可以在你们工程的配置里面增加“-”来排除目标的filter。
        如果对方的filter名字叫做AFilter的话,你直接配置“-AFilter”即可
      • 可乐高:buildInvokerChain的返回形如invoke(f1,invoke(f2,invoke(f3,invoker)))
      • 景_b92f:通过RpcContext 的setAttachments在消费端获取不到是什么原因
        此鱼不得水:在每次dubbo调用的时候都会刷新RpcContext,你看下你是不是中间经历了其他的调用
      • outwar:楼主能问几个问题么?1.filter一定要添加@Activate注解?2.自定义的filter执行顺序在dubbo的filter之后?
        此鱼不得水:1.filter不需要添加@Activate注解,只不过filter的实现中用到了Activate。
        2.自定义的filter可以在dubbo的filter之前或者之后都可以的

      本文标题:Dubbo Filter详解

      本文链接:https://www.haomeiwen.com/subject/retinxtx.html