线程池构造方法有几个重要参数:
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//当线程数大于核心线程数时,空闲线程存活时间
TimeUnit unit,//空闲时间单位
BlockingQueue<Runnable> workQueue//任务大于线程池数量时,用于保存任务的队列
) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
当线程池核心数量不够时,新加入的任务会被存放在队列中,如果队列存满了,线程池会创建更多的线程,直到maximumPoolSize。如果还不足以处理新的任务,则面临一个丢弃策略,默认的丢弃策略是抛异常!
常用的Executors.newCachedThreadPool()和Executors.newFixedThreadPool(n),它的队列都是Integer.MAX_VALUE,所以maximumPoolSize和keepAliveTime参数就没有意义了。
如果要自己实现一个线程自动扩容方案呢?以下代码供大家测试探讨
/**
* Created on 2018/1/26 12:55
* <p>
* Description: [线程池自动扩容]
* <p>
* Company: [xxx]
*
* @author [aichiyu]
*/
public class TestAutoAdjustThreadPool {
/**
* 队列阈值,超过此值则扩大线程池
*/
private static final int MAX_QUEUE_SIZE = 100;
/**
* 每次扩容自动增加线程数
*/
private static final int PER_ADD_THREAD = 10;
/**
* 监控积压时间频率
*/
private static final int MONITOR_DELAY_TIME = 1;
private ScheduledExecutorService scheduledExecutorService ;
private ThreadPoolExecutor executor ;
public void start(){
executor =new ThreadPoolExecutor(10, 100,
60L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
scheduledExecutorService = new ScheduledThreadPoolExecutor(10, new BasicThreadFactory.Builder().namingPattern("mq-monitor-schedule-pool-%d").daemon(true).build());
scheduledExecutorService.scheduleWithFixedDelay(() -> {
System.out.println("当前线程池状态!"+executor);
//当队列大小超过限制,且jvm内存使用率小于80%时扩容,防止无限制扩容
if(executor.getQueue().size() >= MAX_QUEUE_SIZE && executor.getPoolSize()< executor.getMaximumPoolSize() && getMemoryUsage()<0.8){
System.out.println("线程池扩容!"+executor);
executor.setCorePoolSize(executor.getPoolSize() + PER_ADD_THREAD);
}
//当队列大小小于限制的80%,线程池缩容
if(executor.getPoolSize() > 0 && executor.getQueue().size() < MAX_QUEUE_SIZE * 0.8 ){
System.out.println("线程池缩容!"+executor);
executor.setCorePoolSize(executor.getPoolSize() - PER_ADD_THREAD);
}
}, MONITOR_DELAY_TIME, MONITOR_DELAY_TIME, TimeUnit.SECONDS);
}
public void stop() throws InterruptedException {
executor.shutdown();
while (!executor.awaitTermination(1,TimeUnit.SECONDS)){
//等待线程池中任务执行完毕
}
scheduledExecutorService.shutdown();
}
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(task);
}
public Future<?> submit(Runnable task) {
return executor.submit(task);
}
/**
* 获取jvm内存使用率
* @return
*/
public static double getMemoryUsage() {
return (double) (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / Runtime.getRuntime().maxMemory();
}
public static void main(String[] args) throws InterruptedException {
TestThreadPoolAutoExpand pool = new TestThreadPoolAutoExpand();
pool.start();
for (int i = 0; i < 1000; i++) {
pool.submit(()->{
System.out.println(Thread.currentThread()+" execute!~~");
try {
Thread.sleep(1500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
pool.stop();
}
}
网友评论