一.ThreadPoolExecutor的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
- corePoolSize 程池中的核心线程数,也就是是线程池中的最小线程数;
- maximumPoolSize 最大线程池大小
- keepAliveTime 线程池中超过corePoolSize数目的非核心线程最大保活时间
- 参数的时间单位
- 执行前用于保持任务的队列,也就是线程池的缓存队列
- 线程工厂,为线程池提供创建线程的功能,它只是一个接口,只有一个方法::Thread newThread(Runnable r)
- RejectedExecutionHandler 线程池对拒绝任务的处理策略。一般是队列已满或者无法成功执行任务,这时ThreadPoolExecutor会调用handler的rejectedExecution方法来通知调用者
二.ThreadPoolExecutor执行过程
假设corePoolSize为2,当有线程时,默认先开始2个线程,为其分配空间,当线程数超过2时,新任务将放到队列中,等待线程池中任务调度执行,当提交的任务数超过(max+队列长度)时,会产生异常
三.定制自己的线程池
public class Test1 {
private static final int CORE_POOLSIZE = 2;
private static final int MAXIMUM_POOLSIZE = 4;
private static final int WORKQUEUE_SIZE = 10;
public static void main(String arg[]) {
final ArrayBlockingQueue queue = new ArrayBlockingQueue<Runnable>(WORKQUEUE_SIZE);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOLSIZE, MAXIMUM_POOLSIZE, 30, TimeUnit.SECONDS, queue);
for (int i = 0; i < 2; i++) {
int index = i;
final int finalI = i;
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("index==" + finalI + " workQueue的长度 " + queue.size());
}
});
}
}
}
输出:index==1 workQueue的长度 0
index==0 workQueue的长度 0(开启的两个线程够用,不用队列)
public class Test1 {
private static final int CORE_POOLSIZE = 2;
private static final int MAXIMUM_POOLSIZE = 4;
private static final int WORKQUEUE_SIZE = 10;
public static void main(String arg[]) {
final ArrayBlockingQueue queue = new ArrayBlockingQueue<Runnable>(WORKQUEUE_SIZE);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOLSIZE, MAXIMUM_POOLSIZE, 30, TimeUnit.SECONDS, queue);
for (int i = 0; i < 5; i++) {
int index = i;
final int finalI = i;
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("index==" + finalI + " workQueue的长度 " + queue.size());
}
});
}
}
}
输出:index==0 workQueue的长度 3
index==1 workQueue的长度 3
index==2 workQueue的长度 1
index==3 workQueue的长度 1
index==4 workQueue的长度 0
public class Test1 {
private static final int CORE_POOLSIZE = 2;
private static final int MAXIMUM_POOLSIZE = 4;
private static final int WORKQUEUE_SIZE = 10;
public static void main(String arg[]) {
final ArrayBlockingQueue queue = new ArrayBlockingQueue<Runnable>(WORKQUEUE_SIZE);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOLSIZE, MAXIMUM_POOLSIZE, 30, TimeUnit.SECONDS, queue);
for (int i = 0; i < 15; i++) {
int index = i;
final int finalI = i;
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("index==" + finalI + " workQueue的长度 " + queue.size());
}
});
}
}
}
输出:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.wzh.myhttp.write.Test1$1@29453f44 rejected from java.util.concurrent.ThreadPoolExecutor@5cad8086[Running, pool size = 4, active threads = 4, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.wzh.myhttp.write.Test1.main(Test1.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
这个时候需要用到RejectedExecutionHandler,
ThreadPoolExecutor默认有四个拒绝策略:
- ThreadPoolExecutor.AbortPolicy() 直接抛出异常RejectedExecutionException
- ThreadPoolExecutor.CallerRunsPolicy() 直接调用run方法并且阻塞执行
- ThreadPoolExecutor.DiscardPolicy() 直接丢弃后来的任务
- ThreadPoolExecutor.DiscardOldestPolicy() 丢弃在队列中队首的任务
-
还可以自己自定义拒绝策略
CallerRunsPolicy.png
四.线程的中断
关于线程的中断,好多人都会想到用thred.interrupt()
public class Test2 {
public static void main(String[] args) throws InterruptedException {
MyRunnable runnable = new MyRunnable();
Thread thread = new Thread(runnable);
thread.start();
Thread.sleep(1000);
// runnable.flag = false;
thread.interrupt();
}
static class MyRunnable implements Runnable {
public volatile boolean flag = true;
@Override
public void run() {
while ( !Thread.interrupted()) {
try {
System.out.println("running....");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
打印的结果 :
中断产生异常后依旧在运行
running....
running....
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.wzh.myhttp.write.Test2$MyRunnable.run(Test2.java:26)
at java.lang.Thread.run(Thread.java:745)
running....
running....
running....
Interrupted.png
在产生异常的时候,进行return处理,才能终止线程。
网友评论