Java线程池基本用法
Java提供了一些通用接口来创建线程池:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
但是通常不推荐使用这些简易接口,因为这些接口可能会使用无界的任务队列,理论上可以无限添加任务到线程池,并且对核心线程数和最大线程数的设置也可能不合理,导致对系统资源的消耗很大。
通常建议自己创建一个线程池对象,以ThreadPoolExecutor为例,看看如何构造一个线程池
线程池的创建
构造函数有几个参数比较重要:
- corePoolSize 核心线程数,线程池一般情况下不会回收的线程数量;
- maximumPoolSize 最大线程数,超过这个数量后不允许再创建线程;
- keepAliveTime 非核心线程处于空闲状态的最大时间,超过这个时间就会被回收;
- unit 时间单位,跟keepAliveTime结合使用;
- workQueue 任务队列,通常有SynchronousQueue,LinkedBlockingQueue几种类型;
- threadFactory 指定创建线程的工厂方法;
- handler 拒绝策略,当任务队列已满或线程数达到最大时执行。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池的生命周期
以ThreadPoolExecutor为例,简单看下代码结构和生命周期
Java线程池 (1).png
线程池有5个状态:
- RUNNING : 接受新的任务并处理队列中的任务;
- SHUTDOWN : 不再接受新任务,但会继续处理队列中未完成任务;
- STOP : 不再接受新任务,也不再处理队列中的任务,正在执行的任务被中断;
- TIDYING : 所有的任务处理完成,有效的线程数是0,下一步会执行terminated()进入TERMINATED;
- TERMINATED : terminated()方法执行完毕,线程池结束。
新任务的执行 execute()
一个新任务的执行分4步:
- 如果当前线程数小于核心线程数corePoolSize,就创建新的线程;
- 当前线程数大于核心线程数,且任务队列未满,将任务放入任务队列;
- 当前任务队列已满,且线程数小于最大线程数,就创建线程;
- 当前线程数大于最大线程数maximumPoolSize,拒绝执行任务。
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
拒绝策略
Java提供了4种拒绝任务的策略,都是RejectedExecutionHandler的实现。
- AbortPolicy:抛出RejectedExecutionException;
- DiscardPolicy:什么也不做,直接忽略;
- DiscardOldestPolicy:丢弃执行队列中最老的任务,尝试为当前提交的任务腾出位置;
- CallerRunsPolicy:直接由提交任务者执行这个任务
任务队列
比较常见的队列类型有SynchronousQueue和LinkedBlockingQueue。
- SynchronousQueue
这种队列没有缓存,生产者和消费者两个线程必须交叉运行。队列中已经有一个任务时,生产者线程阻塞,直到消费者取出线程,才能继续运行。反过来,队列为空时,消费者线程阻塞。
比如下面两个线程会形成死锁,都进入WAITING状态。
private Thread putThread = new Thread(new Runnable() {
@Override
public void run() {
try {
queue.put("arg0");
System.out.println("put arg0 to queue");
takThread.join();
queue.put("arg1");
System.out.println("put arg1 to queue");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "putThread");
private Thread takThread = new Thread(new Runnable() {
@Override
public void run() {
try {
String arg0 = queue.take();
System.out.println("take arg from queue:" + arg0);
putThread.join();
String arg1 = queue.take();
System.out.println("take arg from queue:" + arg1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "takThread");
SynchronousQueue有两种公平和非公平模式,实际上是两种FIFO和LIFO数据结构。
公平模式使用队列,先入队的任务先执行;
非公平模式使用栈,后入队的任务先执行。
比如下面的代码,默认情况下是先执行putThread2,再执行putThread1。因为默认fair参数是false。
private Thread putThread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
queue.put("putThread1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "putThread1");
private Thread putThread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(500);
queue.put("putThread2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "putThread2");
private Thread takThread = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("take arg from queue:" + queue.take());
System.out.println("take arg from queue:" + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "takThread");
- LinkedBlockingQueue
LinkedBlockingQueue也是生产者-消费者模式,但是生产者线程和消费者线程使用不同的锁来进行同步,理论上生产者可以一直添加任务进队列。
LinkedBlockingQueue是FIFO的队列,相当于公平模式,保证先入队的任务先执行。
LinkedBlockingQueue可以指定容量,当队列已满时,再执行put()方法,生产者线程会阻塞,进入WAITING状态。或执行offer()方法会返回false。
参考:
https://www.cnblogs.com/ants/p/11343657.html
https://blog.csdn.net/yanyan19880509/article/details/52562039
网友评论