一、线程池的优势
(1)、降低资源消耗,重复利用已创建的线程,降低线程创建和销毁造成的消耗。
(2)、提高响应速度,当任务到达时,任务可以不需要等到线程创建就能立即执行。
(2)、提高线程的可管理性,有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞。
二、线程如何创建(Executors)
Java中的线程池是通过Executor框架实现的。JDK中提供了创建线程池的类,大家首先想到的一定是Executors类,没错,可以通过Executors类来创建线程池,但是不推荐(不管推不推荐,先学着,后面会讲)。Executors类只是个静态工厂,提供创建线程池的几个静态方法,下面我们来慢慢梳理:

1、newFixedThreadPool
创建固定线程数的线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public class newFixedThreadPoolTest {
public static void main(String[] args) {
//一池3线程
ExecutorService pool = Executors.newFixedThreadPool(3);
//模拟十个线程访问
for (int i = 0; i < 10; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "执行线程");
});
}
}
}

源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数等于最大线程数,不存在空闲线程,keepAliveTime为0毫秒,队列是阻塞队列LinkedBlockingQueue。
2、newSingleThreadExecutor
创建单线程的线程池
public class SingleThreadPoolTest {
public static void main(String[] args) {
//一池1线程
ExecutorService pool = Executors.newSingleThreadExecutor();
try {
for (int i = 0; i < 10; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "执行线程");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭线程
pool.shutdown();
}
}
}

源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
核心线程数和最大线程数都为1,keepAliveTime为0毫秒,队列是阻塞队列LinkedBlockingQueue。
3、newCachedThreadPool
创建一个可缓存线程池
public class newCachedThreadPoolTest {
public static void main(String[] args) {
//一池N线程
ExecutorService pool = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 10; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "执行线程");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭线程
pool.shutdown();
}
}
}

源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程数为0,最大线程数为Integer.MAX_VALUE,是一个高度可伸缩的线程池。keepAliveTime为60秒,工作线程处于空闲状态超过keepAliveTime会回收线程,队列是阻塞队列SynchronousQueue。
当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
4、newScheduledThreadPool
创建支持定时以及周期性任务执行的线程池。
public class newScheduledThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
try {
for (int i = 0; i < 10; i++) {
service.schedule(() -> {
System.out.println(Thread.currentThread().getName() + "执行线程");
},5,TimeUnit.SECONDS);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭线程
service.shutdown();
}
}
}
程序等待5秒再执行,这个5秒是由于我们设置的
service.schedule(() -> { xxx },5,TimeUnit.SECONDS);

ScheduledExecutorService提供四个方法:

源码:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor(上面有继承构造图),构造还是调用了其父类的构造方法。
核心线程数为自定义,最大线程数是Integer.MAX_VALUE。keepAliveTime为0毫秒,队列为阻塞延迟队列DelayQueue。
5、newWorkStealingPool
JDK8引入,创建一个维护足够的线程以支持给定的并行级别的线程池,并且可以使用多个队列来减少争用。 并行级别对应于主动参与或可以从事任务处理的最大线程数。 线程的实际数量可以动态增长和收缩。 工作窃取池不保证执行提交的任务的顺序。
public class newWorkStealingPoolTest {
static SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
// CPU 核数
System.out.println(Runtime.getRuntime().availableProcessors());
ExecutorService pool = Executors.newWorkStealingPool();
try {
for (int i = 0; i < 20; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "在" + sim.format(new Date())+ "执行线程");
});
}
System.out.println("主线程运行时间【" + sim.format(new Date()) + "】");
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭线程
pool.shutdown();
}
}
}

如果不主动设置它的并发数,那么这个方法就会以当前机器的CPU处理器个数为线程个数。
修改传入并发数为2
ExecutorService pool = Executors.newWorkStealingPool(2);

源码就不截了,反正也讲不清,哈哈。
ExecutoreService还提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。
Executors各个方法的弊端:
1)FixedThreadPool和SingleThreadExecutor允许请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而引起OOM异常。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,从而引起OOM异常。
FixedThreadPool和SingleThreadExecutor都使用的是阻塞队列LinkedBlockingQueue的无参构造方法,阻塞队列的长度为Integer.MAX_VALUE,不解释看源码:
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
以上5个核心方法除Executors.newWorkStealingPool方法之外,其他方法都有OOM风险。
既然Executors不推荐使用,那我们使用什么勒。相信大家看到了这里应该发现了newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool、newScheduledThreadPool底层都是使用的ThreadPoolExecutor的构造方法。那我们接下来就来说说ThreadPoolExecutor。
ThreadPoolExecutor
ThreadPoolExecutor提供了四个构造方法:

我们讲讲参数最多的那个构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
参数 | 说明 |
---|---|
corePoolSize | 线程池中核心线程的数量 |
maximumPoolSize | 线程池中能够容纳同时执行的最大线程数 |
keepAliveTime | 非核心线程的超时时长,当系统中非核心线程闲置时间超过keepAliveTime之后,则会被回收。如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,则该参数也表示核心线程的超时时长,超时也会被回收。 |
unit | 第三个参数的单位,有纳秒、微秒、毫秒、秒、分、时、天等 |
workQueue | 线程池中的任务队列,该队列主要用来存储已经被提交但是尚未执行的任务。 |
threadFactory | 为线程池提供创建新线程的功能,这个我们一般使用默认即可 |
handler | 拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数如何来拒绝,默认情况下,当线程池无法处理新线程时,会抛出一个RejectedExecutionException。 |
测试
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(1, 2, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2));
executor.execute(new TestThread());
executor.execute(new TestThread());
executor.execute(new TestThread());
executor.execute(new TestThread());
//关闭线程池
executor.shutdown();
}
}
class TestThread implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}

根据上面的代码可以看出核心线程为1,最大线程数为2,保持时间为5秒,长度为2的阻塞队列。可以一个一个的新增线程,看看控制台会发生什么变化。
那什么时候才会实现拒绝策略呢?当新线程数量 + 核心线程数 > 最大线程数时,开始使用拒绝策略(当然也可以理解为:线程数 > 最大线程数 + 队列长度)。
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(1, 2, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2));
executor.execute(new TestThread()); //使用核心线程数
executor.execute(new TestThread()); //加入到队列中
executor.execute(new TestThread()); //加入到队列中
executor.execute(new TestThread()); //队列已满,创建新线程 ,此时新线程数为1
//执行第五个任务就会发生异常
executor.execute(new TestThread()); //再次创建新线程 出现异常 新线程数量为2 + 核心线程数为1 > 最大线程数为2
//关闭线程池
executor.shutdown();
}
}
class TestThread implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}


推荐:使用ThreadPoolExecutor的方式创建线程池。更能灵活的定制自己所需要的线程池。
workQueue 工作队列
新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中提供了四种工作队列:
①ArrayBlockingQueue
基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。创建时需要指定大小。
②LinkedBlockingQuene
基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。当然也可以指定大小。newFixedThreadPool和newSingleThreadExecutor使用的就是这种队列)。
③SynchronousQueue
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。newCachedThreadPool使用的就是这种队列。
④PriorityBlockingQueue
具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
handler 拒绝策略
参数 | 说明 |
---|---|
AbortPolicy(默认) | 丢弃任务并抛出RejectedExecutionException异常阻止系统正常运行。 |
DiscardPolicy | 丢弃任务,但是不抛出异常。如果允许任务丢失,这是最好的一种方案。 |
DiscardOldestPolicy | 丢弃队列中等待最久的任务,然后重新提交被拒绝的任务。 |
CallerRunsPolicy | 由调用线程(提交任务的线程)处理该任务。 |
修改默认的拒绝策略为CallerRunsPolicy
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(1, 2, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),new ThreadPoolExecutor.CallerRunsPolicy());
executor.execute(new TestThread());
executor.execute(new TestThread());
executor.execute(new TestThread());
executor.execute(new TestThread());
//执行第五个任务就会发生异常
executor.execute(new TestThread());
//关闭线程池
executor.shutdown();
}
}
class TestThread implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "执行任务");
}
}

可以看出,第五个线程出现了异常,被主线程main执行了。这正说明了此拒绝策略由调用线程(提交任务的线程)直接执行被丢弃的任务的。
可以看到上面的线程名称pool-1-thread-xx,那能不能改呢。可以的
修改默认的ThreadFactory
下次再写。

非阻塞队列:如果现在队列中没有元素,进行取数据,等到的是null。
阻塞队列:如果现在队列中没有元素,会进行等待,等到什么时候队列中有数据了才会取出来。
网友评论