Java线程池
前言
平时使用Java线程时,习惯于直接new Thread来创建一个线程,这种创建线程的方法随手就出来了,满足于我们一般的使用场景,但是每次new Thread都会产生一个对象,同时我们也知道这样的线程缺乏有效的管理,由此java推出了线程池,相对于直接new Thread,线程池有以下优点:
- 降低资源消耗
线程池可利用已存在线程,少了对象的创建,减少了性能开销
- 可有效管理线程
控制线程数、控制执行规则,控制并发数
Java线程池源码的介绍
前面大概介绍了线程池的优点,那么java中关于线程池的类有哪些?
[图片上传失败...(image-977979-1554711086155)]
接口Executor
java中线程池的接口为Executor,该接口提供了一个void execute(Runnable var1);方法,接受一个Runnable的任务。
package java.util.concurrent;
public interface Executor {
void execute(Runnable var1);
}
子类接口ExecutorService
Executor接口的子类ExecutorService封装了更多的方法:
public interface ExecutorService extends Executor {
void shutdown();//关闭命令,不再接受新的任务 如果还有未执行完的任务,则执行完之后之后关闭
List<Runnable> shutdownNow();//停止正在执行的任务,不再处理正在等待的任务
boolean isShutdown();//是否关闭,在调用shutdown之后返回true,其他返回false
boolean isTerminated();//立即判断 调用shutdown之后,在线程都执行完后返回true,其他返回false
//在指定时间后,判断是否已经关闭,如果关闭返回true,否则返回false
boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;
//以下是提交线程的方法
<T> Future<T> submit(Callable<T> var1);
<T> Future<T> submit(Runnable var1, T var2);
Future<?> submit(Runnable var1);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}
抽象子类AbstractExecutorService
ExecutorService接口的抽象实现类AbstractExecutorService,实现了ExecutorService接口中的提交线程的相关方法:
public abstract class AbstractExecutorService implements ExecutorService {
public Future<?> submit(Runnable var1) {
if(var1 == null) {
throw new NullPointerException();
} else {
RunnableFuture var2 = this.newTaskFor(var1, (Object)null);
this.execute(var2);
return var2;
}
}
public <T> Future<T> submit(Runnable var1, T var2) {
if(var1 == null) {
throw new NullPointerException();
} else {
RunnableFuture var3 = this.newTaskFor(var1, var2);
this.execute(var3);
return var3;
}
}
public <T> Future<T> submit(Callable<T> var1) {
if(var1 == null) {
throw new NullPointerException();
} else {
RunnableFuture var2 = this.newTaskFor(var1);
this.execute(var2);
return var2;
}
}
...
}
常用的实现子类ThreadPoolExecutor
抽象类AbstractExecutorService的子类ThreadPoolExecutor是我们常用到的一个类,具体实现了上面提到的未实现的方法,并提供了几种构造函数以及额外的判断方式,比如线程池大小、队列等,源码如下:
package java.util.concurrent;
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public void execute(Runnable var1) {
...
}
public void shutdown() {
...
}
public List<Runnable> shutdownNow() {
...
}
public boolean isShutdown() {
...
}
public boolean isTerminating() {
...
}
public boolean isTerminated() {
...
}
public boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException {
...
}
...
}
ThreadPoolExecutor的构造函数有四个,最终调用的构造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler),里面的含义如下:
- corePoolSize 核心线程池大小
- maximumPoolSize 线程池最大容量
如果运行的线程少于corePoolSize,新添加任务时会创建新线程来处理任务,不管是否有空闲线程;如果运行的线程大于corePoolSize但是比maximumPoolSize少时,只有队列(workQueue)满的时候才会创建新线程;
ps:默认提交任务时创建线程,当调用线程池的prestartAllCoreThreads的方法时,会提前创建好corePoolSize个基本线程
- keepAliveTime 线程池没有线程执行时,保持存活时间
如果线程池中线程大于corePoolSize,多出的线程在执行完任务后会等待keepAliveTime时间后就会被销毁。
- unit 存活时间单位
- workQueue 任务队列
运行的线程数大于等于corePoolSize时,新的请求就会放在该队列中。队列的种类有以下几种:
1、ArrayBlockingQueue:这是一个基于数据结构的有界阻塞队列,按照先进先出(FIFO)原则进行排序。
2、LinkedBlockingQueue:基于链表结构的队列,按照先进先出(FIFO)原则进行排序,效率比ArrayBlockingQueue高。(对比理解ArrayList和LinkedList)
3、SynchronousQueue:不存储元素的队列,插入操作需要等待另一个线程调用了移除操作,否则插入操作就处于阻塞状态。
4、PriorityBlockingQueue:有优先级的无限阻塞队列,每次都返回优先级最高的元素。
- threadFactory 创建线程工厂
- handler 线程拒绝策略
拒绝策略会在请求队列满了,且创建的线程超过maximumPoolSize数,新请求加入的线程就会被拒绝。拒绝的策略有以下几种:
1、AbortPolicy 直接抛出异常(默认的拒绝策略)
2、CallerRunsPolicy 只用调用者所在线程来运行任务
3、DiscardOldestPolicy 丢弃队列里面最近的一个任务,并执行当前任务
4、DiscardPolicy 不处理,丢弃掉
同时我们也可以通过实现RejectedExecutionHandler接口来自定义策略。
以上可见,我们可以根据ThreadPoolExecutor这个类的构造方法来new 一个线程池,我们采用第一个构造方法来构造一个线程池,构造之前我们先看一下系统默认的构造线程工厂和拒绝策略:
线程工厂 Executors.defaultThreadFactory()
public static ThreadFactory defaultThreadFactory() {
return new Executors.DefaultThreadFactory();//调用该方法进行实例化
}
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager var1 = System.getSecurityManager();
this.group = var1 != null?var1.getThreadGroup():Thread.currentThread().getThreadGroup();
this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable var1) {//新创建线程的方法
Thread var2 = new Thread(this.group, var1, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
if(var2.isDaemon()) {//设置守护线程
var2.setDaemon(false);
}
if(var2.getPriority() != 5) {//设置优先级
var2.setPriority(5);
}
return var2;
}
}
拒绝策略:defaultHandler
private static final RejectedExecutionHandler defaultHandler = new ThreadPoolExecutor.AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() {
}
public void rejectedExecution(Runnable var1, ThreadPoolExecutor var2) {//直接抛出异常
throw new RejectedExecutionException("Task " + var1.toString() + " rejected from " + var2.toString());
}
}
创建线程池
public class ThreadPoolTest {
public static void main(String[] args) {
//2个核心数 最大5个 线程队列长度为2
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 5000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(200); //控制依次加入执行
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPoolExecutor.submit(new MyRunnable(i));
}
}
static class MyRunnable implements Runnable {
int index;
public MyRunnable(int index) {
this.index = index;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " " + index + ": 执行了");
try {
Thread.sleep(2500);//防止执行过快,看不出效果
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
执行结果:
pool-1-thread-1 0: 执行了
pool-1-thread-2 1: 执行了
pool-1-thread-3 2: 执行了
pool-1-thread-4 5: 执行了
pool-1-thread-5 6: 执行了
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2503dbd3 rejected from java.util.concurrent.ThreadPoolExecutor@4b67cf4d[Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.test.java.ThreadPoolTest.main(ThreadPoolTest.java:21)
pool-1-thread-1 3: 执行了
pool-1-thread-2 4: 执行了
以上可以看出,0、1、2达到核心线程池最大数后,3、4被放入队列中,然后新加5、6时线程队列已满,但还没有达到最大线程上限,所以重新创建了两个线程,7、8、9等直接被拒绝。
线程池提交任务执行流程
创建线程池
线程池的创建,每次实例化需要的参数太多,为了更简便的使用线程池,java封装了一个线程池的静态工厂类Executors,该类以静态方法提供了四种常用的创建线程池方法:
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
可以看到,传入的参数即为核心线程数,最大线程数核心线程数一致。LinkedBlockingQueue()的方法如下:
public LinkedBlockingQueue() {
this(2147483647);//int 最大值
}
public LinkedBlockingQueue(int var1) {
this.count = new AtomicInteger();
this.takeLock = new ReentrantLock();
this.notEmpty = this.takeLock.newCondition();
this.putLock = new ReentrantLock();
this.notFull = this.putLock.newCondition();
if(var1 <= 0) {
throw new IllegalArgumentException();
} else {
this.capacity = var1;
this.last = this.head = new LinkedBlockingQueue.Node((Object)null);
}
}
核心线程数和最大线程数一样,阻塞队列长度几乎不限。适用于任务数量有限,但是每一个任务都需要长时间运行的情况。
public class ThreadPoolTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPool.submit(new MyRunnable(i));
}
threadPool.shutdown();
}
static class MyRunnable implements Runnable {
int index;
public MyRunnable(int index) {
this.index = index;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " " + index + ": 执行了");
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//执行结果
pool-1-thread-1 0: 执行了
pool-1-thread-2 1: 执行了
pool-1-thread-3 2: 执行了
pool-1-thread-4 3: 执行了
pool-1-thread-5 4: 执行了
pool-1-thread-1 5: 执行了
pool-1-thread-2 6: 执行了
pool-1-thread-3 7: 执行了
pool-1-thread-4 8: 执行了
pool-1-thread-5 9: 执行了
Process finished with exit code 0 线程创建了5个 在4、5中间停顿了2秒,前面的线程执行完毕,后面的才执行。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0);
}
核心线程数为0,线程总长度几乎无限,存在时间为60秒,适合于任务密集,但是执行时间较短的场景。
//将上述newFixedThreadPool(5)改成newCachedThreadPool();
ExecutorService threadPool = Executors.newCachedThreadPool();
//执行结果
pool-1-thread-1 0: 执行了
pool-1-thread-2 1: 执行了
pool-1-thread-3 2: 执行了
pool-1-thread-4 3: 执行了
pool-1-thread-5 4: 执行了
pool-1-thread-6 5: 执行了
pool-1-thread-7 6: 执行了
pool-1-thread-8 7: 执行了
pool-1-thread-9 8: 执行了
pool-1-thread-10 9: 执行了
Process finished with exit code 0 //创建了10个线程
newSingleThreadScheduledExecutor
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
}
线程个数为1,适用于多个任务排队执行的场景。
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int var0) {
return new ScheduledThreadPoolExecutor(var0);
}
public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
return new ScheduledThreadPoolExecutor(var0, var1);
}
//ScheduledThreadPoolExecutor
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService{
public ScheduledThreadPoolExecutor(int var1, ThreadFactory var2) {
super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2);
}
}
//ScheduledExecutorService
public interface ScheduledExecutorService extends ExecutorService {
ScheduledFuture<?> schedule(Runnable var1, long var2, TimeUnit var4);
<V> ScheduledFuture<V> schedule(Callable<V> var1, long var2, TimeUnit var4);
ScheduledFuture<?> scheduleAtFixedRate(Runnable var1, long var2, long var4, TimeUnit var6);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable var1, long var2, long var4, TimeUnit var6);
}
实现了ScheduledExecutorService,可以创建一个有日程安排功能的线程池,比如:
- 推迟5秒执行:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
//第一个参数是任务 第二个参数时推迟时间 第三个是时间单位
scheduledExecutorService.schedule(new MyRunnable(1),5L,TimeUnit.SECONDS);
- 每隔一秒执行一次
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
//第一个参数是任务 第二个参数是初始推迟时间 第三个参数是周期频率 第四个参数是时间单位(2、3个参数的)
scheduledExecutorService.scheduleAtFixedRate(new MyRunnable(1), 2, 5, TimeUnit.SECONDS);
- 上个任务执行结束后5秒再次执行
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
//第一个参数是任务 第二个参数是初始推迟时间 第三个参数是结束后时长 第四个参数是时间单位(2、3个参数的)
scheduledExecutorService.scheduleWithFixedDelay(new MyRunnable(1), 1, 1, TimeUnit.SECONDS);
线程池的监控和管理
线程池关闭
在介绍线程池源代码的时候,也介绍了线程池的关闭方法以及每一个方法对应的特点:
- showdown()
调用该方法后,线程池停止接受任务新的任务,并等待已提交的任务(正在执行、处于等待的)执行完毕后关闭线程池。
- shutdownNow()
调用该方法会强制关闭线程池,会取消正在运行的以及处于等待队列的任务,该方法返回等待队列的任务列表,会触发InterruptedException的异常。
- isShutdown();
判断是否调用showdown关闭,在调用shutdown之后返回true,其他返回false
- isTerminated();
立即判断线程池是否关闭 调用shutdown之后,在线程都执行完后返回true,其他返回false
- awaitTermination(long timeout, TimeUnit unit)
该方法是检测线程池是否关闭,在调用shutDown后,等待一段时间后去判断线程池是否关闭,返回true则意味着已关闭,false则意味着为关闭。如果在等待的过程中,线程池关闭了,则立即返回true。
线程池的监控
除了关闭之外,线程池还有一些监控的方法:
- getActiveCount
获取活动线程的数量
- getPoolSize
获取线程池的数量
- getLargestPoolSize
获取线程池曾经最大线程数量,如果线程池满过,那么该数值等于线程池最大值
- getTaskCount
线程池中需要执行的任务数量
- getCompletedTaskCount
线程池中已经执行完成的任务数量,小于等于taskCount
- beforeExecute(可重写)
线程执行前的方法
- afterExecute(可重写)
线程执行后的方法
- terminated(可重写)
线程关闭后的方法
以上就是线程池的基本用法!
网友评论