线程通讯
wait() 与 notifyAll()
wait()
使你可以等待某个条件发生变化,而改变这个条件超出了当前方法的控制能力。通常用于取代不断的空循环(忙等待),而使用 wait()
等待外部条件产生变化,直到 notify()
或 notityAll()
,才去唤醒并检查变化。
- 在
wait()
期间对象锁是被释放的。( 相对的sleep()
和yield()
是不会释放锁的)。 - 通过
notity()
和notifyAll()
或时间到期后,从wait()
恢复执行。
wait()
、notify()
和 notifyAll()
作为基类 Object 的一部分,但想到这些方法往往会和对象锁一起使用。实际上 wait()
、notify()
和 notifyAll()
只能在同步方法或同步块中调用。
实现一个简单的逻辑,push 和 pop,push 动作表示添加资源,并修改 empty 的值,并通知 wait
线程;而 pop 监听 empty 的值,在 empty 为 false 时做出响应,并在消耗资源后再次将线程进入 wait
状态。
public class Resource {
private static int count = 0;
private static boolean empty = true;
public synchronized void push() throws InterruptedException {
while (!empty){
wait();
}
empty = false;
notifyAll();
}
public synchronized void pop() throws InterruptedException {
while (empty){
wait();
}
System.out.println("pop :" + count++);
empty = true;
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
Resource resource = new Resource();
executorService.submit(() -> {
int count = 3;
while (count-- > 0) {
try {
TimeUnit.MILLISECONDS.sleep(100);
resource.push();
System.out.println("Hit Success");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
executorService.submit(() -> {
while (!Thread.interrupted()) {
try {
resource.pop();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
executorService.shutdown();
}
使用
while()
循环包含wait()
方法,使被唤醒的任务离开wait()
方法后,再次判断其等待的条件,这样做的原因有:
- 以保证其他先得到锁的任务在修改所等待的条件后,这个任务会被再次的挂起。
- 如果唤醒的原因不是该任务等待的条件时,再次让任务挂起。
notify()
notify()
是 notifyAll()
的一种优化。使用 notify()
时,会在等待该锁的的任务中随机选择一个唤醒。而这样随机的情况导致:
- 所有任务须等待同一个条件,否则使用
notify()
唤醒的那个任务得不到想要的条件,将再次挂起。 - 必须只有一个任务能够在所等待变化中受益。
使用 Lock Condition 对象
使用 Lock 代替 synchronized 时,同样可以使用 Condition 对象实现线程间的通讯。
public class Resource {
private static int count = 0;
private static boolean empty = true;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void push() throws InterruptedException {
lock.lock();
try {
while (!empty) {
wait();
}
empty = false;
condition.signalAll();
} finally {
lock.unlock();
}
}
public void pop() throws InterruptedException {
lock.lock();
try {
while (empty) {
condition.await();
}
System.out.println("pop :" + count++);
empty = true;
} finally {
lock.unlock();
}
}
}
管道通讯
public class Sender implements Runnable {
private PipedWriter out = new PipedWriter();
public PipedWriter getOut() {
return out;
}
@Override
public void run() {
try {
for (char c = 'A'; c != 'Z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(100);
}
out.close();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
public class Receiver implements Runnable {
private PipedReader in;
public void setIn(Sender sender) throws IOException {
this.in = new PipedReader(sender.getOut());
}
@Override
public void run() {
try {
while (true) {
System.out.println("Send: " + (char) in.read());
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException, IOException {
ExecutorService executorService = Executors.newCachedThreadPool();
Sender sender = new Sender();
Receiver receiver = new Receiver();
receiver.setIn(sender);
executorService.execute(sender);
executorService.execute(receiver);
TimeUnit.MILLISECONDS.sleep(4000);
executorService.shutdownNow();
}
网友评论