阻塞队列数据结构示意图如下所示:
- 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞;
- 当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。
阻塞:在某些情况下线程会被挂起(即阻塞),一旦条件满足,被挂起的线程又会被自动唤醒。何时阻塞、何时唤醒都由阻塞队列负责,使用者无需自行处理。
图1-2 阻塞队列接口实现类.png
BlockingQueue常见API如下所示:
BlockingQueue常见API.png
SynchronousQueue:不存储元素的阻塞队列,每一个put操作必须要等待take操作,反之同理。(生产一个消费一个)
/**
 * Demonstrates the hand-off behaviour of a {@link SynchronousQueue}.
 *
 * <p>Thread "AAA" puts three elements one after another; thread "BBB" waits
 * five seconds before each {@code take()}. Because the queue stores no
 * elements, each {@code put} only completes once the matching {@code take}
 * happens.
 *
 * @author luffy
 **/
public class BlockQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

        // Producer: announces each element, then blocks in put() until it is taken.
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + ":PUT AAA");
                blockingQueue.put("AAA");
                System.out.println(Thread.currentThread().getName() + ":PUT BBB");
                blockingQueue.put("BBB");
                System.out.println(Thread.currentThread().getName() + ":PUT CCC");
                blockingQueue.put("CCC");
            } catch (InterruptedException e) {
                // Restore the interrupt status so the interruption stays observable.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "AAA").start();

        // Consumer: pauses five seconds before each take().
        // An interrupt during a pause now ends the thread instead of being
        // swallowed and followed by another blocking take().
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName() + blockingQueue.take());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName() + blockingQueue.take());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName() + blockingQueue.take());
            } catch (InterruptedException e) {
                // Restore the interrupt status so the interruption stays observable.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "BBB").start();
    }
}
防止虚假唤醒:等待条件必须放在 while 循环中判断(而不是 if),线程被唤醒后会重新检查条件。生产者消费者模型的 Lock 实现:
/**
 * Entry point of the Lock-based producer/consumer demo.
 *
 * <p>Starts ten threads named "0".."9" that each call {@code put()} once on a
 * shared {@code ShareData}, followed by ten threads named "10".."19" that each
 * call {@code take()} once.
 *
 * @author luffy
 **/
public class BlockQueueDemo {
    public static void main(String[] args) {
        final ShareData data = new ShareData();

        // One producing step; any failure is only reported.
        Runnable producer = () -> {
            try {
                data.put();
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        // One consuming step; any failure is only reported.
        Runnable consumer = () -> {
            try {
                data.take();
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        int id = 0;
        // Threads "0".."9" produce.
        while (id < 10) {
            new Thread(producer, String.valueOf(id)).start();
            id++;
        }
        // Threads "10".."19" consume.
        while (id < 20) {
            new Thread(consumer, String.valueOf(id)).start();
            id++;
        }
    }
}
/**
 * Counter shared by producer and consumer threads, guarded by a single lock
 * and a single condition.
 *
 * <p>{@code put()} waits while the counter equals {@code MAX};
 * {@code take()} waits while it equals zero. The wait is re-checked in a loop
 * after every wake-up.
 */
class ShareData {
    private int num = 0;
    private final static int MAX = 10;
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    /**
     * Increments the counter, waiting first while it is at {@code MAX}, then
     * prints "&lt;thread&gt;:put&lt;count&gt;" and wakes all waiting threads.
     *
     * @throws Exception if the wait is interrupted
     */
    public void put() throws Exception {
        lock.lock();
        try {
            awaitWhileCountIs(MAX);
            num += 1;
            announce(":put");
        } finally {
            lock.unlock();
        }
    }

    /**
     * Decrements the counter, waiting first while it is zero, then prints
     * "&lt;thread&gt;:take&lt;count&gt;" and wakes all waiting threads.
     *
     * @throws Exception if the wait is interrupted
     */
    public void take() throws Exception {
        lock.lock();
        try {
            awaitWhileCountIs(0);
            num -= 1;
            announce(":take");
        } finally {
            lock.unlock();
        }
    }

    /** Waits on the condition for as long as the counter equals {@code blockedAt}; caller holds the lock. */
    private void awaitWhileCountIs(int blockedAt) throws InterruptedException {
        while (num == blockedAt) {
            condition.await();
        }
    }

    /** Prints the current thread name, the action tag and the counter, then signals all waiters; caller holds the lock. */
    private void announce(String action) {
        String who = Thread.currentThread().getName();
        System.out.println(who + action + num);
        condition.signalAll();
    }
}
生产者消费者模型阻塞队列实现:
/**
 * Entry point of the BlockingQueue-based producer/consumer demo.
 *
 * <p>Builds a {@code ShareData} around an {@code ArrayBlockingQueue} of
 * capacity 3, runs {@code put()} on a thread named "Put" and {@code take()}
 * on a thread named "Take", sleeps five seconds and then calls
 * {@code flagSet()}.
 *
 * @author luffy
 **/
public class BlockQueueDemo {
    public static void main(String[] args) {
        final ShareData shareData = new ShareData(new ArrayBlockingQueue<String>(3));

        Thread putThread = new Thread(() -> {
            try {
                shareData.put();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Put");

        Thread takeThread = new Thread(() -> {
            try {
                shareData.take();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Take");

        putThread.start();
        takeThread.start();

        // Let both workers run for five seconds before asking them to stop.
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        shareData.flagSet();
    }
}
/**
 * Producer/consumer helper built on a {@link BlockingQueue}.
 *
 * <p>{@code put()} and {@code take()} both loop while a shared volatile flag
 * is true. The flag is cleared by {@code flagSet()}, and also by
 * {@code take()} itself when a poll times out, which ends both loops.
 */
class ShareData {
    /** Loop switch shared by producer and consumer; volatile so a change is visible across threads. */
    private volatile boolean flag = true;
    private final BlockingQueue<String> blockingQueue;
    private final AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * Stores the queue to work on and prints its concrete class name.
     *
     * @param blockingQueue queue shared by the producer and the consumer
     */
    public ShareData(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());
    }

    /**
     * Produces while the flag is set: offers the next counter value, waiting
     * up to two seconds for space, reports the outcome, then pauses one second.
     *
     * @throws Exception if the thread is interrupted while offering or pausing
     */
    public void put() throws Exception {
        while (flag) {
            String value = atomicInteger.incrementAndGet() + "";
            boolean resFlag = blockingQueue.offer(value, 2, TimeUnit.SECONDS);
            if (resFlag) {
                System.out.println(Thread.currentThread().getName() + "已生产!");
            } else {
                System.out.println(Thread.currentThread().getName() + "生产失败!");
            }
            // An interrupt during the pause propagates to the caller instead of
            // being swallowed, so an interrupted producer stops promptly.
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println("被叫停,生产结束!");
    }

    /**
     * Consumes while the flag is set: polls for up to two seconds. When no
     * element (or an empty string) is obtained, it clears the flag and returns;
     * otherwise it reports success and pauses one second.
     *
     * @throws Exception if the thread is interrupted while polling or pausing
     */
    public void take() throws Exception {
        while (flag) {
            // NOTE(review): this decrements the same counter put() uses to build
            // values, and does so even when the poll below fails - confirm intent.
            atomicInteger.decrementAndGet();
            String res = blockingQueue.poll(2, TimeUnit.SECONDS);
            if (res == null || "".equals(res)) {
                // Nothing arrived in time: stop the producer loop as well.
                this.flag = false;
                System.out.println(Thread.currentThread().getName() + "消费失败!");
                return;
            }
            System.out.println(Thread.currentThread().getName() + "消费成功!");
            // An interrupt during the pause propagates to the caller instead of
            // being swallowed, so an interrupted consumer stops promptly.
            TimeUnit.SECONDS.sleep(1);
        }
    }

    /** Clears the flag so that both loops end after their current iteration. */
    public void flagSet() {
        this.flag = false;
    }
}
网友评论