wait()和notifyAll()方法以非常低级方式的解决任务互相操作的问题.每次交互时都握手,在更多的情况下,可以使用同步队列来解决协作问题.
同步队列(BlockingQueue)
同步队列在任何时刻都只允许一个任务插入或者移除元素.在java.util.concurrent.BlockingQueue提供了这个队列,这个接口有各种标准实现,主要使用有以下三种
- LinkedBlockingQueue,它是一个阻塞的链队列.通常都可以选用(Java内置的多种线程池都使用了LinkedBlockingQueue)
- ArrayBlockQueue,具有固定的尺寸,可以在被阻塞之前,向其中添加有限数量的元素.
- SynchronousQueue内部不维护容器,每一次put都会阻塞直到take取出.
LinkedBlockingQueue和ArrayBlockQueue插入和移除使用ReentrantLock,保证不会被多个线程同时操作.而SynchronousQueue使用的是CAS
SynchronousQueue原理
示例
有一个生产的线程和一个消费线程,消费线程会一直等待直到有生产线程生产了一个数据.
示例中生产线程生产一个英文字母放到队列中,消费线程在控制台输出字母
生产者
public class SenderQueue implements Runnable{
private BlockingQueue<Character> queue;
private Random random = new Random(47);
public SenderQueue(BlockingQueue<Character> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println("开始发送");
while (true){
for(char x='A';x<'z';x++){
queue.put(x);
//模拟耗时操作,睡眠一段时间让消费者等待
TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
}
} } catch (InterruptedException e) {
System.out.println("发送中断..");
}
System.out.println("结束发送");
}
}
消费者
public class ReceiverQueue implements Runnable {
private BlockingQueue queue;
public ReceiverQueue(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println("开始接收");
while (true) {
Character c = (Character) queue.take();
System.out.println(c + ", ");
}
} catch (InterruptedException e) {
System.out.println("接收中断..");
}
System.out.println("结束接收");
}
}
测试类
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Character> receiveQueue = new LinkedBlockingQueue<>();
ExecutorService service = Executors.newCachedThreadPool();
service.execute(new SenderQueue(receiveQueue));
service.execute(new ReceiverQueue(receiveQueue));
TimeUnit.SECONDS.sleep(5);
service.shutdownNow();
}
输出
开始发送
开始接收
A,
B,
C,
D,
E,
接收中断..
结束接收
发送中断..
结束发送
注意每次输出都不一定相同
网友评论