什么是线程通信
线程通信指的是多个线程之间,某个线程修改了一个对象的值时,其他的线程也能够感知到该值的变化并进行相关的操作,使线程与线程之间间接的实现通信。实现线程通信的方法可以通过以下几种方式:
- 基于
volatile
修饰的共享对象。 - 通过
wait/notify
机制实现。 - 线程的
join()
方法。 - 使用
synchronized
同步关键字。 - 另外也可借助中间件或者数据库做数据共享实现通信等等。
本文主要就通过wait/notify
机制进一步分析讲解。
wait/notify
相信各位对于wait/notify
应该不会太陌生,他们的主要作用就是用于控制线程之间的等待与唤醒,具体方法作用如下:
-
wait()
:使当前线程进入阻塞状态,并且释放当前线程持有的锁。与sleep()
不同的是sleep()
不会释放持有的锁。 -
notify()
:唤醒处于阻塞状态下的一个线程。 -
notifyAll()
:唤醒处于阻塞状态下的所有线程。
需要唤醒一个被
Object.wait()
方法阻塞的线程有两种方法:
- 其他线程调用了同一个对象的
notify()/notifyAll()
方法。- 调用了被阻塞线程的
interrupt()
方法,被阻塞的线程会被唤醒并且抛出InterruptException
异常。
wait/notify方法如何使用
wait()/notify()
方法实际上是通过同一个共享对象的竞争来实现数据变更通知,可以简单的理解为某个共享对象的数据变化达到某个条件而触发阻塞或唤醒动作,从而实现线程之间的通信。下面我们就使用wait()/notify()
来实现一个简易的消息通知场景:
- 首先定义一个消息生产者:
static class MessageProducer extends Thread {
private int maxSize; // 支持最大消息数
private Queue<String> messageQueue; // 存放消息的队列
MessageProducer(Queue<String> messageQueue, int maxSize) {
this.messageQueue = messageQueue;
this.maxSize = maxSize;
}
@Override
public void run() {
int i = 0;
while(true) {
i++;
synchronized (messageQueue) {
if (messageQueue.size() == maxSize) {
System.out.println("消息已占满");
try {
// 消息队列满了,进入阻塞等待消费者消费部分消息后继续生产
messageQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发送消息:" + i);
messageQueue.add("message-" + i);
messageQueue.notify();
}
}
}
}
- 消息生产者通过一个
while
循环对共享消息队列messageQueue
使用synchronized
进行加锁操作。- 消息达到最大数量
messageQueue.size() == maxSize
时调用wait()
方法让当前线程进入等待。- 每间隔
1秒
生产一个消息,打印日志并调用notify()
通知唤醒阻塞的消息消费者线程。
- 定义一个消息消费者:
static class MessageConsumer extends Thread {
private int maxSize; // 支持最大消息数
private Queue<String> messageQueue; // 存放消息的队列
MessageConsumer(Queue<String> messageQueue, int maxSize) {
this.messageQueue = messageQueue;
this.maxSize = maxSize;
}
@Override
public void run() {
while(true) {
synchronized (messageQueue) {
if (messageQueue.isEmpty()) {
System.out.println("消息全部已读");
try {
// 消息全部已读,需等待消息发送者继续发送方可继续读取消息
messageQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("读取消息:" + messageQueue.remove());
messageQueue.notify();
}
}
}
}
- 消息消费者通过
while
循环首先对共享队列messageQuery
进行加锁操作,在这里消息生产者与消费者的messageQuery
必须是同一个共享对象。- 根据
messageQueue.isEmpty()
判断消息队列是否为空来确认是否有未读的消息,如果为空则表示全部已读,则当前线程进入等待状态,待消息生产者继续生产消息后可再次被唤醒。- 每次循环使用
messageQueue.remove()
读取并消费消息操作,并调用messageQueue.notify()
方法唤醒阻塞状态的消息生产者继续生产消息。
- 定义
main
方法运行消息的生产与消费线程:
public static void main(String[] args) {
// 消息生产者与消费者共用一个共享对象
Queue<String> messageQueue = new LinkedBlockingQueue();
// 支持最大消息数
int maxSize = 2;
Thread messageProducer = new MessageProducer(messageQueue, maxSize);
Thread messageConsumer = new MessageConsumer(messageQueue, maxSize);
// 运行两个线程
messageProducer.start();
messageConsumer.start();
}
部分运行结果如下:
从上面的运行结果可以看出消息生产者与消息消费者之间通过共享对象进行
wait()/notify()
方法操作实现了通信功能,使两个线程在运行过程中不断的进入阻塞与唤醒从而达到消息不断的发送与读取。
wait/notify的为什么要加同步锁
上面的例子我们也看到了wait()/notify()
方法的使用都是在同步锁synchronized
当中使用的,之所以这么做是因为不在synchronized
中就会抛出IllegalMonitorStateException
异常,原因主要有两点:
-
wait()/notify()
方法都是基于一个共享对象来实现线程通信的,这就意味着存在多个线程对同一个共享对象竞争的可能,为了保证共享对象的原子性就需要加锁保护。 -
wait()/notify()
方法主要实现的是线程的阻塞与唤醒,而在调用notify()
的时候要唤醒哪个线程并不确定,这时候通过synchronized
实现了同步机制,正好为wait()/notify()
提供很好的协同机制。
去掉synchronized
同步锁后运行出现的异常如下: [图片上传失败...(image-eb73fb-1665193432014)]
总结
多数的生产者消费者场景通信都是通过wait()/notify()
的方式去实现的,在多线程交互的环境当中合理的设计运用好wait()/notify()
方法就能够良好的实现线程之间的通信功能,但以此同时也会带来更多的并发以及对象的共享问题,现实开发过程当中还需多多考虑更细节的东西,多进行一些测试以防止意外的出造成不必要的损失。
网友评论