美文网首页
Dubbo源码分析----过滤器之ActiveLimitFilt

Dubbo源码分析----过滤器之ActiveLimitFilt

作者: _六道木 | 来源:发表于2018-07-17 23:24 被阅读57次

    ActiveLimitFilter也是用来做并发控制的,区别在于ExecuteLimitFilter作用于服务端,而ActiveLimitFilter作用于客户端。
    看下官网的例子

    <dubbo:service interface="com.foo.BarService" actives="10" />
    

    即从客户端方面限制了服务最多有10个并发

    接下来看下ActiveLimitFilter的invoke方法

        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            // 获取配置的数量
            int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
            // 获取当前接口调用统计
            RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
            if (max > 0) {
                //获取接口超时时间
                long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
                long start = System.currentTimeMillis();
                long remain = timeout;
                int active = count.getActive();// 获取并发数
                if (active >= max) {// 如果大于最大数量
                    synchronized (count) {
                        while ((active = count.getActive()) >= max) {
                            try {
                                // 挂起当前线程并释放锁,因为并发数已超过限制
                                count.wait(remain);
                            } catch (InterruptedException e) {
                            }
                            // 通过notify唤醒了,计算挂起的时间
                            long elapsed = System.currentTimeMillis() - start;
                            remain = timeout - elapsed;
                            if (remain <= 0) {// 如果已经超过超时时间
                                throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                                       + invoker.getInterface().getName() + ", method: "
                                                       + invocation.getMethodName() + ", elapsed: " + elapsed
                                                       + ", timeout: " + timeout + ". concurrent invokes: " + active
                                                       + ". max concurrent invoke limit: " + max);
                            }
                        }
                    }
                }
            }
            try {
                long begin = System.currentTimeMillis();
                // 增加该方法的数量
                RpcStatus.beginCount(url, methodName);
                try {
                    // 调用
                    Result result = invoker.invoke(invocation);
                    // 减少数量
                    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                    return result;
                } catch (RuntimeException t) {
                    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                    throw t;
                }
            } finally {
                if(max>0){// 调用完成后调用notify唤醒在等待的线程
                    synchronized (count) {
                        count.notify();
                    } 
                }
            }
        }
    

    整个逻辑也是比较简单的,和ExecuteLimitFilter不太一样的是,ExecuteLimitFilter在超过限制的数量后会直接返回,而ActiveLimitFilter会等待,直到超时。

    相关文章

      网友评论

          本文标题:Dubbo源码分析----过滤器之ActiveLimitFilt

          本文链接:https://www.haomeiwen.com/subject/xjrnpftx.html