为什么说频繁的创建和销毁线程会浪费大量的系统资源?
线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的内存空间。在线程销毁时需要回收这些系统资源。频繁的创建和销毁线程会浪费大量的系统资源
线程池的作用
- 利用线程池管理并复用线程、控制最大并发数等
- 实现任务线程队列缓存策略和拒绝机制
- 实现某些与时间相关的功能,如定时执行、周期执行等
- 隔离线程环境。通过配置独立的线程池,将一些服务隔开,避免个服务相互影响
合理的使用线程池能够带来三个好处
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁的消耗
- 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行
- 提高线程的可管理性。
1. 自定义线程池-阻塞队列
自定义线程池.jpgpublic class ThreadPool {
public static void main(String[] args) {
Pool pool = new Pool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
//拒绝策略
//queue.put(task); //死等
//queue.offer(task,500,TimeUnit.MILLISECONDS); //带超时等待
//System.out.println("放弃");
});
for (int i = 0; i < 3; i++) {
int j = i;
pool.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " execute " + j);
});
}
}
}
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}
class Pool {
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet<>();
//线程数
private int coreSize;
//获取任务的超时时间
private long timeout;
private TimeUnit unit;
private RejectPolicy<Runnable> rejectPolicy;
public Pool(int coreSize, long timeout, TimeUnit unit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.unit = unit;
taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
//执行的方法
public void execute(Runnable task) {
synchronized (workers) {
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
System.out.println(Thread.currentThread().getName() + " add workers" + task);
workers.add(worker);
worker.start();
} else {
//taskQueue.put(task);
//策略模式
//1 死等 2. 带超时等待 3. 放弃任务执行 4. 抛出异常 5. 让调用者自己执行任务
taskQueue.tryPut(rejectPolicy, task);
}
}
}
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//while (task!= null || (task = taskQueue.take()) != null){
while (task != null || (task = taskQueue.poll(timeout, unit)) != null) {
try {
System.out.println(Thread.currentThread().getName() + " 执行 " + task );
task.run();
} catch (Exception e) {
} finally {
task = null;
}
}
synchronized (workers) {
System.out.println(Thread.currentThread().getName() + " 移除");
workers.remove(this);
}
}
}
}
class BlockingQueue<T> {
Deque<T> queue = new ArrayDeque<>();
int capacity;
Lock lock = new ReentrantLock();
Condition emptyWaitSet = lock.newCondition();
Condition fullWaitSet = lock.newCondition();
public BlockingQueue(int queueCapacity) {
this.capacity = queueCapacity;
}
//添加带超时的等待
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
//返回的是剩余的时间
if (nanos <= 0) {
System.out.println(Thread.currentThread().getName() + " 没等到,返回");
return null; //没等到
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
System.out.println(Thread.currentThread().getName() + " 出队" + task);
fullWaitSet.signal();
return task;
} finally {
lock.unlock();
}
}
//取任务
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
System.out.println(Thread.currentThread().getName() + " 队列为空,等待");
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
System.out.println(Thread.currentThread().getName() + " 出队 " + task);
fullWaitSet.signal();
return task;
} finally {
lock.unlock();
}
}
//添加任务
public void put(T task) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
System.out.println(Thread.currentThread().getName() + " 队列已满,等待进入队列");
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
System.out.println(Thread.currentThread().getName() + " 入队 " + task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
//带超时的阻塞添加
public boolean offer(T task, long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if (nanos <= 0) {
return false;
}
System.out.println(Thread.currentThread().getName() + " 队列已满,等待进入队列");
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
System.out.println(Thread.currentThread().getName() + " 入队 " + task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
//判断队列是否已满
if (queue.size() == capacity) {
rejectPolicy.reject(this, task);
} else {
queue.addLast(task);
emptyWaitSet.signal();
System.out.println(Thread.currentThread().getName() + " 入队 " + task);
}
} finally {
lock.unlock();
}
}
}
执行结果:
(放弃任务)
线程池.png
- 自定义拒绝策略接口
@FunctionalInterface
interface RejectPolicy{
void reject(BlockingQueue<T> queue, T task);
}
2. ThreadPoolExecutor
2.1线程池状态
ThreadPoolExecutor 使用int 高三位来表示线程池状态,低29位表示线程池数量
状态名 | 高3位 | 接受新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | |
SHUTDOWN | 000 | N | Y | 不会接受新任务,会处理阻塞队列剩余任务 |
STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列任务 |
TIDYING | 010 | - | - | 任务全执行完毕,活动线程为0即将进入终结 |
TERMINATED | 011 | - | - | 终结状态 |
这些信息存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,这样就可以用一次CAS原子操作进行赋值
//c为旧值,ctlOf返回的结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
//rs为高三位代表线程池状态,wc为低29位代表线程个数,ctl是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
2.2 构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize 核心线程数目
- maximumPoolSize 最大线程数目
- keepAliveTime 生存时间--针对救急线程
- unit 时间单位--针对救急线程
- workQueue 阻塞队列
- threadFactory 线程工厂--可以为线程创建时起名字
- handler 拒绝策略
最大线程数= 核心线程数+ 救急线程数
-
线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
-
当线程数达到corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue队列排队,直到有空闲的线程
-
如果队列选择了有界队列,那么任务超过了队列大小时,会创建maximunPoolSize-corePoolSize数目的线程来救急
-
如果线程达到了maximunPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略jdk提供了四种实现,其实著名框架也提供了实现
- AbortPolicy 让调用者抛出RejectExecutionException异常,这是默认策略
- CallerRunsPolicy 让调用者运行任务
- DiscardPolicy 放弃本次任务
- DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
- Dubbo 的实现,在抛出 RejectExecutionException 异常之前会记录日志,并dump线程栈信息,方便定位问题
- Netty 的实现,是创建一个新线程来执行任务
- ActiveMQ的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
- PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中的每种拒绝策略
-
当高峰过去后,超过coreSize的救急线程如果一段时间没有任务做,需要结束资源,这个时间由keepAliveTime 和 unit来控制。
根据这个构造方法,JDK Executors类中提供了众多工厂方法来创建各种用途的线程池
2.3 newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
特点:
- 核心线程数== 最大线程数(没有救急线程被创建),因此无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
评价:
- 适合于任务量已知,相对耗时的任务
2.4 newCacheThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特点:
- 核心线程数是0,最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是60s,意味着
- 全部是救急线程(60s后可以回收)
- 救急线程可以无限创建
- 队列采用了SynchronousQueue实现的特点是,它没有容量,没有线程来取是放不进去的(一手交钱一手交货)
评价:
- 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲1分钟后释放线程。
- 适合任务数比较密集,但每个任务执行时间短的情况
2.5 newSingleThreadPool
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
使用场景:
希望多个任务排队执行。线程数固定为1,任务数多于1时,会放入无界队列排队。任务执行完成,这唯一的线程不会被释放
区别:
- 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
- 线程个数始终为1,不能修改
- FinalizableDelegateExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor 中特有的方法
- Executors.newFixedThreadPool(1) 初始为1,后面还可以修改
- 对外暴露的是ThreadPoolExecutor对象,可以强转后调用setCorePoolSize等方法进行修改
2.6 提交任务
//执行任务
void execute(Runnable command);
//提交任务task, 用返回值Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
//提交tasks中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
//提交tasks中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
//提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
//提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
2.7 关闭线程池
shutdown
/**
* 线程池状态变为SHUTDOWN
* 不会接收新任务
* 但已提交任务会执行完
* 此方法不会阻塞调用线程的执行
*/
void shutdown();
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改线程池状态
advanceRunState(SHUTDOWN);
//仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
tryTerminate();
}
shutdownNow
/**
* 线程池状态变为STOP
* 不会接收新任务
* 会将队列中的任务返回
* 并用 interrupt 的方式中断正在执行的任务
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改线程池状态
advanceRunState(STOP);
//打断所有线程
interruptWorkers();
//获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
其他方法
//不在RUNNING 状态的线程池,此方法返回true
boolean isShutDown();
//线程池状态是否是TERMINATED
boolean isTerminated();
//调用shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池TERMINATED 后做一些事情,可以用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
public class ShutDownTest {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<String> future1 = pool.submit(()->{
System.out.println("running 1");
Thread.sleep(1000);
System.out.println("finish 1");
return "1";
});
Future<String> future2 = pool.submit(()->{
System.out.println("running 2");
Thread.sleep(1000);
System.out.println("finish 2");
return "2";
});
Future<String> future3 = pool.submit(()->{
System.out.println("running 3");
Thread.sleep(1000);
System.out.println("finish 3");
return "3";
});
System.out.println("shutdown");
pool.shutdown();
}
}
执行结果:
shutdown
running 1
running 2
finish 1
finish 2
running 3
finish 3
public class ShutDownTest {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<String> future1 = pool.submit(()->{
System.out.println("running 1");
Thread.sleep(1000);
System.out.println("finish 1");
return "1";
});
Future<String> future2 = pool.submit(()->{
System.out.println("running 2");
Thread.sleep(1000);
System.out.println("finish 2");
return "2";
});
Future<String> future3 = pool.submit(()->{
System.out.println("running 3");
Thread.sleep(1000);
System.out.println("finish 3");
return "3";
});
System.out.println("shutdown");
try {
pool.awaitTermination(3, TimeUnit.SECONDS); //不用,因为不知道等多久合适
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("other...");
}
}
执行结果:
shutdown
running 1
running 2
finish 1
finish 2
running 3
finish 3
other...
public class ShutDownTest {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<String> future1 = pool.submit(()->{
System.out.println("running 1");
Thread.sleep(1000);
System.out.println("finish 1");
return "1";
});
Future<String> future2 = pool.submit(()->{
System.out.println("running 2");
Thread.sleep(1000);
System.out.println("finish 2");
return "2";
});
Future<String> future3 = pool.submit(()->{
System.out.println("running 3");
Thread.sleep(1000);
System.out.println("finish 3");
return "3";
});
System.out.println("shutdownNow");
List<Runnable> task = pool.shutdownNow();//返回队列中任务
}
}
执行结果:
shutdownNow
running 1
running 2
2.8. 池大小
- 过小会导致程序不能充分利用系统资源、容易导致饥饿
- 过大会导致更多的线程上下文切换,占用更多的资源
2.8.1 CPU密集型
通常采用 CPU核数 + 1 能够实现最优的CPU利用率,+1 是保证当线程由于页缺失故障(操作系统)或其他原因导致暂停时,额外的这个线程就能顶上去,保证CPU时钟周期不被浪费
2.8.2 I/O密集型
CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行I/O操作时,远程RPC调用时,包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高他的利用率
经验公式如下
线程数 = 核数 * CPU利用率 * 总时间(CPU计算时间 + 等待时间)/ CPU计算时间
例如:4核CPU计算时间是50%,其他等待时间是50%,期望CPU被100%利用,套用公式
4*100% *100% / 50% = 8
2.9 任务调度线程池
在任务调度线程池功能加入之前,可以使用java.util.Timer来实现定时功能,Timer 的优点在于简单易用,但由于所有的任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
public class TimerTest {
public static void main(String[] args){
Timer timer = new Timer();
TimerTask timerTask1 = new TimerTask() {
@Override
public void run() {
System.out.println(" task1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
TimerTask timerTask2 = new TimerTask() {
@Override
public void run() {
System.out.println(" task2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//使用Timer添加两个任务,希望他们都在 1s 后执行
//Timer 内只有一个线程来顺序执行队列中的任务
timer.schedule(timerTask1, 1000);
timer.schedule(timerTask2, 1000);
}
}
2.9.1 使用任务调度线程池改进
public class TimerTest {
public static void main(String[] args){
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
pool.schedule(()->{
System.out.println(" task1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, TimeUnit.SECONDS);
pool.schedule(()->{
System.out.println(" task2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, TimeUnit.SECONDS);
}
}
3. Fork/Join
Fork/Join 是JDK 1.7 新加入的线程池实现,
网友评论