美文网首页
JMX可视化监控线程池

JMX可视化监控线程池

作者: 何甜甜在吗 | 来源:发表于2019-10-22 14:27 被阅读0次

    前两天阅读公司代码看到了用JMX监控定时任务信息和状态,JMX这个单词感觉很熟于是便去查阅了一下,并写了监控线程池的Demo

    通过阅读本篇文章你将了解到:

    • JMX介绍
    • 线程池介绍
    • JMX监控线程池应用

    什么是JMX

    JMX简介

    JMX(Java Management Extensions),监控管理框架,通过使用JMX可以监控和管理应用程序。JMX最常见的场景是监控Java程序的基本信息和运行情况,任何Java程序都可以开启JMX,然后使用JConsoleVisual VM进行预览

    JMX架构
    JMX架构图
    总共分为三层,分发层、代理层、设备层
    分发层:根据不同的协议定义了对代理层进行各种操作的管理接口,简单的来说是监控指标的查看方式,可以是HTTP连接、RMI连接、SNMP连接
    代理层:管理MBean,通过将MBean注册到代理层实现MBean的管理,除了注册MBean,还可以注册Adapter,代理层在应用中一般都是MBeanService

    设备层:监控指标抽象出的类,可以分为以下几种:

    • Standard MBean
    • Dynamic MBean
    • Open MBean
    • Model MBean
    • MXBean

    应用中一般使用Standard MBean比较多,所以这里只介绍Standard MBean,使用Standard MBean需要满足一定的规则,规则如下:

    • 定义一个接口,接口名必须为XXXXMBean的格式,必须MBean结尾
    • 如果接口为XXXXMBean,则接口实现类必须为MBean,否则程序将报错
    • 接口中通过getset方法表示监控指标是否可读、可写。比如getXXX()抽象方法,则XXX就是监控的指标,getXXX()表示XXX性能指标可读,setXXX()方法表示该监控指标可写
    • 参数和返回类型只能是简单的引用类型(如String)和基本数据类型,不可以是自定义类型,如果返回值为自定义类型可以选择MXBean

    线程池简单介绍

    线程池是线程的管理工具,通过使用线程池可以复用线程降低资源消耗、提高响应速度、提高线程的可管理性。如果在系统中大量使用线程池,就必须对线程池进行监控方便出错时定位问题。可以通过线程池提供的参数进行监控,线程池提供的参数如下:

    方法 含义
    getActiveCount 线程池中正在执行任务的线程数量
    getCompletedTaskCount 线程池已完成的任务数量
    getCorePoolSize 线程池的核心线程数量
    getLargestPoolSize 线程池曾经创建过的最大线程数量
    getMaximumPoolSize 线程池的最大线程数量
    getPoolSize 线程池当前的线程数量
    getTaskCount 线程池需要执行的任务数量

    应用

    介绍完JMX及线程池以后,写一个JMX监控线程池的Demo,总不能纸上谈兵吧

    • 定义线程池监控类:ThreadPoolMonitor.java
      public class ThreadPoolMonitor extends ThreadPoolExecutor {
          private final Logger logger = LoggerFactory.getLogger(getClass());
      
          /**
           * ActiveCount
           * */
          int ac = 0;
      
          /**
           * 当前所有线程消耗的时间
           * */
          private AtomicLong totalCostTime = new AtomicLong();
      
          /**
           * 当前执行的线程总数
           * */
          private AtomicLong totalTasks = new AtomicLong();
      
          /**
           * 线程池名称
           */
          private String poolName;
      
          /**
           * 最短 执行时间
           * */
          private long minCostTime;
      
          /**
           * 最长执行时间
           * */
          private long maxCostTime;
      
      
          /**
           * 保存任务开始执行的时间
           */
          private ThreadLocal<Long> startTime = new ThreadLocal<>();
      
          public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                 TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
              this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), poolName);
          }
      
          public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                 TimeUnit unit, BlockingQueue<Runnable> workQueue,
                                 ThreadFactory threadFactory, String poolName) {
              super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
              this.poolName = poolName;
          }
      
          public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
              return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
          }
      
          public static ExecutorService newCachedThreadPool(String poolName) {
              return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
          }
      
          public static ExecutorService newSingleThreadExecutor(String poolName) {
              return new ThreadPoolMonitor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
          }
      
          /**
           * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
           */
          @Override
          public void shutdown() {
              // 统计已执行任务、正在执行任务、未执行任务数量
              logger.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
              super.shutdown();
          }
      
          @Override
          public List<Runnable> shutdownNow() {
              // 统计已执行任务、正在执行任务、未执行任务数量
              logger.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
              return super.shutdownNow();
          }
      
          /**
           * 任务执行之前,记录任务开始时间
           */
          @Override
          protected void beforeExecute(Thread t, Runnable r) {
              startTime.set(System.currentTimeMillis());
          }
      
          /**
           * 任务执行之后,计算任务结束时间
           */
          @Override
          protected void afterExecute(Runnable r, Throwable t) {
              long costTime = System.currentTimeMillis() - startTime.get();
              startTime.remove();  //删除,避免占用太多内存
              //设置最大最小执行时间
              maxCostTime = maxCostTime > costTime ? maxCostTime : costTime;
              if (totalTasks.get() == 0) {
                  minCostTime = costTime;
              }
              minCostTime = minCostTime < costTime ? minCostTime : costTime;
              totalCostTime.addAndGet(costTime);
              totalTasks.incrementAndGet();
      
              logger.info("{}-pool-monitor: " +
                            "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, ActiveCount: {}, " +
                            "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
                            "MaximumPoolSize: {},  KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
                      this.poolName,
                      costTime, this.getPoolSize(), this.getCorePoolSize(), super.getActiveCount(),
                      this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                      this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
          }
      
          public int getAc() {
              return ac;
          }
      
          /**
           * 线程平均耗时
           *
           * @return
           * */
          public float getAverageCostTime() {
              return totalCostTime.get() / totalTasks.get();
          }
      
          /**
           * 线程最大耗时
           * */
          public long getMaxCostTime() {
              return maxCostTime;
          }
      
          /**
           * 线程最小耗时
           * */
          public long getMinCostTime() {
              return minCostTime;
          }
      
          /**
           * 生成线程池所用的线程,改写了线程池默认的线程工厂
           */
          static class EventThreadFactory implements ThreadFactory {
              private static final AtomicInteger poolNumber = new AtomicInteger(1);
              private final ThreadGroup group;
              private final AtomicInteger threadNumber = new AtomicInteger(1);
              private final String namePrefix;
      
              /**
               * 初始化线程工厂
               *
               * @param poolName 线程池名称
               */
              EventThreadFactory(String poolName) {
                  SecurityManager s = System.getSecurityManager();
                  group = Objects.nonNull(s) ? s.getThreadGroup() :   Thread.currentThread().getThreadGroup();
                  namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
              }
      
              @Override
              public Thread newThread(Runnable r) {
                  Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
                  if (t.isDaemon())
                      t.setDaemon(false);
                  if (t.getPriority() != Thread.NORM_PRIORITY)
                      t.setPriority(Thread.NORM_PRIORITY);
                  return t;
              }
          }
      }
      

    通过继承线程池来自定义线程池,并在构造函数中加入了poolName标明是哪一个线程池,同时重写了beforeExecuteafterExecuteterminated等方法,在beforeExecute方法中记录线程池执行的时间,在afterExecute方法中计算线程执行的耗时、最大耗时、最小耗时、平均耗时。重写线程池生成线程的方法,指定了生成的线程名

    • 定义一个MBeanThreadPoolParamMBean.java
      MBean其实并不是一个实体类而是一个接口,里面定义了监控的指标

      public interface ThreadPoolParamMBean {
          /**
           * 线程池中正在执行任务的线程数量
           *
           * @return
           */
          int getActiveCount();
      
          /**
           * 线程池已完成的任务数量
           *
           * @return
           */
          long getCompletedTaskCount();
      
          /**
           * 线程池的核心线程数量
           *
           * @return
           */
          int getCorePoolSize();
      
          /**
           * 线程池曾经创建过的最大线程数量
           *
           * @return
           */
          int getLargestPoolSize();
      
          /**
           * 线程池的最大线程数量
           *
           * @return
           */
          int getMaximumPoolSize();
      
          /**
           * 线程池当前的线程数量
           *
           * @return
           */
          int getPoolSize();
      
          /**
           * 线程池需要执行的任务数量
           *
           * @return
           */
          long getTaskCount();
      
          /**
           * 线程最大耗时
           *
           * @return
           * */
          long getMaxCostTime();
      
          /**
           * 线程最小耗时
           *
           * @return
           * */
          long getMinCostTime();
      
          /**
           * 线程平均耗时
           *
           * @return
           * */
          float getAverageCostTime();
      }
      
    • 定义一个MBean实现类:ThreadPoolParam.java
      定义的是静态MBean,所以接口实现类必须满足规定,即xxxMBean,实现类为xxx

      public class ThreadPoolParam implements ThreadPoolParamMBean  {
          private ThreadPoolMonitor threadPoolMonitor;
      
          public ThreadPoolParam(ExecutorService es) {
              this.threadPoolMonitor = (ThreadPoolMonitor) es;
          }
      
          /**
           * 线程池中正在执行任务的线程数量
           *
           * @return
           */
          @Override
          public int getActiveCount() {
              return threadPoolMonitor.getAc();
          }
      
          /**
           * 线程池已完成的任务数量
           *
           * @return
           */
          @Override
          public long getCompletedTaskCount() {
              return threadPoolMonitor.getCompletedTaskCount();
          }
      
          /**
           * 线程池的核心线程数量
           *
           * @return
           */
          @Override
          public int getCorePoolSize() {
              return threadPoolMonitor.getCorePoolSize();
          }
      
          /**
           * 线程池曾经创建过的最大线程数量
           *
           * @return
           */
          @Override
          public int getLargestPoolSize() {
              return threadPoolMonitor.getLargestPoolSize();
          }
      
          /**
           * 线程池的最大线程数量
           *
           * @return
           */
          @Override
          public int getMaximumPoolSize() {
              return threadPoolMonitor.getMaximumPoolSize();
          }
      
          /**
           * 线程池当前的线程数量
           *
           * @return
           */
          @Override
          public int getPoolSize() {
              return threadPoolMonitor.getPoolSize();
          }
      
          /**
           * 线程池需要执行的任务数量
           *
           * @return
           */
          @Override
          public long getTaskCount() {
              return threadPoolMonitor.getTaskCount();
          }
      
          /**
           * 线程最大耗时
           *
           * @return
           * */
          @Override
          public long getMaxCostTime() {
              return threadPoolMonitor.getMaxCostTime();
          }
      
          /**
           * 线程最小耗时
           *
           * @return
           * */
          @Override
          public long getMinCostTime() {
              return threadPoolMonitor.getMinCostTime();
          }
      
          /**
           * 线程平均耗时
           *
           * @return
           * */
          @Override
          public float getAverageCostTime() {
              return threadPoolMonitor.getAverageCostTime();
          }
      }
      

      监控的参数指标通过线程池得到

    • 测试类:Test.java

      public class Test {
          private static Random random = new Random();
          public static void main(String[] args) throws MalformedObjectNameException,  InterruptedException {
              ExecutorService es1 = ThreadPoolMonitor.newCachedThreadPool("test-pool-1");
              ThreadPoolParam threadPoolParam1 = new ThreadPoolParam(es1);
      
              ExecutorService es2 = ThreadPoolMonitor.newCachedThreadPool("test-pool-2");
              ThreadPoolParam threadPoolParam2 = new ThreadPoolParam(es2);
      
              MBeanServerUtil.registerMBean(threadPoolParam1, new ObjectName("test-pool-1:type=threadPoolParam"));
              MBeanServerUtil.registerMBean(threadPoolParam2, new ObjectName("test-pool-2:type=threadPoolParam"));
      
              //http连接的方式查看监控任务
              HtmlAdaptor.start();
      
              executeTask(es1);
              executeTask(es2);
              Thread.sleep(1000 * 60 * 60);
          }
      
          private static void executeTask(ExecutorService es) {
              new Thread(() -> {
                  for (int i = 0; i < 10; i++) {
                      int temp = i;
                      es.submit(() -> {
                          //随机睡眠时间
                          try {
                              Thread.sleep(random.nextInt(60) * 1000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println(temp);
                      });
                  }
              }).start();
          }
      }
      

      说明:

      • MBeanServerUtil.registerMBean()注册监控的类
      • HtmlAdaptor.start()开启HTTP连接的方式查看监控任务

      启动程序后打开http://localhost:8082/如下图:

      image.png

    点击test-pool-1下的type=threadPoolParam

    image.png

    通过刷新获取线程池最新的监控指标
    test-pool-1type=threadPoolParam这些属性是在ObjectName中定义的属性值

    总结

    使用JMX监控线程池只是JMX一个功能,本篇文章只是学以致用,更多有关JMX以及线程池的内容可以查阅其他资料。文章若有错误欢迎指正

    最后附:项目代码,欢迎forkstar,【划重点了】

    相关文章

      网友评论

          本文标题:JMX可视化监控线程池

          本文链接:https://www.haomeiwen.com/subject/mxngmctx.html