在java 中我们会一般要求创建线程必须使用线程池,因为这样可以避免资源消耗,通过重复利用已经创建的线程来降低线程创建和销毁所造成的消耗, 其次当任务到达时任务可以不用等到线程创建就立即执行,最后可以提高线程的管理性,管控线程,不让线程无限制创建.
线程池的创建
我们可以使用 ThreadPoolExecutor 创建线程池
这里我们不适用Executors 来创建线程池,是因为,Executors 使用默认参数不用自定义参数,
自己经常使用Executors提供的工厂方法创建线程池,所以忽略了线程池内部的实现,同时可以规避规避资源耗尽的风险.
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
milliseconds,runnableTaskQueue, handler);
-
corePoolSize(线程池的基本大小): 当提交一个任务到线程池时,线程
池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务
也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。
如果调用了线程池的 prestartAllCoreThreads 方法,线程池会提前创建
并启动所有基本线程。 -
runnableTaskQueue (任务队列):用于保存等待执行的任务的阻塞队列。
可以选择以下几个阻塞队列。
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队
列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按
FIFO (先进先出) 排序元素,吞吐量通常要高于
ArrayBlockingQueue。静态工厂方法
Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必
须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,
吞吐量通常要高于 LinkedBlockingQueue,静态工厂方法
Executors.newCachedThreadPool 使用了这个队列。
PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
-
maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。
如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建
新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就
没什么效果。 -
ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创
建出来的线程设置更有意义的名字。 -
RejectedExecutionHandler(饱和策略): 当队列和线程池都满了,默认 AbortPolicy:直接抛出异常,
还可设置 CallerRunsPolicy:只用调用者所在线程来运行任务。
o DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前
任务。
o DiscardPolicy:不处理,丢弃掉。
o 当然也可以根据应用场景需要来实现 RejectedExecutionHandler
接口自定义策略。如记录日志或持久化不能处理的任务。
向线程池提交任务
我们可以使用 execute 提交的任务,但是 execute 方法没有返回值,所以无法判
断任务是否被线程池执行成功。通过以下代码可知 execute 方法输入的任务是一
个 Runnable 类的实例。
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
可以使用 submit 方法来提交任务,它会返回一个 future,那么我们可以
通过这个 future 来判断任务是否执行成功,通过 future 的 get 方法来获取返回
值, get 方法会阻塞住直到任务完成,而使用 get(long timeout, TimeUnit unit)
方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
2} finally {
// 关闭线程池
executor.shutdown();
}
线程池的关闭
调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池,
它们
的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断
线程,所以无法响应中断的任务可能永远无法终止。
shutdown 和 shutdownNow 的区别
shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或
暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状
态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
通常调用 shutdown 来关闭线程池,表示任务执行完关闭,如果任务不一定要执行完,
则可以调用 shutdownNow。
线程池执行过程
提交一个任务到线程池,首先判断基本线程池是否已满,没有满不管线程池中有没有空闲线程都创建一个新线程来执行任务, 判断工作队列是否已满,没满则将新提交的任务存储在工作队列, 满了, 判断整个线程池是否已满, 没满, 则创建一个新的工作线程来
执行任务,满了,则交给饱和策略来处理这个任务.
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果线程数小于基本线程数,则创建线程并执行当前任务
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队
列中。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
//如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最
大允许的线程数量,
则创建一个线程执行任务。
else if (!addIfUnderMaximumPoolSize(command))
//抛出 RejectedExecutionException 异常
reject(command); // is shutdown or saturated
}
}
工作线程: 线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执
行完任务后,还会无限循环获取工作队列里的任务来执行。
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
合理的配置线程池
任务的性质:CPU 密集型任务,IO 密集型任务和混合型任务。
任务的优先级:高,中和低。
任务的执行时间:长,中和短。
任务的依赖性:是否依赖其他系统资源,如数据库连接。
-
CPU 密集型任务, 配置尽
可能小的线程,如配置 Ncpu+1 个线程的线程池。 -
IO 密集型任务则由于线程并不
是一直在执行任务,则配置尽可能多的线程,如 2*Ncpu。 -
混合型的任务,如果
可以拆分,则将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两
个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞
吐率,如果这两个任务执行时间相差太大,则没必要进行分解。
-
优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以
让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到
队列里,那么优先级低的任务可能永远不能执行。 -
线程提交 SQL 建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设
大一点,比如几千.
线程池的监控
通过继承线程池并重写线程池的 beforeExecute,
afterExecute 和 terminated 方法,我们可以在任务执行前,执行后和线程池关
闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。
网友评论