美文网首页大数据
springboot线程池简单监控

springboot线程池简单监控

作者: _Kantin | 来源:发表于2021-12-17 16:50 被阅读0次

    背景

    • 在我们实际项目开发中,常常会为不同的优先级的任务设置相对应的线程池。
    • 一般我们只关注相关池的相关参数如核心线程数据,最大线程数据等等参数,容易忽略了对线程池中实际运行情况的监控。
    • 综上所述:线程池如果相当于黑盒一样在运行的话,对系统的不利的。本文提供了一种简单获取线程池运行状态的方式,可以将详情打印到日志或者对接到Prometheus上进行展示。
    • 详细有不少博主给出了动态修改线程的方式,但是由于生产环境是禁止,因此本文只提供了监控的功能。
    • 本代码应用项目架构为springboot。

    代码

    代码类结构
    • ThreadPoolMonitor:线程池扩展类
    • ThreadPoolUtil:线程池工具类
    • ThreadPoolDetailInfo:bean类
    • ExecutorThreadPoolManager:线程池实现类
    • ThreadPoolController:线程池测试方法
    线程池扩展类
    • 从类主要重写了ThreadPoolExecutor类中的shutdown/shutdownNow/beforeExecute/afterExecute,用于对每个任务进行执行前后的拦截,计算出每个任务的运行时间。
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.*;
    /**
     * @ClassName ThreadPoolMonitor
     * @authors kantlin
     * @Date 2021/12/16 17:45
     **/
    public class ThreadPoolMonitor extends ThreadPoolExecutor {
        private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class);
        private final ConcurrentHashMap<String, Date> startTimes;
        private final String poolName;
        private long totalDiff;
    
        public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, String poolName) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
            this.startTimes = new ConcurrentHashMap();
            this.poolName = poolName;
        }
    
        @Override
        public void shutdown() {
            LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", new Object[]{this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()});
            super.shutdown();
        }
        @Override
        public List<Runnable> shutdownNow() {
            LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", new Object[]{this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()});
            return super.shutdownNow();
        }
    
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            this.startTimes.put(String.valueOf(r.hashCode()), new Date());
        }
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            Date startDate = this.startTimes.remove(String.valueOf(r.hashCode()));
            Date finishDate = new Date();
            long diff = finishDate.getTime() - startDate.getTime();
            this.totalDiff += diff;
        }
    
        public long getTotalDiff() {
            return this.totalDiff;
        }
    }
    
    线程工具类
    import org.springframework.stereotype.Component;
    import java.util.HashMap;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @ClassName ThreadPoolUtil
     * @authors kantlin
     * @Date 2021/12/16 17:45
     **/
    
    @Component
    public class ThreadPoolUtil {
        private final HashMap<String, ThreadPoolMonitor> threadPoolExecutorHashMap = new HashMap();
    
        public ThreadPoolUtil() {
        }
    
        public ThreadPoolMonitor creatThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,String poolName) {
            ThreadPoolMonitor threadPoolExecutor = new ThreadPoolMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, poolName);
            this.threadPoolExecutorHashMap.put(poolName, threadPoolExecutor);
            return threadPoolExecutor;
        }
    
        public HashMap<String, ThreadPoolMonitor> getThreadPoolExecutorHashMap() {
            return this.threadPoolExecutorHashMap;
        }
    
    线程bean类
    import lombok.Data;
    
    @Data
    public class ThreadPoolDetailInfo {
        //线程池名字
        private String threadPoolName;
        //当前线程池大小
        private Integer poolSize;
        //线程池核心线程数量
        private Integer corePoolSize;
        //线程池生命周期中最大线程数量
        private Integer largestPoolSize;
        //线程池中允许的最大线程数
        private Integer maximumPoolSize;
        //线程池完成的任务数目
        private long completedTaskCount;
        //线程池中当前活跃个数
        private Integer active;
        //线程池完成的任务个数
        private long task;
        //线程最大空闲时间
        private long keepAliveTime;
        //当前活跃线程的占比
        private int activePercent;
        //任务队列容量(阻塞队列)
        private Integer queueCapacity;
        //当前队列中任务的数量
        private Integer queueSize;
        //线程池中任务平均执行时长
        private long avgExecuteTime;
    
        public ThreadPoolDetailInfo(String threadPoolName, Integer poolSize, Integer corePoolSize, Integer largestPoolSize, Integer maximumPoolSize, long completedTaskCount, Integer active, long task, long keepAliveTime, int activePercent, Integer queueCapacity, Integer queueSize, long avgExecuteTime) {
            this.threadPoolName = threadPoolName;
            this.poolSize = poolSize;
            this.corePoolSize = corePoolSize;
            this.largestPoolSize = largestPoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.completedTaskCount = completedTaskCount;
            this.active = active;
            this.task = task;
            this.keepAliveTime = keepAliveTime;
            this.activePercent = activePercent;
            this.queueCapacity = queueCapacity;
            this.queueSize = queueSize;
            this.avgExecuteTime = avgExecuteTime;
        }
    }
    
    线程池实现类
    • 在我的项目中,将线程池依次划分为high、normal、low、single四种线程池类型。不同优先级的任务将会被submit到不同的线程池中执行。
    • 在业务中有判断线程池中queue的长度来决定是否投递任务,由于没有相应的拒绝策略,所以队列不设置长度。
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import com.*.newThread.ThreadPoolUtil;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    @Component
    public class ExecutorThreadPoolManager {
    
        @Autowired
        private ThreadPoolUtil threadPoolUtil;
    
        @Value("${thread_pool_normal_level_thread_max_num}")
        private Integer normalLevelThreadPoolThreadMaxNum;
        @Value("${thread_pool_normal_level_core_thread_num}")
        private Integer normalLevelThreadPoolCoreThreadNum;
        @Value("${thread_pool_low_level_thread_max_num}")
        private Integer lowLevelThreadPoolThreadMaxNum;
        @Value("${thread_pool_low_level_core_thread_num}")
        private Integer lowLevelThreadPoolCoreThreadNum;
    
        private ThreadPoolExecutor normalThreadPoolExecutor;
    
        private ThreadPoolExecutor highPriorityExecutor;
    
        private ThreadPoolExecutor lowPriorityExecutor;
    
        private ThreadPoolExecutor singleThreadPoolExecutor;
    
    
        @PostConstruct
        public void initExecutor() {
            ThreadFactory normalThreadFactory = new ThreadFactoryBuilder().setNameFormat("normal_task_thread_%d").build();
            normalThreadPoolExecutor = threadPoolUtil.creatThreadPool(normalLevelThreadPoolCoreThreadNum, normalLevelThreadPoolThreadMaxNum, 0L,
                    TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), normalThreadFactory,"normal_level_thread_pool");
    
            ThreadFactory highPriorityThreadFactory = new ThreadFactoryBuilder().setNameFormat("high_priority_level_task_thread_%d").build();
            highPriorityExecutor = threadPoolUtil.creatThreadPool(normalLevelThreadPoolCoreThreadNum, normalLevelThreadPoolThreadMaxNum, 0L,
                    TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), highPriorityThreadFactory,"high_level_thread_pool");
    
            ThreadFactory lowPriorityThreadFactory = new ThreadFactoryBuilder().setNameFormat("low_priority_level_task_thread_%d").build();
            lowPriorityExecutor = threadPoolUtil.creatThreadPool(lowLevelThreadPoolCoreThreadNum, lowLevelThreadPoolThreadMaxNum, 0L,
                    TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), lowPriorityThreadFactory,"low_level_thread_pool");
    
            ThreadFactory singleFactory = new ThreadFactoryBuilder().setNameFormat("single_task_thread_%d").build();
            singleThreadPoolExecutor =threadPoolUtil.creatThreadPool(1, 1,
                    0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), singleFactory,"single_level_thread_pool");
        }
    
        /**
         * @author kantlin
         * @date 2021/9/9
         * @describe 定义三种线程池, 一般采集类的用低优, 正常业务的用中优, 用户手动请求API的用高优线程池
         **/
        public ThreadPoolExecutor getNormalThreadPoolExecutor() {
            return normalThreadPoolExecutor;
        }
    
        public ThreadPoolExecutor getHighPriorityExecutor() {
            return highPriorityExecutor;
        }
    
        public ThreadPoolExecutor getLowPriorityExecutor() {
            return lowPriorityExecutor;
        }
    
        public ThreadPoolExecutor getSingleThreadPoolExecutor() {
            return singleThreadPoolExecutor;
        }
    
    }
    
    线程池监控接口类
    import com.alibaba.fastjson.JSONObject;
    import com.*.newThread.ThreadPoolDetailInfo;
    import com.*.newThread.ThreadPoolMonitor;
    import com.*.newThread.ThreadPoolUtil;
    import com.*.thread.ExecutorThreadPoolManager;
    import io.swagger.annotations.Api;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.web.bind.annotation.*;
    import java.math.BigDecimal;
    import java.text.NumberFormat;
    import java.util.*;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @ClassName ThreadPoolController
     * @authors kantlin
     * @Date 2021/12/17 14:53
     **/
    @Api(description = "线程池监控接口")
    @RestController
    @RequestMapping(value = "api/threadpool")
    public class ThreadPoolController {
        private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolController.class);
    
        @Autowired
        private ExecutorThreadPoolManager threadPool;
    
        @Autowired
        private ThreadPoolUtil threadPoolUtil;
    
        @GetMapping(value = "/getThreadPools")
        private List<String> getThreadPools() {
            List<String> threadPools = new ArrayList();
            if (!this.threadPoolUtil.getThreadPoolExecutorHashMap().isEmpty()) {
                Iterator var2 = this.threadPoolUtil.getThreadPoolExecutorHashMap().entrySet().iterator();
    
                while (var2.hasNext()) {
                    Map.Entry<String, ThreadPoolMonitor> entry = (Map.Entry) var2.next();
                    threadPools.add(entry.getKey());
                }
            }
    
            return threadPools;
        }
    
        @GetMapping(value = "/getThreadPoolListInfo")
        @Scheduled(cron = "${thread.poll.status.cron}")
        private List<ThreadPoolDetailInfo> getThreadPoolListInfo() {
            List<ThreadPoolDetailInfo> detailInfoList = new ArrayList();
            if (!this.threadPoolUtil.getThreadPoolExecutorHashMap().isEmpty()) {
                Iterator var2 = this.threadPoolUtil.getThreadPoolExecutorHashMap().entrySet().iterator();
                while (var2.hasNext()) {
                    Map.Entry<String, ThreadPoolMonitor> entry = (Map.Entry) var2.next();
                    ThreadPoolDetailInfo threadPoolDetailInfo = this.threadPoolInfo(entry.getValue(), (String) entry.getKey());
                    detailInfoList.add(threadPoolDetailInfo);
                }
            }
            LOGGER.info("Execute details of cuurent thread poll:{}", JSONObject.toJSONString(detailInfoList));
            return detailInfoList;
        }
    
        private ThreadPoolDetailInfo threadPoolInfo(ThreadPoolMonitor threadPool, String threadPoolName) {
            BigDecimal activeCount = new BigDecimal(threadPool.getActiveCount());
            BigDecimal maximumPoolSize = new BigDecimal(threadPool.getMaximumPoolSize());
            BigDecimal result = activeCount.divide(maximumPoolSize, 2, 4);
            NumberFormat numberFormat = NumberFormat.getPercentInstance();
            numberFormat.setMaximumFractionDigits(2);
            int queueCapacity = 0;
            return new ThreadPoolDetailInfo(threadPoolName, threadPool.getPoolSize(), threadPool.getCorePoolSize(), threadPool.getLargestPoolSize(), threadPool.getMaximumPoolSize(), threadPool.getCompletedTaskCount(), threadPool.getActiveCount(), threadPool.getTaskCount(), threadPool.getKeepAliveTime(TimeUnit.MILLISECONDS), new Double(result.doubleValue() * 100).intValue(), queueCapacity, threadPool.getQueue().size(), threadPool.getTaskCount() == 0L ? 0L : threadPool.getTotalDiff() / threadPool.getTaskCount());
        }
    }
    

    运行结果

    • 上面controller中的方法除了可以通过接口进行暴露外,还设置了定时任务定期的打印到日志中。方便对系统状态进行排查。
    [
      {
        "active": 0,
        "activePercent": 0,
        "avgExecuteTime": 0,
        "completedTaskCount": 0,
        "corePoolSize": 20,
        "keepAliveTime": 0,
        "largestPoolSize": 0,
        "maximumPoolSize": 20,
        "poolSize": 0,
        "queueCapacity": 0,
        "queueSize": 0,
        "task": 0,
        "threadPoolName": "high_level_thread_pool"
      },
      {
        "active": 0,
        "activePercent": 0,
        "avgExecuteTime": 0,
        "completedTaskCount": 0,
        "corePoolSize": 33,
        "keepAliveTime": 0,
        "largestPoolSize": 0,
        "maximumPoolSize": 33,
        "poolSize": 0,
        "queueCapacity": 0,
        "queueSize": 0,
        "task": 0,
        "threadPoolName": "low_level_thread_pool"
      },
      {
        "active": 0,
        "activePercent": 0,
        "avgExecuteTime": 371,
        "completedTaskCount": 14,
        "corePoolSize": 20,
        "keepAliveTime": 0,
        "largestPoolSize": 14,
        "maximumPoolSize": 20,
        "poolSize": 14,
        "queueCapacity": 0,
        "queueSize": 0,
        "task": 14,
        "threadPoolName": "normal_level_thread_pool"
      },
      {
        "active": 0,
        "activePercent": 0,
        "avgExecuteTime": 0,
        "completedTaskCount": 0,
        "corePoolSize": 1,
        "keepAliveTime": 0,
        "largestPoolSize": 0,
        "maximumPoolSize": 1,
        "poolSize": 0,
        "queueCapacity": 0,
        "queueSize": 0,
        "task": 0,
        "threadPoolName": "single_level_thread_pool"
      }
    ]
    

    相关文章

      网友评论

        本文标题:springboot线程池简单监控

        本文链接:https://www.haomeiwen.com/subject/cimifrtx.html