一、常见的不同特性的线程池返回方法
1、newFixedThreadPool() :
这个方法返回一个固定大小的线程池,当新的任务提交,如果线程池中有空闲线程,那么使用线程池中的线程,如果没有,则将新的任务加到任务队列中去。
2、newSingleThreadExecutor():
这个方法只返回只有一个线程的线程池,如果有新的任务进来,那么就加入到任务队列中,按照先进先出的方式执行任务。
3、newCachedThreadPool():
这个方法返回的线程池,线程数量不一定,如果线程池有空闲,那么就复用线程池中的线程,如果没有空闲,那么就会新创建一个线程去执行任务, 任务执行完成后将此线程加入到线程池进行复用。
以上三种线程池的实际代码获取:
eg:
//获取一个可以缓存的线程池
ExecutorService threadpool = Executors.newCachedThreadPool();
也可以自定义:
ExecutorService exe = new ThreadPoolExecutor(100, 200, 0L, TimeUnit.SECONDS,
new PriorityBlockingQueue<Runnable>());
二、上面方法的内部实现
以上3种常用的线程池获取方法,他们的内部实现都是由 ThreadPoolExcutor 类有参构造方法实现:
eg: 以下为 Excutors 类下的方法
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可见其中含有非常多的参数,均来自于: ThreadPoolExcutor 类的有参构造
eg:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//略.....
}
三、几个核心参数进行介绍:
int corePoolSize;
线程池中核心线程数量
int maximumPoolSize;
线程池中允许存在的最大线程数量
long keepAliveTime;
当线程池线程数量超过核心线程时,多余的空闲线程的存活时间,即超过该时间的空闲线程就会被销毁。
TimeUnit unit;
keepAliveTime的单位
BlockingQueue<Runnable> workQueue;
任务队列,被提交但没有执行的任务,它是一个 BlockingQueue接口的对象,仅仅用于存放Runnable对象,根据队列功能划分,在ThreadPoolExcutor的构造函数中有以下几种BlockingQueue;
①SynchronousQueue:直接提交的队列,没有容量,不保存任务,如果没有空闲的线程,则尝试创建新的线程,如果线程数量达到了maximumPoolSize,则执行拒绝策略handler
:因此,使用此队列的时候,需要设置尽可能大的maximumPoolSize,否则很容易执行异常策略。(注意观察newCachedThreadPool()方法,其实就是采用了直接提交的队列)
②ArrayBlockingQueue:有界任务队列,所谓有界,那么就是指这个队列有容量限制,所以其构造方法得有一个最大容量的参数。在使用这个队列时,如果新的任务进来,此时如果线程池实际的线程数小于corePoolSize ,那么就会创建新的线程,如果大于corePoolSize 且小于 maximumPoolSize , 就将任务加入到队列中,如果队列满了,且不大于maximumPoolSize 的情况下,那么就会创建新的线程,知道达到maximumPoolSize ,执行拒绝策略。
③LinkedBlockingQueue:无界队列,与有界队列相比,当由新任务进来,只要线程数小于corePoolSize , 那么就创建新的线程执行任务,当达到corePoolSize 时,再有新的任务进来,那么就会加入到无界队列中,所以这种队列容易造成系统资源耗尽。
:newSingleThreadExecutor方法和newFixedThreadPool则使用了此种队列。
④PriorityBlockingQueue**:优先任务队列,顾名思义,可以控制任务的执行先后顺序。
**:使用自定义线程池的时候,根据实际情况而定(这里还有待深思和实践)
ThreadFactory threadFactory;
线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler;
拒绝策略,当任务太多来不及处理时如何拒绝任务
JDK内置的拒绝策略如下:
①AbrtPolicy:该策略直接抛出异常,阻止系统正常工作
②CalerRunsPolocy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。
③DiscardOledestPolicy:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
④DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理。
*:以上内置的策略均实现了RejectedExecutionHandler接口。
四、实例(使用优先队列自定义线程池使用)
package ThreadTest;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyThread implements Runnable, Comparable<MyThread> {
protected String name;
public MyThread(String name) {
this.name = name;
}
@Override
public int compareTo(MyThread o) {
int me = Integer.parseInt(this.name.split("_")[1]);
int other = Integer.parseInt(o.name.split("_")[1]);
if (me > other) {
return 1;
} else if (me < other) {
return -1;
} else {
return 0;
}
}
@Override
public void run() {
try {
System.out.println(this.name);
Thread.sleep(100); // 模拟工作任务
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String a[]) {
// 任务队列选用PriorityBlockingQueue 优先队列
ExecutorService exe = new ThreadPoolExecutor(100, 200, 0L, TimeUnit.SECONDS,
new PriorityBlockingQueue<Runnable>());
// 模拟1000个任务
for (int i = 0; i < 1000; i++) {
exe.execute(new MyThread("testThreadPoolExcutor_" + Integer.toString(999 - i)));
}
}
}
执行结果:
//前面900多是因为线程池中有空闲线程,所以直接执行。
五、优化线程池
(一)最优的线程池数量选取:
一般来说,线程池大小取决于CPU数量、内存大小、JDBC连接这些因素。有一个公式可以估算线程池大小:
Nthreads = Ncpu * Ucpu * (1 + W/C);
其中:
①Ncpu代表:CPU的数量(在java中可通过:Runtime.getRuntime().availableProcessors()方法得到)
②Ucpu代表:目标cpu的使用率
③W/C = 等待时间与计算时间的比率
(二)扩展的线程池
ThreadPoolExcutor是一个可以扩展的线程池,提供了:beforeExecute()、afterExecute()和terminated()3个接口对线程池进行控制:
例如实现一个带有日志输出功能的线程池,该线程池会在任务执行前后打印输出线程ID和任务ID:
package ThreadTest;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
// 重写任务执行前和执行后两种方法
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("beforeExecute MyThread Name :" + ((MyThread) r).getName() + " TID : " + t.getId());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("afterExecute TID : " +Thread.currentThread().getId());
System.out.println("afterExcute PoolSize :" + this.getPoolSize());
}
}
持续学习中......
网友评论