1. ThreadPoolExecutor
ThreadPoolExecutor是一种比较常见的处理多线程的执行器。你可以配置线程池的最小线程数,当执行器没有太多的任务要处理的时候。亦可以配置最大线程size,如果有很多任务需要处理。一旦当工作负载降下来,线程池就会慢慢的减少线程数量,知道线程数量达到最小值。
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1, // keep at least one thread ready,
// even if no Runnables are executed
5, // at most five Runnables/Threads
// executed in parallel
1, TimeUnit.MINUTES, // idle Threads terminated after one
// minute, when min Pool size exceeded
new ArrayBlockingQueue<Runnable>(10)); // outstanding Runnables are kept here
pool.execute(new Runnable() {
@Override
public void run () {
//code to run
}
});
- 备注:如果你配置了一个无界队列的线程池。那么线程数量将不会超过corePoolSize .
- 线程池参数介绍
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)
如果线程池的数量大于corePoolSize并且小于maximumPoolSize,那么只有在线程池队列满了的情况下才会创建新线程。
线程池的优点:
-
BlockingQueue 能够避免内存溢出的场景。应用的表现不会被队列的尺寸限制。
-
可以采用不同的Rejection Handler 策略。
- 默认策略:抛出RejectedExecutionException
- CallerRunsPolicy :如果线程池没有被关闭,那么就执行,否则丢弃
- DiscardPolicy: 无法执行,直接丢弃
- DiscardOldestPolicy :丢弃队首的任务。
-
配置自定义的ThreadFactory的好处.
- 定义更有描述性的名称
- 设置进程的状态
- 设置线程优先权
2.获取计算任务的值-callable
- 如果你的运算会产生一些以后需要用到的数据,一个简单的runnable是肯定无法满足你的。在这种场景下,你可以使用 ExecutorService.submit(Callable<T>) 。这个方法会在任务执行完毕之后返回一个值。
- 这个方法返回一个Future对象,你可以从中获取任务的值。
// Submit a callable for execution
ExecutorService pool = anExecutorService;
Future<Integer> future = pool.submit(new Callable<Integer>() {
@Override public Integer call() {
//do some computation
return new Random().nextInt();
}
});
// ... perform other tasks while future is executed in a different thread
-
当你要获得执行结果时候,调用future.get()
-
不确定等到时间的方法 get
try {
// Blocks current thread until future is completed
Integer result = future.get();
catch (InterruptedException || ExecutionException e) {
// handle appropriately
}
- 在指定时间等待结果
try {
// Blocks current thread for a maximum of 500 milliseconds.
// If the future finishes before that, result is returned,
// otherwise TimeoutException is thrown.
Integer result = future.get(500, TimeUnit.MILLISECONDS);
catch (InterruptedException || ExecutionException || TimeoutException e) {}
如果计算结果不在需要了,你可以调用Future.cancel(boolean)
- cancel(false) 将只会将任务从队列里面一处
- cancel(true) 还会打断当前运行的任务。
3. submit()与execute() 异常处理的区别
- 普通的 execute() 命令一般是用来执行不要结果的任务,submit() 一般是用来分析Future 对象。
- 我们应该关心着两种异常处理机制不同之处。
- submit() 方法如果不处理就会被框架处理。
示例代码如下:
案例1:用excute 命令来执行 runnable 任务,然后上报异常
import java.util.concurrent .*;
import java.util .*;
public class ExecuteSubmitDemo {
public ExecuteSubmitDemo() {
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(2);
//ExtendedExecutor service = new ExtendedExecutor();
for (int i = 0; i < 2; i++) {
service.execute(new Runnable() {
public void run() {
int a = 4, b = 0;
System.out.println("a and b=" + a + ":" + b);
System.out.println("a/b:" + (a / b));
System.out.println("Thread Name in Runnable after divide by
zero:"+Thread.currentThread().getName());
}
});
}
service.shutdown();
}
public static void main(String args[]) {
ExecuteSubmitDemo demo = new ExecuteSubmitDemo();
}
}
class ExtendedExecutor extends ThreadPoolExecutor {
public ExtendedExecutor() {
super(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
}
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}
输出:
creating service
a and b=4:0
a and b=4:0
Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-2"
java.lang.ArithmeticException: / by zero
at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
java.lang.ArithmeticException: / by zero
at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
案例2:用submit 替换excute ,service.submit(new Runnable() 在这个案例中,异常被框架吃了。
输出
creating service
a and b=4:0
a and b=4:0
案例3 将newFixedThreadPool换成ExtendedExecutor
ExtendedExecutor service = new ExtendedExecutor();
输出:
creating service
a and b=4:0
java.lang.ArithmeticException: / by zero
a and b=4:0
java.lang.ArithmeticException: / by zero
- 我们评估上面两个案例,得出答案,使用自定义的线程池来处理异常。
- 其他解决上述问题的方法,如果你使用普通的 ExecutorService & submit ,使用get来获取结果。那么请捕获上述代码里面捕获的三个异常。自定义ThreadPoolExecutor 有一个好处,即是只需要在一个地方捕捉异常。
4. 处理拒绝执行
如果:
- 你试图向一个一个关闭的Executor 提交任务
- 队列已经满了,线程数已经达到最大值
- 那么RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)将会被调用。
- 上述方法的默认行为是抛出一个RejectedExecutionException异常。但是我们有更多的策略可以选择:
- ThreadPoolExecutor.AbortPolicy (default, will throw REE)
- ThreadPoolExecutor.CallerRunsPolicy (executes task on caller's thread - blocking it)
- ThreadPoolExecutor.DiscardPolicy (silently discard task)
- ThreadPoolExecutor.DiscardOldestPolicy (silently discard oldest task in queue and retry execution of the
new task)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) // <--
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) // <
- 你也可以自己实现RejectedExecutionHandler 接口来自定义执行的行为方式。
void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
5 :发射后不管 - Runnable Tasks
Executors接受一个 java.lang.Runnable 参数对象,用来处理耗时或者计算量较大的任务。
- 使用方法如下:
Executor exec = Executors.newCachedThreadPool();
exec.ex(new Runnable() {
@Override public void run () {
//offloaded work, no need to get result backJava® Notes for Professionals 696
}
});
注意使用这个excutor ,你获取不到任何数据返回值。在Java8 中你可以应用lamda 来简化代码
Executor exec = anExecutor;
exec.execute(() -> {
//offloaded work, no need to get result back
});
6:不同类型的并发的构造的使用案例:
- ExecutorService
- ExecutorService executor = Executors.newFixedThreadPool(50);
- 这种用法事发简单。它隐藏了很多线程池的底层实现。
- 在任务数量较小我倾向使用这种方式,它不会让内存增长过快也不会降低系统的性能。如果你有cpu/内存 方面的约束,我建议使用线程池的时候对线程的容量以及 处理拒绝执行的任务。
- CountDownLatch
- CountDownLatch 使用固定的数初始化。这个数会随着countdown 方法被调用而减少。我们可以通过调用await方法让当前 线程等待线程执行直至数量降至0。
使用条件:
1. 实现最大数量的异步任务:在某些情况,我们可以在同时启动一组线程
2. 在开始执行之前等待其他线程执行
3. 死锁检测
- ThreadPoolExecutor
- 提供了更多的控制。如果程序被限制了要执行一组延迟Runnable/Callable任务,你可以通过设置最大容积来使用有界数组。一旦队列达到最大容量,你可以定义一个 RejectionHandler 来处理拒绝的任务。Java提供了四种类型的RejectionHandler 不同的实现策略。上述已经提及了,这里就不进行赘述了。
- 你可能不知道 ForkJoinPool
ForkJoinPool是Java 7 引入的。ForkJoinPool与 ExecutorService类似,但是还是有一点区别。ForkJoinPool与非常容易实现任务分割成一些小任务。当某些队列任务执行完毕之后,他便会从其他队列的尾部偷取一个任务进行执行。
Java 8 引入了一个新的api ,你不需要新建RecursiveTask 任务就只可以直接使用ForkJoinPool;
public static ExecutorService newWorkStealingPool()
创建work-stealing线程池会根据处理器的并行程度最大程度的利用处理器
- 默认来说,他会采用cpu 的核心来作为参数。
上述提到的四种原理互不影响。你可以根据你自己的需求,选用合适的框架进行使用
7. 使用ExecutorService等待所有任务执行完毕
- 我们先来看一眼方法:
-
ExecutorService invokeAll()
当所有任务都执行完毕会返回一个Futures list。
示例代码:
import java.util.concurrent .*;
import java.util .*;
public class InvokeAllDemo {
public InvokeAllDemo() {
System.out.println("creating service");
ExecutorService service =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<MyCallable> futureList = new ArrayList<MyCallable>();
for (int i = 0; i < 10; i++) {
MyCallable myCallable = new MyCallable((long) i);
futureList.add(myCallable);
}
System.out.println("Start");
try {
List<Future<Long>> futures = service.invokeAll(futureList);
} catch (Exception err) {
err.printStackTrace();
}
System.out.println("Completed");
service.shutdown();
}
public static void main(String args[]) {
InvokeAllDemo demo = new InvokeAllDemo();
}
class MyCallable implements Callable<Long> {
Long id = 0L;
public MyCallable(Long val) {
this.id = val;
}
public Long call() {
// Add your business logic
return id;
}
}
}
8. 使用不同类型的ExecutorService
Executors 返回不同类型的线程池来满足不同需求
1. 1. public static ExecutorService newSingleThreadExecutor()
创建一个单工作线程来操作一个无界队列
它与 newFixedThreadPool(1) 和 newSingleThreadExecutor()的区别Java doc 是这样说的:
与类似的 newFixedThreadPool(1)相比其不保证重新配置异常线程来使用替代线程。
- 这就意味着newFixedThreadPool可以被程序重新配置类似如下:((ThreadPoolExecutor) fixedThreadPool).setMaximumPoolSize(10),这在newSingleThreadExecutor 是不可能的。
- 使用场景:
1. 你打算按照提交顺序来执行任务
2. 你需要一个线程来执行你的所有请求
缺点:无界队列有风险
2. public static ExecutorService newFixedThreadPool(int nThreads)
创建一个固定数量的线程池,复用线程来操作一个共享的无界队列。在任何时刻,大多数线程都会在执行任务的时候被激活。如果提交了额外的任务,在所有线程都是激活状态的时候,那么他们将会阻塞知道有空闲线程。
使用场景:
可以通过获取cpu的数量来提高线程运行情况。
你可以选择线程池的最大线程数
缺点:无界队列有风险
3. public static ExecutorService newCachedThreadPool()
创建一个按需分配的线程池,会重复利用之前已经创建的线程。
使用条件:
一些执行时间段的异步任务
- 缺点:
1. 无界队列有风险
2. 如果所有的存在的线程都在busy 那么每个任务都会新建一个线程。如果任务长期执行,将会创建大量的线程,这将会降低系统的性能。在这种情况下推荐newFixedThreadPool。
4. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个线程池,是的能够在指定时间段之后执行任务,或者每隔一段时间执行
使用场景:
处理周期性的事件
缺点:无界队列有风险。
5. public static ExecutorService newWorkStealingPool()
创建任务偷取型的线程池,取决于处理器的并发水平
使用场景
将任务分割成很多子任务
对于空闲线程处理能力较高
缺点:无界队列有风险
- 你可能发现了一个通用的缺陷:无界队列。它会与ThreadPoolExecutor一起绑定使用。
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)
使用线程池你可以:
- 动态控制线程池尺寸
- 设置BlockingQueue 容积
- 定了拒绝策略
- 自定义 CustomThreadFactory 可以有一些附带功能
9. 调度线程在固定的时间执行,或者在延迟一段时间之后,或者重复执行
- ScheduledExecutorService 提供了一个方法用来调度执行一次性或者重复的任务。
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
- 在普通的线程池方法之外,ScheduledExecutorService添加了4个方法来调度任务,然后返回ScheduledFuture 对象。
在固定时间之后开始一个任务
- 如下代码展示了在十分钟之后执行一个任务:
ScheduledFuture<Integer> future = pool.schedule(new Callable<>() {
@Override
public Integer call() {
// do something
return 42;
}
},10, TimeUnit.MINUTES);
以固定的频率来执行任务
- 如下代码展示了在十分钟之后开始执行一个任务,然后每隔一分钟开始重复任务:
ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// do something
}
},10, 1, TimeUnit.MINUTES);
任务会一直执行到线程池被关闭,future 被去小,或者某个任务发生了异常。
10. 线程池的使用
- 线程池大多数情况下都是通过调用 ExcutorService 的方法创建
- 如下方法可以用来提交任务
- submit: 执行提交的任务返回一个future 对象
- execute: 执行任务并不期望返回任何值
- invokeAll:执行一组任务,并且得到一个返回值列表
- invokeAny:执行所有任务,获得其中一个正确执行的(没有异常的),其余没有执行的任务或被取消。
- 一旦你使用了 shutdown() 来终止线程池。这个操作会阻塞任务的提交。如果向等到所有任务都被执行,你可以用 awaitTermination 或isShutdown() 来包裹代码。
网友评论