ThreadPoolExecutor参数解释
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize 核心线程数
- maximumPoolSize 最大线程数
- keepAliveTime 最大空闲时间
- unit 时间单位
- workQueue 阻塞队列
- threadFactory 创建线程的工厂类
- handler 饱和处理机制
线程池的执行流程
image.pngThreadPoolExecutor demo
package com.test.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExePollTest {
public static void main(String[] args) {
// 自定义线程池
// 最大线程该如何定义:
// 1. cpu密集型:cpu几核就是几,可以保持cpu效率最高
System.out.println(Runtime.getRuntime().availableProcessors());
// 2. io密集型:判断程序中,十分消耗io的线程的两倍
ExecutorService service = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
try {
// 最大承载线程数:队列大小+最大线程数量
// AbortPolicy策略:超过RejectedExecutionException
// CallerRunsPolicy策略: 超过最大承载线程数的部分,由调用该线程的线程执行
// DiscardOldestPolicy策略: 超过最大承载线程数,不会抛出异常,丢掉任务
// DiscardOldestPolicy策略: 超过最大承载线程数,尝试和最早的线程竞争,不会抛出异常
for (int i = 1; i <= 9 ; i++) {
// 使用了线程池之后,使用线程池来创建线程
service.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" ok");
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭线程池
service.shutdown();
}
}
}
java内置线程池
执行ExecutorService
execute(Runnable)
ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
fixedExecutor.execute(new MyRunnable(1));
fixedExecutor.shutdown();
submit(Runnable)
ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
Future future = fixedExecutor.submit(new MyRunnable(1));
try {
//如果任务执行完成,future.get()方法会返回一个null,future.get()可能阻塞
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
fixedExecutor.shutdown();
submit(Callable)
ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
Future<String> future = fixedExecutor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "hello world..";
}
});
try {
//如果任务执行完成,future.get()方法会返回一个null,future.get()可能阻塞
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
fixedExecutor.shutdown();
image.png
invokeAny
invokeAny会返回所有Callable任务中第一个得到执行完的Callable的结果作为返回值
// 1. 创建3个固定线程的线程池
ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
// 2. 创建listCallable 并初始化三个Callable
List<Callable<String>>listCallable = new ArrayList<Callable<String>>();
listCallable.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("first runs..");
return "first";
}
});
listCallable.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("second runs..");
return "second";
}
});
listCallable.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("third runs..");
return "third";
}
});
String future = null;
try {
//3. 执行invokeAny,会返回上面listCallable的任意一个Callable的返回值
future = fixedExecutor.invokeAny(listCallable);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println(future);
fixedExecutor.shutdown();
image.png
invokeAll
invokeAll会返回一个Future的List,其中对应着每个Callable任务执行后的Future对象
// 1. 创建3个固定线程的线程池
ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
// 2. 创建listCallable 并初始化三个Callable
List<Callable<String>>listCallable = new ArrayList<Callable<String>>();
listCallable.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("first runs..");
return "first";
}
});
listCallable.add(new Callable<String>() {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(2);
System.out.println("second runs..");
return "second";
}
});
listCallable.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("third runs..");
return "third";
}
});
List<Future<String>> futures = null;
try {
futures = fixedExecutor.invokeAll(listCallable);
for (Future<String> future : futures) {
System.out.println("result:"+future.get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
fixedExecutor.shutdown();
image.png
newCachedThreadPool
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。看下面栗子:
新建一个MyRunnable
class MyRunnable implements Runnable {
private int id;
public MyRunnable(int id) {
this.id = id;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" runs..."+id);
}
@Override
public String toString() {
return ""+id;
}
}
测试
public static void testCachedThreadPool01() {
//1. 使用工程类获取线程池对象
ExecutorService cacheExecutor = Executors.newCachedThreadPool();
//2. 提交任务
for (int i = 1; i <= 10; i++) {
cacheExecutor.execute(new MyRunnable(i));
}
}
image.png
// 使用工厂类获取线程池对象
public static void testCachedThreadPool02() {
ExecutorService cacheExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程"+(n++));
}
});
//2. 提交任务
for (int i = 1; i <= 10; i++) {
cacheExecutor.execute(new MyRunnable(i));
}
}
image.png
newFixedThreadPool
最多n个线程将处于活动状态。如果提交了n个以上的线程,那么它们将保持在队列中,直到线程可用
public static void testnewFixedThreadPool01() {
// 最多创建3个线程
ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
//2. 提交任务
for (int i = 1; i <= 10; i++) {
fixedExecutor.submit(new MyRunnable(i));
}
}
image.png
工厂类获取线程池对象
// 使用工厂类获取线程池对象
public static void testnewFixedThreadPool02() {
ExecutorService cacheExecutor = Executors.newFixedThreadPool(3, new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程"+(n++));
}
});
//2. 提交任务
for (int i = 1; i <= 10; i++) {
cacheExecutor.execute(new MyRunnable(i));
}
}
image.png
newSingleThreadExecutor
创建一个核心线程,并且最大线程也是1个,同时它具有一个无边界的阻塞队列LinkedBlockingQueue
public static void testnewSingleThreadExecutor01() {
ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
//2. 提交任务
for (int i = 1; i <= 10; i++) {
fixedExecutor.submit(new MyRunnable(i));
}
}
image.png
// 使用工厂类获取线程池对象
public static void testnewSingleThreadExecutor02() {
ExecutorService cacheExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程"+(n++));
}
});
//2. 提交任务
for (int i = 1; i <= 10; i++) {
cacheExecutor.execute(new MyRunnable(i));
}
}
image.png
shutdown方法
shutdown将线程池的状态设置为SHUTWDOWN状态,正在执行的任务会继续执行下去,没有被执行的则中断,并抛出RejectedExecutionException异常
ExecutorService singleExecutor = Executors.newSingleThreadExecutor();
singleExecutor.execute(()->{
System.out.println("thread1 runs..");
});
singleExecutor.shutdown();
// 上面已经shutdown了,后续加入进来执行的线程,将抛出RejectedExecutionException
singleExecutor.execute(()->{
System.out.println("thread2 runs..");
});
image.png
shutdownNow
shutdownNow则是将线程池的状态设置为STOP,正在执行的任务则被停止,没被执行任务的则返回
ExecutorService singleExecutor = Executors.newSingleThreadExecutor();
for (int i = 1; i <= 10; i++) {
singleExecutor.submit(new MyRunnable(i));
}
List<Runnable>list = singleExecutor.shutdownNow();
for (Runnable runnable : list) {
System.out.println("线程:"+runnable+" 被取消了");
}
image.png
延迟或定期执行任务
延迟两秒执行任务
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(3);
scheduledExecutor.schedule(()->{
System.out.println("scheduledExecutor runs..");
}, 2, TimeUnit.SECONDS);
间隔执行
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(3, new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程:"+(n++));
}
});
scheduledExecutor.scheduleAtFixedRate(new MyRunnable(1), 2, 2, TimeUnit.SECONDS);
2.gif
网友评论