Java线程协作
[TOC]
线程协作的基本方法:wait/notify/notifyAll和await/signal/signalAll。
一、wait(long timeout)/notify/notifyAll
除了用于锁的锁池(entry set),每个对象(Object)还有另一个等待池(wait set)用于线程间协作,表示等待一个条件,这个条件自己改变不了,需要等待其他线程改变条件后唤醒当前线程。
调用Object的wait方法会将当前线程放入wait set并阻塞。
使用wait的注意事项:
- wait/notify/notifyAll必须包裹在 synchronized 同步块中。
- 多线程 synchronized 同步的必须是一个相同的对象。
wait/notify的两个问题
- wait 应该包裹在while(true){}循环中,而不是if(true)。
因为:如果if用不能重复判断条件,while则会重新判断条件执行。
- 多线程竞争的情况下应该使用notifyAll而不是notify否则可能造成死循环。
因为:wait/notify的同步代码块 synchronized 是一个非公平锁实现。
如果一次只唤醒一个线程,可能造成达不到条件的两个线程相互调用的死循环。
比如在消费者/生产者场景下,如果存在多个消费者和生产者,需要唤醒生产者的时候,可能会造成两个消费者互相唤醒的死循环。
生产者消费者场景的实现,可以通过在生产者消费者synchronized同一个容器来实现,在生产者消费者内部来使用wait/notify实现协作。
也可以通过一个阻塞队列来实现,阻塞队列的底层实现一般是采用另外一种机制:await/signal/signalAll
二、await/signal/signalAll
wait/notify属于Object的方法,配合 synchronized(JVM方法)使用。
而await/signal/signalAll是Condition的方法,配合Lock的子(jdk方法)类(如ReentrantLock)使用。
另外一个可以让线程阻塞的方法是sleep(Thread的方法)方法。调用wait方法需要先获得锁,而调用sleep方法是不需要的。
如果已经持有锁定,sleep不会释放锁,而wait会。
这三者的异同简单对比如下:
方法 | 谁的方法 | 配合使用 | 持有锁时是否释放锁 | wait set |
---|---|---|---|---|
wait/notify | Object | synchronized | Y | unfair |
await/signal | Condition | Lock接口的子类 | Y | 当Lock是fair则wait set是fair |
sleep | Thread | 任何地方 | N | - |
await和wait的作用的区别有一小点:wait只能在synchronized同步的对象中使用一个队列。
而await对应的Lock子类以newCondition多个,也就是一个锁有多个等待队列,相当于一个房子安装了多个门。
Condition接口支持把在condition queue上等待的线程 设置为fair或unfair, 当且仅当Lock是fair的,它生成的Condition对象才是fair的。
synchronized 块或方法对于一个对象只有一个condition queue。这样如果在这个queue上如果有多个condition predicate, 比如isFull(),isEmpty() ,
就必须用notifyAll()方法, 会有context switch及获取锁的性能损失。
三、协作场景
主要有五个场景:
1 生产者/消费者模式
生产者与消费者通过共享队列协作,生产者生产产品放入队列中,消费者从队列中消费产品。
如果队列长度有限,队列满的时候生产者需要等待,队列空的时候消费者需要等待。
2 同时开始
类似于运动员比赛,同时开始,常见于模拟仿真程序中。
3 等待结束(主从协作)
主从协作模式的一种情况:主线程将任务分解为若干子任务,为每个子任务创建线程,主线程继续需要等待所有子线程执行完毕。
4 异步结果(主从协作)
主从协作模式的一种情况:主线程将子线程的调用封装成异步调用,调用后马上返回Future对象,后续通过Future获取最终结果。
5 集合点
类似于团队旅游,过程中设置若干集合点,集合点集合完毕后进行后续活动。
比如在并行迭代计算中,每个线程负责一部分计算,然后在集合点等待其他线程完成,所有线程到齐后交换数据计算结果后进行下一次迭代。
3.1 生产者/消费者模式
有两种方法。
3.1.2 第一种方法:在每个线程中使用协作。
在生产者/消费者的线程中通过判断队列的状态,自主的使用wait/notify.
这里的生产/消费过程while包裹在synchronized内,所以生产的时候由一个生产者生产到满,由一个消费者消费到0这种比较规律的现象。
第二种实现方法(共享阻塞队列)实现的方式,生产/消费过程synchronized被while包裹,因而每次生产消费后都会释放锁,所以出现了比较频繁随机的竞争现象。
消费者和生产者的核心代码:
synchronized (linkedList){//消费者线程核心代码
try {
while (true) {
if (linkedList.isEmpty()) {
OS.print(String.format("%s:缓存队列为空,通知生产者生产(notifyAll),暂停消费(wait)",getName()));
linkedList.notifyAll();
linkedList.wait();
} else {
String rs = linkedList.poll();
OS.print(String.format("%s:消费产品:%s,消费后缓存队列size:%s",getName(),rs,linkedList.size()));
Thread.sleep(200);
}
}
} catch(InterruptedException e){
e.printStackTrace();
}
}
try {//生产者线程核心代码
synchronized (linkedList){
int i = 0;
while(true){
if(linkedList.size()==max){
OS.print(String.format("%s:缓存队列满,通知消费者启动(notifyAll),暂停生产(wait)。", getName()));
linkedList.notifyAll();
linkedList.wait();
}else{
linkedList.offer(getName()+(++i));
OS.print(String.format("%s:继续生产:%s,队列当前长度(%s)", getName(),getName()+i, linkedList.size()));
Thread.sleep(500);
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
这里的例子使用的是await/notify,也可以用await/signal来实现,具体实现大同小异。
4.1.2 在共享阻塞队列中实现协作
即消费者线程和生产者线程通过阻塞队列来实现协作。java提供的阻塞队列如 ArrayBlockingQueue 等底层都是通过await机制实现的。
第一种实现便于理解wait的原理,但是第二种实现代码更整洁一些,后续的场景只举例使用第二种实现来实现。
4.2 同时开始(实现最大的并行性):闭锁CountDownLatch也可以实现
主线程充当裁判员,每个子线程充当一个运动员。就位后,运动员子线程等待协作的共享变量是做为起跑信号。同样有两种实现。这里举例使用第二种简洁的实现:
一个起跑信号类FireFlag。
public class FireFlag {
private volatile boolean fireFlag = false;
public synchronized void wait4Fire() throws InterruptedException {
while (!fireFlag){ wait(); }
}
public synchronized void fire(){
this.fireFlag = true;
notifyAll();
}
}
运动员类,start后调用wait4Fire准备就绪:
public class Racer extends Thread{
private FireFlag fireFlag;
Racer(FireFlag fireFlag) {
this.fireFlag = fireFlag;
}
@Override
public void run() {
try {
fireFlag.wait4Fire();
System.out.println(String.format("%s:开跑(%s)",getName(),System.nanoTime()+""));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
然后裁判员主线程,通过调用fire方法开枪,运动员线程开始同时起跑。
这里的FireFlag使用闭锁CountDownLatch也可以实现,初始值设为1,所有运动员线程调用CountDownLatch的await()
方法,主线程调用CountDownLatch的countDown()
,计数器变成0,相当于裁判员开枪。
4.3 等待结束(主从协作) 及同步协作类:CountDownLatch
主从协作的一种情况,Thread的join方法就是一种等待结束,底层是实现利用了wait():
while(isAlive){
wait(0);
}
join的等待是线性的,需要逐一等待每个子线程。
public class Run {
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[100];
for (int i = 0;i<100;i++){
threads[i] = new JoinThread("Thread"+i,(int)(Math.random()*1000));
threads[i].start();
}
//join必须单独循环一次,否则100个子线程就变成线性执行了
//但是主线程的等待还是逐个等待的。一些join可能是在线程执行完毕后调用的。
for (int i=0;i<100;i++){
threads[i].join();
}
System.out.println("其他线程都执行完毕了");
}
}
这里举个并发执行的例子,主线程与子线程写作共享一个数count(WaitCount对象)来表示未完成的线程个数。
每执行完毕一个减一,当count==0时调用notify(此时只剩下当前线程和主线程,可以使用notify)通知主线程停止wait。
WaitCountDownLatch对象代码如下:
public class WaitCountDownLatch {
private int count;
WaitCountDownLatch(int count) {
this.count = count;
}
/**
* 阻塞等待计数器==0
* @throws InterruptedException
*/
public synchronized void await() throws InterruptedException {
while (count>0){
wait();
}
System.out.println(String.format("%s:count==0了,所有线程执行完毕了",Thread.currentThread().getName()));
}
/**
* 任务执行完毕后计数器减一
*/
public synchronized void countDown(){
count--;
System.out.println(String.format("%s:结束,count(%s)数减一",Thread.currentThread().getName(),count));
if(count == 0){
notify();
}
}
}
WaitCountDownLatch 是一个用于同步协作的工具类,用于演示原理。
jdk中提供了一个专门的同步类
CountDownLatch
,实际开发中应该使用这个类。
使用用法和这里的WaitCountDownLatch类似,也提供了await()
和countDown()
这两个方法。
100个子线程共用一个WaitCountDownLatch对象,在run方法中调用中调用countDown()
让计数器减一,主线程中启动所有线程后调用await()
。
4.4 异步结果(主从协作):Executor Future
主从协作的情况下,手工创建线程往往比较麻烦,一种常见的模式是异步调用。
异步调用一般返回一个Future对象,通过它可以获得最终的结果。
在java中表示子任务的接口是Callable。
异步结果的主要逻辑在Executor的execute()
方法中:
创建callable线程,返回Future对象。Future的get()
方法的逻辑是,阻塞等待任务线程结束,结束后返回结果。
这里有一个自己实现的Executor的execute()
方法,包含Future的get()
的实现的源码,供参考原理:
public class MyExecutor {
public <V> MyFuture<V> execute(final Callable<V> task){
final Object lock = new Object();
final ExecuteThread<V> thread = new ExecuteThread<>(task,lock);
thread.start();
return () -> {
synchronized (lock){
while (!thread.isDone()){
lock.wait();
}
}
if(thread.getException() != null){
throw thread.getException();
}
return thread.getResult();
};
}
}
4.5 集合点:栅栏 CyclicBarrier
各个线程分头行动各自到达一个集合点,集合点等待所有线程到齐,交换数据后进行下一步动作。
这里举一个具体的例子:公司部门10人约定去爬山,各自从家中出发,到山底集合,所有人到齐开始爬山。
每个员工对应一个线程:
public class ClimberThread extends Thread {
private CyclicBarrier cyclicBarrier;
private int sleep;
public ClimberThread(String threadName, int sleep, CyclicBarrier cyclicBarrier) {
setName(threadName);
this.cyclicBarrier = cyclicBarrier;
this.sleep = sleep;
}
@Override
public void run() {
System.out.println(String.format("%s:我出发了",getName()));
try {
sleep(sleep);
System.out.println(String.format("%s:我已经到达山底了,只到了(%s)人,我先等一下。",getName(),cyclicBarrier.getNumberWaiting()));
cyclicBarrier.await();
System.out.println(String.format("%s:所有人到齐了,开始爬山",getName()));
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
主线程模拟10个员工从家中出发:
public class Run {
public static void main(String[] args) {
int climberCount = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(climberCount);
for (int i = 0; i < climberCount; i++){
new ClimberThread("climber"+i,(int)(Math.random()*1000),cyclicBarrier).start();
}
System.out.println("攀登者们陆续从家里出发了……");
}
}
网友评论