美文网首页程序员
Dubbo 监控中心

Dubbo 监控中心

作者: 此鱼不得水 | 来源:发表于2018-01-29 17:41 被阅读506次

Dubbo 监控中心

Dubbo提供了一个简陋的监控中心可以供开发着观察相关的服务情况,一般小公司使用的异化已经可以满足,但是在大公司中业务比较复杂,往往不能够满足需求,因此需要自己定制开发(我司就是自己定制的一套)。其实Dubbo的监控并不属于Dubbo的核心业务,之所以讲解的原因也是为了满足自己学习的私心,因为自己也不了解监控中心一般要怎么搭建,所以就借此机会一边学习一边分享吧。

作为业务监控,一般要监控的数据就是consumer端的调用时间,provider端的相应时间,并发量和rt等等。这些数据的采集不仅需要在接口调用之后进行采集,还要在接口调用之前进行采集。因为监控并不是所有的应用都需要,所以Dubbo的监控中心是可选的,基于Dubbo现有的架构的话Filter最适合做这一类的工作了,而且Dubbo也是这么来做的。下面就简单看一下Filter的具体实现:

    // 调用过程拦截,简单记录一下调用前的信息,然后执行数据采集
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        //只有有monitor参数的时候才会进行数据采集
        if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
            // 提供方必须在invoke()之前获取context信息
            RpcContext context = RpcContext.getContext();  
            // 记录起始时间戮
            long start = System.currentTimeMillis(); 
            // 单机并发加(统计并发量)
            getConcurrent(invoker, invocation).incrementAndGet();           
            try {
                //让调用链往下执行
                Result result = invoker.invoke(invocation);            
                collect(invoker, invocation, result, context, start, false);
                return result;
            } catch (RpcException e) {
                collect(invoker, invocation, null, context, start, true);
                throw e;
            } finally {
                getConcurrent(invoker, invocation).decrementAndGet(); // 并发减
            }
        } else {
            return invoker.invoke(invocation);
        }
    }
    
    
    //根据上面传的参数进行数据采集,采集之后将数据汇总然后发往monitor
    private void collect(Invoker<?> invoker, Invocation invocation, Result result, RpcContext context, long start, boolean error) {
        try {
            // ---- 服务信息获取 ----
            long elapsed = System.currentTimeMillis() - start; // 计算调用耗时
            int concurrent = getConcurrent(invoker, invocation).get(); // 当前单机并发数
            String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);
            String service = invoker.getInterface().getName(); // 获取服务名称
            String method = RpcUtils.getMethodName(invocation); // 获取方法名
            URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY);
            Monitor monitor = monitorFactory.getMonitor(url);//默认采DubboMonitor
            int localPort;
            String remoteKey;
            String remoteValue;
            if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) {
                // ---- 服务消费方监控 ----
                context = RpcContext.getContext(); // 消费方必须在invoke()之后获取context信息
                localPort = 0;
                remoteKey = MonitorService.PROVIDER;
                remoteValue = invoker.getUrl().getAddress();
            } else {
                // ---- 服务提供方监控 ----
                localPort = invoker.getUrl().getPort();
                remoteKey = MonitorService.CONSUMER;
                remoteValue = context.getRemoteHost();
            }
            //input代表收到的request数据大小,output代表请求返回结果的数据大小
            String input = "", output = "";
            if (invocation.getAttachment(Constants.INPUT_KEY) != null) {
                input = invocation.getAttachment(Constants.INPUT_KEY);
            }
            if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) {
                output = result.getAttachment(Constants.OUTPUT_KEY);
            }
            monitor.collect(new URL(Constants.COUNT_PROTOCOL,
                                NetUtils.getLocalHost(), localPort,
                                service + "/" + method,
                                MonitorService.APPLICATION, application,
                                MonitorService.INTERFACE, service,
                                MonitorService.METHOD, method,
                                remoteKey, remoteValue,
                                error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",
                                MonitorService.ELAPSED, String.valueOf(elapsed),
                                MonitorService.CONCURRENT, String.valueOf(concurrent),
                                Constants.INPUT_KEY, input,
                                Constants.OUTPUT_KEY, output));
        } catch (Throwable t) {
            logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
        }
    }

Dubbo采集的数据主要有以下几项:

  • 总的成功次数
  • 总的失败次数
  • 总的request数据大小
  • 总的response数据大小
  • 调用总耗时
  • 并发量
  • 最大request数据大小
  • 最大response数据大小
  • 最大耗时
  • 最大并发量

DubboMonitor负责将Filter中采集的数据存储起来(statisticsMap),然后将数据发往monitorService,最终由monitorService将收到的数据进行整理和展示。

    public SimpleMonitorService() {
        //收集数据的队列
        queue = new LinkedBlockingQueue<URL>(Integer.parseInt(ConfigUtils.getProperty("dubbo.monitor.queue", "100000")));
        writeThread = new Thread(new Runnable() {
            public void run() {
                while (running) {
                    try {
                        write(); // 记录统计日志
                    } catch (Throwable t) { // 防御性容错
                        logger.error("Unexpected error occur at write stat log, cause: " + t.getMessage(), t);
                        try {
                            Thread.sleep(5000); // 失败延迟
                        } catch (Throwable t2) {
                        }
                    }
                }
            }
        });
        writeThread.setDaemon(true);
        writeThread.setName("DubboMonitorAsyncWriteLogThread");
        writeThread.start();
        chartFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    draw(); // 绘制图表
                } catch (Throwable t) { // 防御性容错
                    logger.error("Unexpected error occur at draw stat chart, cause: " + t.getMessage(), t);
                }
            }
        }, 1, 300, TimeUnit.SECONDS);
        INSTANCE = this;
    }

write()方法就是利用LinkedBlockingQueue自有的锁机制从其中把数据分类然后刷新到指定的文件中,当没有采集的数据时候write()会阻塞,直到有数据来为止。而draw方法就是根据wirte()方法产生的数据文件,定期的扫描然后生成图表。
个人认为write和draw方法都是比较清楚的方法,里面没有涉及比较重要的内容,所以具体的代码就不展示了。


从上面的介绍中我们可以看出来的是Dubbo将采集的数据信息都通过一个队列来存储,因为单机的问题,所以本质上也是有一定的容量限制的。Dubbo目前设置的队列最大长度为100000(10W),但是生成数据的数据如果超过了Dubbo写文件的速度的话就会产生数据积压,而且积压会随着时间越来越严重,最终可能就会遗漏一部分采集数据,导致整个采集过程不够准确。
所以,如果你的应用访问量比较大,请自行定制Dubbo的监控中心。

相关文章

网友评论

    本文标题:Dubbo 监控中心

    本文链接:https://www.haomeiwen.com/subject/ofhfzxtx.html