戳我的笔记链接地址
本文是对《Java并发编程》专栏的读后小结,跟大家分享。
目录
1、bug的源头-三个属性
2、Java内存模型
3、死锁的解决方案
死锁发生的条件
死锁的预防
4、等待-通知机制
wait的使用范式
wait和sleep的区别
5、线程的生命周期
通用的线程生命周期(五态模型)
Java中线程的生命周期
状态转换
6、创建合理的线程数量
CPU密集型应用
I/O密集型应用
7、Lock与synchronized的不同
用两个条件变量实现阻塞队列
异步转同步
8、用Semaphore实现一个限流器
9、读写锁 ReadWriteLock
读写锁升级问题
9、StampedLock 比读写锁更快的锁
10、CountDownLatch 和 CyclicBarrier 让多线程步调一致
11、Java并发容器
注意事项
12、原子类
ABA问题
原子类组成概览
13、Java中的线程池
// todo 线程的运行过程
ThreadPoolExecutor 线程池参数
拒绝策略
使用线程池的注意事项
14、Future 获取异步执行结果
如何获取异步任务执行结果
FutureTask工具类
14、CompletableFuture 异步编程
15、CompletionService 批量执行异步任务
16、Fork/Join 并行计算框架
模拟MapReduce统计单词数量
1、bug的源头-三个属性
可见性、有序性、原子性。
我们的 CPU、内存、I/O 设备都在不断迭代,不断朝着更快的方向努力。但是,在这个快速发展的过程中,有一个核心矛盾一直存在,就是这三者的速度差异。
cpu寄存器缓存导致的可见性问题。
线程切换带来原子性问题。
编译优化带来有序性问题。
其中,在 Java 领域一个经典的案例就是利用双重检查创建单例对象:
public class Singleton {
static Singleton instance;
static Singleton getInstance(){
if (instance == null) {
synchronized(Singleton.class) {
if (instance == null)
instance = new Singleton(); //在这一步如果编译优化
}
}
return instance;
}
}
第八行,如果发生编译优化:我们以为的 new 操作应该是:
分配一块内存 M;
在内存 M 上初始化 Singleton 对象;
然后 M 的地址赋值给 instance 变量。
但是实际上优化后的执行路径却是这样的:
分配一块内存 M;
将 M 的地址赋值给 instance 变量;
最后在内存 M 上初始化 Singleton 对象。
如果在第2步发生线程a到线程b的切换,线程b直接返回instance,这个时候调用没有初始化过的instance对象,会产生空指针异常。
解决办法:对instance对象加volatile语义申明。
2、Java内存模型
Java 内存模型规范了 JVM 如何提供按需禁用缓存和编译优化的方法。具体来说,这些方法包括 volatile、synchronized 和 final 三个关键字,以及七项 Happens-Before 规则。
3、死锁的解决方案
使用细粒度锁可以提高并行度,是性能优化的一个重要手段。但是有时细粒度的锁容易导致死锁。死锁的一个比较专业的定义是:一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。
死锁发生的条件
互斥,共享资源 X 和 Y 只能被一个线程占用;
占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;
不可抢占,其他线程不能强行抢占线程 T1 占有的资源;
循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。
死锁的预防
反过来分析,也就是说只要我们破坏其中一个,就可以成功避免死锁的发生。互斥就是锁的目的,所以无法预防。
破坏占有且等待,可以一次性申请所有资源。要么全部获取成功,要么全部获取失败。
破坏不可抢占,核心是要能够主动释放它占有的资源,这一点 synchronized 是做不到的。java.util.concurrent 这个包下面提供的 Lock 是可以轻松解决这个问题的。提供tryLock(long, TimeUnit) 方法,在一段时间后放弃获取锁。
破坏循环等待条件,破坏这个条件,需要对资源进行排序,然后按序申请资源。
4、等待-通知机制
用 synchronized 实现等待 - 通知机制在 Java 语言里,等待 - 通知机制可以有多种实现方式,比如 Java 语言内置的 synchronized 配合 wait()、notify()、notifyAll() 这三个方法就能轻松实现。
如何用 synchronized 实现互斥锁,你应该已经很熟悉了。在下面这个图里,左边有一个等待队列,同一时刻,只允许一个线程进入 synchronized 保护的临界区(这个临界区可以看作大夫的诊室),当有一个线程进入临界区后,其他线程就只能进入图中左边的等待队列里等待(相当于患者分诊等待)。这个等待队列和互斥锁是一对一的关系,每个互斥锁都有自己独立的等待队列。
wait()工作原理图
notify()工作原理图
上面我们一直强调 wait()、notify()、notifyAll() 方法操作的等待队列是互斥锁的等待队列,所以如果 synchronized 锁定的是 this,那么对应的一定是 this.wait()、this.notify()、this.notifyAll();如果 synchronized 锁定的是 target,那么对应的一定是 target.wait()、target.notify()、target.notifyAll() 。
而且 wait()、notify()、notifyAll() 这三个方法能够被调用的前提是已经获取了相应的互斥锁,所以我们会发现 wait()、notify()、notifyAll() 都是在 synchronized{}内部被调用的。如果在 synchronized{}外部调用,或者锁定的 this,而用 target.wait() 调用的话,JVM 会抛出一个运行时异常:java.lang.IllegalMonitorStateException。
// wait的使用范式
while(条件不满足) {
wait();
}
除非经过深思熟虑,否则尽量使用 notifyAll(),只用notify()可能会导致有线程永远得不到执行。
wait和sleep的区别
wait与sleep区别在于: 1. wait会释放所有锁而sleep不会释放锁资源. 2. wait只能在同步方法和同步块中使用,而sleep任何地方都可以. 3. wait无需捕捉异常,而sleep需要. 两者相同点:都会让渡CPU执行时间,等待再次调度! wait()方法与sleep()方法的不同之处在于,wait()方法会释放对象的“锁标志”。当调用某一对象的wait()方法后,会使当前线程暂停执行,并将当前线程放入对象等待池中,直到调用了notify()方法后,将从对象等待池中移出任意一个线程并放入锁标志等待池中,只有锁标志等待池中的线程可以获取锁标志,它们随时准备争夺锁的拥有权。当调用了某个对象的notifyAll()方法,会将对象等待池中的所有线程都移动到该对象的锁标志等待池。 sleep()方法需要指定等待的时间,它可以让当前正在执行的线程在指定的时间内暂停执行,进入阻塞状态,该方法既可以让其他同优先级或者高优先级的线程得到执行的机会,也可以让低优先级的线程得到执行机会。但是sleep()方法不会释放“锁标志”,也就是说如果有synchronized同步块,其他线程仍然不能访问共享数据。
5、线程的生命周期
通用的线程生命周期(五态模型)
初始状态,指的是线程已经被创建,但是还不允许分配 CPU 执行。这个状态属于编程语言特有的,在操作系统层面,真正的线程还没有创建。
可运行状态,指的是线程可以分配 CPU 执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配 CPU 执行。
当有空闲的 CPU 时,操作系统会将其分配给一个处于可运行状态的线程,被分配到 CPU 的线程的状态就转换成了运行状态。
运行状态的线程如果调用一个阻塞的 API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放 CPU 使用权,休眠状态的线程永远没有机会获得 CPU 使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。
Java中线程的生命周期
这五种状态在不同编程语言里会有简化合并。
Java 语言里则把可运行状态和运行状态合并了(变成了运行状态),这两个状态在操作系统调度层面有用,而 JVM 层面不关心这两个状态,因为 JVM 把线程调度交给操作系统处理了。除了简化合并,这五种状态也有可能被细化,比如,Java 语言里就细化了休眠状态(Blocked、Waiting、Timed_Waiting).
Java 语言中线程共有六种状态,分别是:
NEW(初始化状态)
RUNNABLE(可运行 / 运行状态)
BLOCKED(阻塞状态)
WAITING(无时限等待)
TIMED_WAITING(有时限等待)
TERMINATED(终止状态)
状态转换
-
RUNNABLE 与 BLOCKED 的状态转换
只有一种场景会触发这种转换,就是线程等待 synchronized 的隐式锁。 -
RUNNABLE 与 WAITING 的状态转换
第一种场景,获得 synchronized 隐式锁的线程,调用无参数的 Object.wait() 方法。
第二种场景,调用无参数的 Thread.join() 方法。其中的 join() 是一种线程同步方法,例如有一个线程对象 thread A,当调用 A.join() 的时候,执行这条语句的线程会等待 thread A 执行完,而等待中的这个线程,其状态会从 RUNNABLE 转换到 WAITING。当线程 thread A 执行完,原来等待它的线程又会从 WAITING 状态转换到 RUNNABLE。
第三种场景,调用 LockSupport.park() 方法。其中的 LockSupport 对象,也许你有点陌生,其实 Java 并发包中的锁,都是基于它实现的。调用 LockSupport.park() 方法,当前线程会阻塞,线程的状态会从 RUNNABLE 转换到 WAITING。调用 LockSupport.unpark(Thread thread) 可唤醒目标线程,目标线程的状态又会从 WAITING 状态转换到 RUNNABLE。 -
RUNNABLE 与 TIMED_WAITING 的状态转换
有五种场景会触发这种转换:
调用带超时参数的 Thread.sleep(long millis) 方法;
获得 synchronized 隐式锁的线程,调用带超时参数的 Object.wait(long timeout) 方法;
调用带超时参数的 Thread.join(long millis) 方法;
调用带超时参数的 LockSupport.parkNanos(Object blocker, long deadline) 方法;
调用带超时参数的 LockSupport.parkUntil(long deadline) 方法。
这里你会发现 TIMED_WAITING 和 WAITING 状态的区别,仅仅是触发条件多了超时参数。 -
从 NEW 到 RUNNABLE 状态
从 NEW 状态转换到 RUNNABLE 状态很简单,只要调用线程对象的 start() 方法就可以了 -
从 RUNNABLE 到 TERMINATED 状态
线程执行完 run() 方法后,会自动转换到 TERMINATED 状态,当然如果执行 run() 方法的时候异常抛出,也会导致线程终止。有时候我们需要强制中断 run() 方法的执行,例如 run() 方法访问一个很慢的网络,我们等不下去了,想终止怎么办呢?Java 的 Thread 类里面倒是有个 stop() 方法,不过已经标记为 @Deprecated,所以不建议使用了。正确的姿势其实是调用 interrupt() 方法。
stop() 方法会真的杀死线程,不给线程喘息的机会,如果线程持有 ReentrantLock 锁,被 stop() 的线程并不会自动调用 ReentrantLock 的 unlock() 去释放锁,那其他线程就再也没机会获得 ReentrantLock 锁。
interrupt() 方法仅仅是通知线程,线程有机会执行一些后续操作,同时也可以无视这个通知。被 interrupt 的线程,是怎么收到通知的呢?一种是异常,另一种是主动检测。
当线程 A 处于 WAITING、TIMED_WAITING 状态时,如果其他线程调用线程 A 的 interrupt() 方法,会使线程 A 返回到 RUNNABLE 状态,同时线程 A 的代码会触发 InterruptedException 异常。上面我们提到转换到 WAITING、TIMED_WAITING 状态的触发条件,都是调用了类似 wait()、join()、sleep() 这样的方法,我们看这些方法的签名,发现都会 throws InterruptedException 这个异常。这个异常的触发条件就是:其他线程调用了该线程的 interrupt() 方法。
6、创建合理的线程数量
创建合理的线程数量,目的是将硬件的性能发挥到极致。根据不同的应用场景,我们将应用分为两种场景论述,I/O密集型应用和CPU密集型应用。
CPU密集型应用
对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
I/O密集型应用
对于 I/O 密集型计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,我们可以总结出这样一个公式:
最佳线程数 =1 +(I/O 耗时 / CPU 耗时)
不过上面这个公式是针对单核 CPU 的,至于多核 CPU,也很简单,只需要等比扩大就可以了,计算公式如下:
最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
Q: 有些同学对于最佳线程数的设置积累了一些经验值,认为对于 I/O 密集型应用,最佳线程数应该为:2 * CPU 的核数 + 1,你觉得这个经验值合理吗?
A: 工作中都是按照逻辑核数来的,理论值和经验值只是提供个指导,实际上还是要靠压测!!!
7、Lock与synchronized的不同
synchronized存在,Javasdk还造一个Lock的原因主要是为了弥补synchronized,会阻塞线程且不会释放已经占有的资源的问题,lock的三个方法如下:
// 支持响应中断
void lockInterruptibly()
throws InterruptedException;
// 支持超时
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException;
// 支持非阻塞获取锁
boolean tryLock();
并且,Lock支持多个条件变量。Lock用来实现“互斥”,condition用来实现“同步”。
用两个条件变量实现阻塞队列
public class BlockedQueue<T>{
final Lock lock =
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();
// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
//入队后,通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
//出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}
不过,这里你需要注意,Lock 和 Condition 实现的管程,线程等待和通知需要调用 await()、signal()、signalAll(),它们的语义和 wait()、notify()、notifyAll() 是相同的。但是不一样的是,Lock&Condition 实现的管程里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 实现的管程里才能使用。如果一不小心在 Lock&Condition 实现的管程里调用了 wait()、notify()、notifyAll(),那程序可就彻底玩儿完了。
异步转同步
远程调用rpc请求时,面临异步转同步的问题,因为tcp层面上rpc请求就是异步的,它不会等待请求返回结果,所以类似于dubbo这种rpc框架也是做了异步转同步的工作,具体实现类似于上面“用两个条件变量实现阻塞队列”的代码。
8、用Semaphore实现一个限流器
极客专栏链接 https://time.geekbang.org/column/article/88499
Semaphore是信号量的意思,可以允许多个线程访问一个临界区。可以用来实现比较常见的需求就是我们工作中遇到的各种池化资源,例如连接池、对象池、线程池等等。
9、读写锁 ReadWriteLock
极客专栏链接(质量很高,也很实用) https://time.geekbang.org/column/article/88909
这个链接里文章实现了一个简易的完备缓存的示例。
读写锁升级问题
读锁不能升级为写锁。
写锁可以降级为读锁。
如果进行读写锁升级,读锁还没有释放,此时获取写锁,会导致写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。锁的升级是不允许的,这个你一定要注意。
9、StampedLock 比读写锁更快的锁
极客专栏链接 使用stampedLock有几个比较重要需要注意的点,所以谨慎使用。
支持三种模式
写锁
悲观读锁(类似与读写锁的读锁)
乐观读(无锁,检测到有锁的时候需要转成悲观读锁)
10、CountDownLatch 和 CyclicBarrier 让多线程步调一致
极客时间专栏链接 https://time.geekbang.org/column/article/89461
CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,这两个工具类用法的区别在这里还是有必要再强调一下:
CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;
而 CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。
除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但 CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。
11、Java并发容器
通过对操作方法加synchronized关键字,可以使得一个容器变成线程安全的容器。这类容器称为同步容器,针对同步容器的性能问题,Java在1.5版本之后出来了并发容器,性能更高。
同步容器:
常见的有Vector、Stack 和 Hashtable等,通过对操作方法加synchronized关键字实现。
并发容器:
数量较多,主要有以下四大类:Set、Map、Set、Queue
并发容器关系图:
比较熟悉的有ConcurrentHashMap、BlockingQueue....
注意事项
1、在容器领域一个容易被忽视的“坑”是用迭代器遍历容器。
// 有问题的写法
List list = Collections.
synchronizedList(new ArrayList());
Iterator i = list.iterator();
while (i.hasNext())
foo(i.next());
// 正确的写法
// 因为是对list的操作,如果list变化了,会使得迭代器报错
// 所以先把list锁住,保证迭代器运行期间list不会变化
// 这也是在迭代器里对当前元素删除会报错的原因
List list = Collections.
synchronizedList(new ArrayList());
Iterator i = list.iterator();
while (i.hasNext())
foo(i.next());
2、另外,使用队列时,需要格外注意队列是否支持有界(所谓有界指的是内部的队列是否有容量限制)。实际工作中,一般都不建议使用无界的队列,因为数据量大了之后很容易导致 OOM。上面我们提到的这些 Queue 中,只有 ArrayBlockingQueue 和 LinkedBlockingQueue 是支持有界的,所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患。
12、原子类
极客时间专栏 https://time.geekbang.org/column/article/90515
原子类高性能的秘密就是硬件支持,基于CPU提供的cas(compare and swap)指令。
ABA问题
aba问题,指的是一个共享变量经历了A--》B--》A的一个过程,虽然最终值一样,但是已经被更新过了。使用原子化的更新对象很可能就需要关心 ABA 问题,因为两个 A 虽然相等,但是第二个 A 的属性可能已经发生变化了。
相关实现有 AtomicReference、AtomicStampedReference 和 AtomicMarkableReference,利用它们可以实现对象引用的原子化更新。AtomicReference 提供的方法和原子化的基本数据类型差不多,这里不再赘述。不过需要注意的是,对象引用的更新需要重点关注 ABA 问题,AtomicStampedReference 和 AtomicMarkableReference 这两个原子类可以解决 ABA 问题。
原子类组成概览
13、Java中的线程池
美团技术文章Java线程池
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
使用线程池的目的是为了避免线程的频繁创建和销毁。线程是一个重量级的对象,应该避免频繁创建和销毁。
线程池是一种生产者 - 消费者模式。线程池的使用方是生产者,线程池本身是消费者。在下面的示例代码中,我们创建了一个非常简单的线程池 MyThreadPool,你可以通过它来理解线程池的工作原理。
//简化的线程池,仅用来说明工作原理
class MyThreadPool{
//利用阻塞队列实现生产者-消费者模式
BlockingQueue<Runnable> workQueue;
//保存内部工作线程
List<WorkerThread> threads
= new ArrayList<>();
// 构造方法
MyThreadPool(int poolSize,
BlockingQueue<Runnable> workQueue){
this.workQueue = workQueue;
// 创建工作线程
for(int idx=0; idx<poolSize; idx++){
WorkerThread work = new WorkerThread();
work.start();
threads.add(work);
}
}
// 提交任务
void execute(Runnable command){
workQueue.put(command);
}
// 工作线程负责消费任务,并执行任务
class WorkerThread extends Thread{
public void run() {
//循环取任务并执行
while(true){ ①
Runnable task = workQueue.take();
task.run();
}
}
}
}
/** 下面是使用示例 **/
// 创建有界阻塞队列
BlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<>(2);
// 创建线程池
MyThreadPool pool = new MyThreadPool(
10, workQueue);
// 提交任务
pool.execute(()->{
System.out.println("hello");
});
在 MyThreadPool 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务,相关的代码就是代码①处的 while 循环。线程池主要的工作原理就这些,是不是还挺简单的?
// todo 线程的运行过程
什么情况下增加新线程、添加到任务队列等等
ThreadPoolExecutor 线程池参数
ThreadPoolExecutor(
int corePoolSize, // 表示线程池保有的最小线程数。
int maximumPoolSize, // 表示线程池创建的最大线程数。当项目很忙时,就需要加人,最多加到 maximumPoolSize 个人。
long keepAliveTime, // 线程可以空闲的存活时间
TimeUnit unit,
BlockingQueue<Runnable> workQueue, //工作队列,用来保存生产的资源,也是线程要消费的资源
ThreadFactory threadFactory, // 通过这个参数自定义如何创建线程,如给线程指定一个名字
RejectedExecutionHandler handler) // 拒绝策略
拒绝策略
ThreadPoolExecutor 已经提供了以下 4 种策略。
CallerRunsPolicy:提交任务的线程自己去执行该任务。
AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
DiscardPolicy:直接丢弃任务,没有任何异常抛出。
DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
使用线程池的注意事项
1、尽量使用有界队列
Java 并发包里提供了一个线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。不过目前大厂的编码规范中基本上都不建议使用 Executors 了。
不建议使用 Executors 的最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列。
2、默认的拒绝策略要慎用
使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。
14、Future 获取异步执行结果
极客时间专栏 https://time.geekbang.org/column/article/91292
如何获取异步任务执行结果
Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下。
// 提交Runnable任务
Future<?>
submit(Runnable task);
// 提交Callable任务
<T> Future<T>
submit(Callable<T> task);
// 提交Runnable任务及结果引用
<T> Future<T>
submit(Runnable task, T result);
你会发现它们的返回值都是 Future 接口,Future 接口有 5 个方法,我都列在下面了,它们分别是取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。
// 取消任务
boolean cancel(
boolean mayInterruptIfRunning);
// 判断任务是否已取消
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);
FutureTask工具类
前面我们提到的 Future 是一个接口,而 FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数,它们的参数和前面介绍的 submit() 方法类似,所以这里我就不再赘述了。
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);
那如何使用 FutureTask 呢?其实很简单,FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。
// 创建FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es =
Executors.newCachedThreadPool();
// 提交FutureTask
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();
FutureTask 对象直接被 Thread 执行的示例代码如下所示。相信你已经发现了,利用 FutureTask 对象可以很容易获取子线程的执行结果。
// 创建FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取计算结果
Integer result = futureTask.get();
// 以上两种方式还可以组合起来使用。
下面的示例代码就是用这一章提到的 Future 特性来实现的。首先,我们创建了两个 FutureTask——ft1 和 ft2,ft1 完成洗水壶、烧开水、泡茶的任务,ft2 完成洗茶壶、洗茶杯、拿茶叶的任务;这里需要注意的是 ft1 这个任务在执行泡茶任务前,需要等待 ft2 把茶叶拿来,所以 ft1 内部需要引用 ft2,并在执行泡茶之前,调用 ft2 的 get() 方法实现等待。
// 创建任务T2的FutureTask
FutureTask<String> ft2
= new FutureTask<>(new T2Task());
// 创建任务T1的FutureTask
FutureTask<String> ft1
= new FutureTask<>(new T1Task(ft2));
// 线程T1执行任务ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程T2执行任务ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程T1执行结果
System.out.println(ft1.get());
// T1Task需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String>{
FutureTask<String> ft2;
// T1任务需要T2任务的FutureTask
T1Task(FutureTask<String> ft2){
this.ft2 = ft2;
}
@Override
String call() throws Exception {
System.out.println("T1:洗水壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T1:烧开水...");
TimeUnit.SECONDS.sleep(15);
// 获取T2线程的茶叶
String tf = ft2.get();
System.out.println("T1:拿到茶叶:"+tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
}
}
// T2Task需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
@Override
String call() throws Exception {
System.out.println("T2:洗茶壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T2:洗茶杯...");
TimeUnit.SECONDS.sleep(2);
System.out.println("T2:拿茶叶...");
TimeUnit.SECONDS.sleep(1);
return "龙井";
}
}
// 一次执行结果:
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井
14、CompletableFuture 异步编程
极客时间专栏文章链接 https://time.geekbang.org/column/article/91569
CompletableFuture 实现了 CompletionStage 接口。任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等,这些都在CompletionStage接口中得到了体现。很好用就是了。具体参考上述文章链接。
注意异常处理,默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
15、CompletionService 批量执行异步任务
当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
16、Fork/Join 并行计算框架
极客时间专栏文章链接 https://time.geekbang.org/column/article/92524
Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的 MapReduce,所以你可以把 Fork/Join 看作单机版的 MapReduce。
Fork/Join 并行计算框架的核心组件是 ForkJoinPool。ForkJoinPool 支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。
模拟MapReduce统计单词数量
学习 MapReduce 有一个入门程序,统计一个文件里面每个单词的数量,下面我们来看看如何用 Fork/Join 并行计算框架来实现。我们可以先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果,你可以对照前面的简版分治任务模型图来理解这个过程。
上述极客时间链接里存在具体的实现代码,可以仔细看看。
网友评论