美文网首页线程池Java 杂谈
java线程池自动扩容

java线程池自动扩容

作者: 爱吃鱼aichiyu | 来源:发表于2018-01-26 14:45 被阅读22次

    线程池构造方法有几个重要参数:

    public ThreadPoolExecutor(int corePoolSize,//核心线程数
                                  int maximumPoolSize,//最大线程数
                                  long keepAliveTime,//当线程数大于核心线程数时,空闲线程存活时间
                                  TimeUnit unit,//空闲时间单位
                                  BlockingQueue<Runnable> workQueue//任务大于线程池数量时,用于保存任务的队列
    ) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    

    当线程池核心数量不够时,新加入的任务会被存放在队列中,如果队列存满了,线程池会创建更多的线程,直到maximumPoolSize。如果还不足以处理新的任务,则面临一个丢弃策略,默认的丢弃策略是抛异常!
    常用的Executors.newCachedThreadPool()和Executors.newFixedThreadPool(n),它的队列都是Integer.MAX_VALUE,所以maximumPoolSize和keepAliveTime参数就没有意义了。
    如果要自己实现一个线程自动扩容方案呢?以下代码供大家测试探讨

    /**
     * Created on 2018/1/26 12:55
     * <p>
     * Description: [线程池自动扩容]
     * <p>
     * Company: [xxx]
     *
     * @author [aichiyu]
     */
    public class TestAutoAdjustThreadPool {
    
        /**
         * 队列阈值,超过此值则扩大线程池
         */
        private static final int MAX_QUEUE_SIZE = 100;
    
        /**
         * 每次扩容自动增加线程数
         */
        private static final int PER_ADD_THREAD = 10;
    
        /**
         * 监控积压时间频率
         */
        private static final int MONITOR_DELAY_TIME = 1;
    
        private ScheduledExecutorService scheduledExecutorService ;
    
        private ThreadPoolExecutor executor ;
    
    
        public void start(){
            executor =new ThreadPoolExecutor(10, 100,
                    60L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>());
            scheduledExecutorService = new ScheduledThreadPoolExecutor(10, new BasicThreadFactory.Builder().namingPattern("mq-monitor-schedule-pool-%d").daemon(true).build());
            scheduledExecutorService.scheduleWithFixedDelay(() -> {
                System.out.println("当前线程池状态!"+executor);
                //当队列大小超过限制,且jvm内存使用率小于80%时扩容,防止无限制扩容
                if(executor.getQueue().size() >= MAX_QUEUE_SIZE && executor.getPoolSize()< executor.getMaximumPoolSize() && getMemoryUsage()<0.8){
                    System.out.println("线程池扩容!"+executor);
                    executor.setCorePoolSize(executor.getPoolSize() + PER_ADD_THREAD);
                }
                //当队列大小小于限制的80%,线程池缩容
                if(executor.getPoolSize() > 0  && executor.getQueue().size() < MAX_QUEUE_SIZE * 0.8  ){
                    System.out.println("线程池缩容!"+executor);
                    executor.setCorePoolSize(executor.getPoolSize() - PER_ADD_THREAD);
                }
    
    
            }, MONITOR_DELAY_TIME, MONITOR_DELAY_TIME, TimeUnit.SECONDS);
        }
    
        public void stop() throws InterruptedException {
            executor.shutdown();
            while (!executor.awaitTermination(1,TimeUnit.SECONDS)){
                //等待线程池中任务执行完毕
            }
            scheduledExecutorService.shutdown();
        }
    
        public <T> Future<T> submit(Callable<T> task) {
            return executor.submit(task);
        }
        public Future<?> submit(Runnable task) {
            return executor.submit(task);
        }
    
    
        /**
         * 获取jvm内存使用率
         * @return
         */
        public static double getMemoryUsage() {
            return (double) (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / Runtime.getRuntime().maxMemory();
        }
    
    
        public static void main(String[] args) throws InterruptedException {
            TestThreadPoolAutoExpand pool = new TestThreadPoolAutoExpand();
            pool.start();
            for (int i = 0; i < 1000; i++) {
                pool.submit(()->{
                    System.out.println(Thread.currentThread()+" execute!~~");
                    try {
                        Thread.sleep(1500L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
            pool.stop();
        }
    }
    

    相关文章

      网友评论

        本文标题:java线程池自动扩容

        本文链接:https://www.haomeiwen.com/subject/bioiaxtx.html