美文网首页DubboDubbo 源码学习
Dubbo Monitor 源码学习(九)

Dubbo Monitor 源码学习(九)

作者: jwfy | 来源:发表于2018-06-03 13:52 被阅读19次

    笔记简述
    dubbo 的monitor是一个监控服务的模块,用来统计服务的调用次数以及调用的健康情况。接下来来学习monitor的实现原理
    更多内容可看[目录]Dubbo 源码学习

    目录

    Dubbo Monitor 源码学习(九)
    1、Filter入口
    2、Monitor监控 & 收集
    3、总结

    1、Filter入口

    作为服务提供方,monitor肯定是在invoker.invoker的时候判断是否存在monitor监听者从而判断是否进行统计情况。根据之前学习的内容,直接定位到ProtocolFilterWrapper类

    ProtocolFilterWrapper 类

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        // 如果url的协议不是注册协议(一般情况是dubbo)
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
        // 注意这个里面的Constants.PROVIDER,意味着是服务提供方,后面会具体说明的
    }
    
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
        // 同上,只是是服务调用方
    }
    
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        // 获取到合适的Filter链,其中包含了invoker对象
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i --) {
                 // 这个循环就好像是形成了一个链条,每一个对象都是一个filter,这样就可以利用这些filter完成特定的需求了
                 // 同时,我们此小节说的monitor其实也是一个filter罢了
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
    
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }
    
                    public URL getUrl() {
                        return invoker.getUrl();
                    }
    
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }
    
                    public Result invoke(Invocation invocation) throws RpcException {
                        // filter的invoke调用
                        return filter.invoke(next, invocation);
                    }
    
                    public void destroy() {
                        invoker.destroy();
                    }
    
                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
    

    当处理的url不是注册协议时,服务提供方和服务调用方都会去调用buildInvokerChain去构建一个调用链,调用链是根据dubbo spi获取到的合适的Filter对象串起来的,其中提供方和调用方就是根据Constants.CONSUMER以及Constants.PROVIDER选择合适的filter的,例如下面几个类的截图

    image
    image
    image

    很明显MonitorFilter类均会被服务提供方和服务调用方发现并使用,这里就是MonitorFilter正式进入他自己处理数据的范畴了

    如下图就是生产的Filter链条,高亮的地方就是MonitorFilter


    image

    2、Monitor监控 & 收集

    MonitorFilter 类

    // 调用过程拦截
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
            // 如果url参数中配置了monitor参数,并且有效
            RpcContext context = RpcContext.getContext(); // 提供方必须在invoke()之前获取context信息
            long start = System.currentTimeMillis(); // 记录起始时间戮
            getConcurrent(invoker, invocation).incrementAndGet(); // 并发计数
            // 这个是统计className+mathodName的调用次数,并存储在
            // ConcurrentMap<String, AtomicInteger> concurrents 集合中
            try {
                Result result = invoker.invoke(invocation); // 让调用链往下执行
                collect(invoker, invocation, result, context, start, false);
                // 收集数据
                return result;
            } catch (RpcException e) {
                collect(invoker, invocation, null, context, start, true);
                throw e;
            } finally {
                getConcurrent(invoker, invocation).decrementAndGet(); // 并发计数
            }
        } else {
            return invoker.invoke(invocation);
            // 没有设置monitor,则直接进行下一个filter的invoker调用
        }
    }
    

    就是通过该方法完成对请求的监控作用,当监控完成之后就是collect收集

    private void collect(Invoker<?> invoker, Invocation invocation, Result result, RpcContext context, long start, boolean error) {
       // error字段表示了是否出现错误,在监听的catch代码块中该参数填充的true,意味着调用出现异常
        try {
            // ---- 服务信息获取 ----
            long elapsed = System.currentTimeMillis() - start; // 计算调用耗时
            int concurrent = getConcurrent(invoker, invocation).get(); // 当前并发数
            String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);  // 应用民称
            String service = invoker.getInterface().getName(); // 获取服务名称
            String method = RpcUtils.getMethodName(invocation); // 获取方法名
            URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY);
            Monitor monitor = monitorFactory.getMonitor(url);
            // 利用静态工厂方法创建一个monitor,一般情况下是DubboMonitor对象
            int localPort;
            String remoteKey;
            String remoteValue;
            if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) {
                // ---- 服务消费方监控 ----
                context = RpcContext.getContext(); // 消费方必须在invoke()之后获取context信息
                localPort = 0;
                remoteKey = MonitorService.PROVIDER;
                remoteValue = invoker.getUrl().getAddress();
            } else {
                // ---- 服务提供方监控 ----
                localPort = invoker.getUrl().getPort();
                remoteKey = MonitorService.CONSUMER;
                remoteValue = context.getRemoteHost();
            }
            String input = "", output = "";
            if (invocation.getAttachment(Constants.INPUT_KEY) != null) {
                input = invocation.getAttachment(Constants.INPUT_KEY);
            }
            if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) {
                output = result.getAttachment(Constants.OUTPUT_KEY);
            }
            monitor.collect(new URL(Constants.COUNT_PROTOCOL,
                                NetUtils.getLocalHost(), localPort,
                                service + "/" + method,
                                MonitorService.APPLICATION, application,
                                MonitorService.INTERFACE, service,
                                MonitorService.METHOD, method,
                                remoteKey, remoteValue,
                                error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",
                                MonitorService.ELAPSED, String.valueOf(elapsed),
                                MonitorService.CONCURRENT, String.valueOf(concurrent),
                                Constants.INPUT_KEY, input,
                                Constants.OUTPUT_KEY, output));
            // 拼接的url数据是count://host/interface?application=foo&method=foo&provider=10.20.153.11:20880&
            // success=12&failure=2&elapsed=135423423
        } catch (Throwable t) {
            logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
        }
    }
    

    进行真正monitor.collect操作的是DubboMonitor类中,在进行collect调用的时候,会把数据存在ConcurrentMap<Statistics, AtomicReference<long[]>> statisticsMap对象中,DubboMonitor本身自带的scheduledExecutorService定时任务,1分钟处理一下收集到的统计情况调用MonitorService的collect方法。最后可以使用MonitorService的query方法查询到具体的统计结果

    这个MonitorService类在dubbo本身是getProxy动态生成的了,在dubbo-ops中提供了一个SimpleMonitorService方法可以看看其具体的如何展示数据的

    3、总结

    总的来说,monitor本身的原理还是比较简单的,利用filter完成对请求的监控,后续利用定时任务定时采集监控的数据,后续利用MonitorService实现对数据的收集和监听

    同时,我们也可以添加自定义的Filter以及特定的MonitorService完成自身特定的需求等

    相关文章

      网友评论

        本文标题:Dubbo Monitor 源码学习(九)

        本文链接:https://www.haomeiwen.com/subject/nesjsftx.html