简介:
阻塞队列BlockingQueue
被广泛使用在生产者-消费者问题中,其原因是BlockingQueue
提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。
基本操作
BlockingQueue继承于Queue接口
有Queue得基本操作
- 插入元素
add(E e) :往队列插入数据,当队列满时,插入元素时会抛出IllegalStateException异常;
offer(E e):当往队列插入数据时,插入成功返回true,否则则返回false。当队列满时不会抛出异常;
- 删除元素
remove(Object o):从队列中删除数据,成功则返回true,否则为false
poll:删除数据,当队列为空时,返回null;
- 查看元素
element:获取队头元素,如果队列为空时则抛出NoSuchElementException异常;
peek:获取队头元素,如果队列为空则抛出NoSuchElementException异常
BlockingQueue具有的特殊操作
- 插入数据
put:当阻塞队列容量已经满时,往阻塞队列插入数据的线程会被阻塞,直至阻塞队列已经有空余的容量可供使用;
offer(E e, long timeout, TimeUnit unit):若阻塞队列已经满时,同样会阻塞插入数据的线程,
直至阻塞队列已经有空余的地方,与put方法不同的是,
该方法会有一个超时时间,若超过当前给定的超时时间,插入数据的线程会退出;。
- 删除数据
take():当阻塞队列为空时,获取队头数据的线程会被阻塞;
poll(long timeout, TimeUnit unit):当阻塞队列为空时,获取数据的线程会被阻塞
另外,如果被阻塞的线程超过了给定的时长,该线程会退出
常用的BlockingQueue
实现BlockingQueue接口的有
ArrayBlockingQueue
DelayQueue
LinkedBlockingDeque
LinkedBlockingQueue
LinkedTransferQueue
PriorityBlockingQueue
SynchronousQueue
下面对这几种常见的阻塞队列进行学习:
ArrayBlockingQueue
ArrayBlockingQueue
是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改
ArrayBlockingQueue
默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到ArrayBlockingQueue
。而非公平性则是指访问ArrayBlockingQueue
的顺序不是遵守严格的时间顺序,有可能存在,一旦ArrayBlockingQueue
可以被访问时,长时间阻塞的线程依然无法访问到ArrayBlockingQueue
。如果保证公平性,通常会降低吞吐量。如果需要获得公平性的ArrayBlockingQueue
,可采用如下代码
private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);
ArrayBlockingQueue的添加数据方法
有add,put,offer这3个方法,总结如下:
add方法内部调用offer方法,如果队列满了,抛出IllegalStateException异常,否则返回true
offer方法如果队列满了,返回false,否则返回true
add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。
这3个方法内部都会使用可重入锁保证原子性。
ArrayBlockingQueue的删除数据方法
有poll,take,remove这3个方法,总结如下:
poll方法对于队列为空的情况,返回null,否则返回队列头部元素。
remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false。
poll方法和remove方法不会阻塞线程。
take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中。
这3个方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程。
例子
BlockingQueueExample类在不同的线程中启动生产者和消费者。生产者将字符串插入共享BlockingQueue中,消费者将它们取出。
package wg;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(10);
Producer producer = new Producer(queue);//生产者
Consumer consumer = new Consumer(queue);//消费者
new Thread(producer).start();
new Thread(consumer).start();
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/*********************************************************************/
package wg;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**********************************************************************/
package wg;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
protected BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/*结果
1
2
3
*/
LinkedBlockingQueue
LinkedBlockingQueue
是一个使用链表完成队列操作的阻塞队列。链表是单向链表,而不是双向链表。
内部使用放锁和拿锁,这两个锁实现阻塞
为了防止LinkedBlockingQueue
容量迅速增,损耗大量内存。通常在创建LinkedBlockingQueue
对象时,会指定其大小,如果未指定,容量等于Integer.MAX_VALUE(2的32次方-1)
LinkedBlockingQueue的添加数据方法
add,put,offer跟ArrayBlockingQueue一样,不同的是它们的底层实现不一样。
ArrayBlockingQueue中放入数据阻塞的时候,需要消费数据才能唤醒。
而LinkedBlockingQueue中放入数据阻塞的时候,因为它内部有2个锁,可以并行执行放入数据和消费数据,不仅在消费数据的时候进行唤醒插入阻塞的线程,同时在插入的时候如果容量还没满,也会唤醒插入阻塞的线程。
LinkedBlockingQueue的删除数据方法
LinkedBlockingQueue的take方法对于没数据的情况下会阻塞,poll方法删除链表头结点,remove方法删除指定的对象。
需要注意的是remove方法由于要删除的数据的位置不确定,需要2个锁同时加锁。
一个LinkedBlockingQueue线程安全的例子
package wg;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) throws Exception {
final Queue<Integer> q1 = new LinkedBlockingQueue<>();
final Queue<Integer> q2 = new LinkedBlockingQueue<>();
final int n = 1000000;
final int m = 100;
for (int i = 0; i < n; i++) {
q1.add(i);
}
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < m; i++) {
ts.add(new Thread(new Runnable() {
public void run() {
int i = 0;
while (q2.size() < n && i++ < n / m) { // q2.size() 非线程安全,所以设置每个线程添加平均个数,防止poll出null报错
q2.add(q1.poll());
}
}
}));
}
for (Thread t : ts) {
t.start();
}
System.out.println("启动了 " + m + " 个线程,每个线程处理 " + n / m + " 个操作");
for (Thread t : ts) {
while (t.isAlive()) {
Thread.sleep(1);
}
}
System.out.println("q1.size():" + q1.size());
System.out.println("q2.size():" + q2.size());
Set<Integer> set = new HashSet<>();
Integer i;
while ((i = q2.poll()) != null) {
set.add(i);
}
System.out.println("q2.size():" + q2.size());
System.out.println("set.size():" + set.size());
}
}
/*
启动了 100 个线程,每个线程处理 10000 个操作
q1.size():0
q2.size():1000000
q2.size():0
set.size():1000000
*/
SynchronousQueue
SynchronousQueue
是这样一种阻塞队列,其中每个put/offer(插入)
必须等待一个take(移除)
,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。
除非另一个线程试图移除某个元素,否则也不能(使用任何方法)添加元素;也不能迭代队列,因为其中没有元素可用于迭代
它是线程安全的,是阻塞的
此队列不允许 null 元素。
它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。
peek()
永远返回null
isEmpty()
永远是true
put()
往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。
offer()
往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false。
take()
取出并且remove掉queue里的element(认为是在queue里的。。。),取不到东西他会一直等。
poll()
取出并且remove掉queue里的element(认为是在queue里的。。。),只有到碰巧另外一个线程正在往queue里offer数据或者put数据的时候,该方法才会取到东西。否则立即返回null
它模拟的功能类似于生活中一手交钱一手交货这种情形,像那种货到付款或者先付款后发货模型不适合使用SynchronousQueue
- 下面使用
SynchronousQueue
模拟只能生产一个产品的生产者-消费者模型。
package wg;
import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueTest {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
new Product(queue).start();
new Customer(queue).start();
}
static class Product extends Thread{
SynchronousQueue<Integer> queue;
public Product(SynchronousQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run(){
while(true){
int rand = new Random().nextInt(1000);
System.out.println("生产了一个产品:"+rand);
System.out.println("等待三秒后运送出去...");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
queue.put(rand);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(queue.isEmpty());
}
}
}
static class Customer extends Thread{
SynchronousQueue<Integer> queue;
public Customer(SynchronousQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run(){
while(true){
try {
System.out.println("消费了一个产品:"+queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------------------------------------------");
}
}
}
}
网友评论