一、阻塞队列 BlockingQueue
在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
1.1、BlockingQueue的基本原理
先来解释一下阻塞队列:
如上图:
- 1、生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
- 2、消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。
阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。
阻塞队列的常用方法
查阅BlockingQueue总结了以下阻塞队列的方法:
1、boolean add(E e)
- 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回IllegalStateException异常。
2、boolean offer(E e)
- 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回false。
3、void put(E e)
- 直接在队列中插入元素,当无可用空间时候,阻塞等待。
4、boolean offer(E e, long timeout, TimeUnit unit)
- 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false。
5、E take()
- 获取并移除队列头部的元素,无元素时候阻塞等待。
6、E poll( long time, timeunit unit)
- 获取并移除队列头部的元素,无元素时候阻塞等待指定时间。
7、boolean remove()
- 获取并移除队列头部的元素,无元素时候会抛出NoSuchElementException异常。
8、E element()
- 不移除的情况下返回列头部的元素,无元素时候会抛出NoSuchElementException异常。
9、E peek()
- 不移除的情况下返回列头部的元素,队列为空无元素时返回null。
注意:
根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。
以上支持阻塞和超时的方法都是能够响应中断的。
1.2、BlockingQueue的实现
BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。
下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。
二、LinkedBlockingQueue
LinkedBlockingQueue也是一个阻塞队列,相比于ArrayBlockingQueue,他的底层是使用链表(单向链表)实现的,而且是一个可有界可无界的队列,在生产和消费的时候使用了两把锁,提高并发,是一个高效的阻塞队列。
LinkedBlockingQueue底层的数据结构是链表,这一点很容易验证,在源码中,我们可以看到它有一个内部类Node,基本源码如下所示:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//链表节点定义
static class Node<E> {
//节点中存放的值
E item;
//下一个节点
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
}
从上面的注释可以知道,当某个node节点的next节点为null的时候,说明当前节点是最后一个节点。
LinkedBlockingQueue的基本成员属性如下代码所示:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 队列容量,最大为Integer.MAX_VALUE */
private final int capacity;
/** 队列长度 */
private final AtomicInteger count = new AtomicInteger();
/**
* 头结点
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* 尾结点
* Invariant: last.next == null
*/
private transient Node<E> last;
/** 移除操作的锁,take/poll方法用到 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 移除操作需要等待的条件notEmpty,与takeLock绑定 */
private final Condition notEmpty = takeLock.newCondition();
/** 入队操作的锁,put/offer方法用到 */
private final ReentrantLock putLock = new ReentrantLock();
/** 入队操作需要等待的条件notFull,与putLock绑定 */
private final Condition notFull = putLock.newCondition();
}
可以看到,LinkedBlockingQueue内部是用单向链表实现的,并且它有两把锁:takeLock和putLock,以及对应的两个等待条件:notEmpty和notFull。takeLock控制同一时刻只有一个线程从队列头部获取/移除元素,putLock控制同一时刻只有一个线程在队列尾部添加元素。
2.1、构造函数
- 容量大小可以由构造函数的capacity设定,默认为:Integer.MAX_VALUE
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public LinkedBlockingQueue() {
// 调用有参构造函数,初始化容量capacity为int最大值
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
// 容量不能小于0,注意也不能等于0,这点与常规的集合不同
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化头结点和尾结点为哑节点
last = head = new Node<E>(null);
}
// 将已有的集合全部入队
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
// 获取到putLock锁
final ReentrantLock putLock = this.putLock;
// 加锁,保证线程安全
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
// 节点内的值不能为null
if (e == null)
throw new NullPointerException();
// 判断队列是否满了
if (n == capacity)
throw new IllegalStateException("Queue full");
// 将Node节点添加到队列的尾部,last = last.next = new Node<E>(e);
enqueue(new Node<E>(e));
++n;
}
// 原子类设置Node节点个数,线程安全
count.set(n);
} finally {
// 解锁
putLock.unlock();
}
}
}
2.2、阻塞入队
LinkedBlockingQueue提供的入队的方法有多个,包括add、offer、put。
2.2.1、add(E e)方法
其中add(E e)调用的就是offer(E e),offer方法入队成功返回true,入队失败(队列已满或者阻塞超时)会返回false,那么add方法调用offer方法返回false的话,那么就抛出异常,代码如下:
public abstract class AbstractQueue<E>
extends AbstractCollection<E>
implements Queue<E> {
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
}
2.2.2、offer(E e)方法
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public boolean offer(E e) {
// 如果存入的值为null,直接抛出空指针异常
if (e == null) throw new NullPointerException();
// 获取队列元素个数
final AtomicInteger count = this.count;
if (count.get() == capacity)
//如果已经满了,直接返回失败
return false;
// 预先设置c为 -1,约定负数为入队失败
int c = -1;
Node<E> node = new Node<E>(e);
// 获取入队锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//双重判断
if (count.get() < capacity) {
//加入链表
enqueue(node);
c = count.getAndIncrement();
// 队列元素个数+1,此时c为元素入队前的个数,也就是比当前队列元素个数少1
if (c + 1 < capacity)
//唤醒生产者线程,继续插入
// 如果添加数据后还队列还没有满,
//则继续调用notFull的signal方法唤醒其他等待在入队的线程,继续插入
notFull.signal();
}
} finally {
// 释放锁
putLock.unlock();
}
if (c == 0)
//说明里面有一个元素,唤醒消费者
signalNotEmpty();
return c >= 0;
}
}
2.2.3、offer(E e, long timeout, TimeUnit unit)方法
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 如果存入的值为null,直接抛出空指针异常
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
// 预先设置c为 -1,约定负数为入队失败
int c = -1;
// 获取入队锁
final ReentrantLock putLock = this.putLock;
// 获取队列元素个数
final AtomicInteger count = this.count;
// 加锁
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// 如果超时时间过了队列仍然是满的话就直接返回false
if (nanos <= 0)
return false;
// 否则调用awaitNanos等待,超时会返回<= 0L
nanos = notFull.awaitNanos(nanos);
}
// 如果上述没有阻塞,也就是队列没有满,那么这里直接入队
enqueue(new Node<E>(e));
// 队列元素个数+1,此时c为元素入队前的个数,也就是比当前队列元素个数少1
c = count.getAndIncrement();
if (c + 1 < capacity)
// 如果添加数据后还队列还没有满,
//则继续调用notFull的signal方法唤醒其他等待在入队的线程
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
// c==0说明队列中有一个元素了,那么就需要唤醒其他正在等待出队的线程
// 这一点可能不好理解,c = count.getAndIncrement();理解了就差不多
if (c == 0)
signalNotEmpty();
return true;
}
}
我们一起总结一下上述的入队源码:
-
1、入队第一步,上锁,这样保证了线程安全,保证了同一时刻只能有一个入队线程在操作队列。
-
2、如果队列满了,那么会产生阻塞,如果阻塞时间过了,队列依旧是满的,那么将返回false,放弃入队。
-
3、如果队列没有满,那么直接将入队元素加入到队列的尾部,然后检查当前队列是否满了,如果没有满,则唤醒其他入队线程。
-
4、最后检查入队前的队列是否为空(c==0就表示当前入队操作前,是一个空队列),如果为空,那么就有可能存在等待出队的线程在阻塞着,那么在这里进行唤醒。
2.2.4、put(E e)方法
对于put方法,它也是入队的一个方法,这个方法和offer方法原理几乎一致,最大的区别在于put方法没有阻塞超时时间,如果队列满了,那么执行put方法的线程将一直阻塞下去。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public void put(E e) throws InterruptedException {
// 如果存入的值为null,直接抛出空指针异常
if (e == null) throw new NullPointerException();
// 预先设置c为 -1,约定负数为入队失败
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// 使用AtomicInteger保证原子性
final AtomicInteger count = this.count;
// 获取put锁
putLock.lockInterruptibly();
try {
// 如果队列满了,则进入put条件队列等待
while (count.get() == capacity) {
notFull.await();
}
// 队列不满,或者被取数线程唤醒了,那么会继续执行
// 这里会往阻塞队列末尾添加一个数据
enqueue(node);
c = count.getAndIncrement();
// 如果队列不满,则唤醒等待时间最长的put线程
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放put锁
putLock.unlock();
}
// 如果队列为空,再次获取put锁,然后唤醒等待时间最长的put线程
if (c == 0)
signalNotEmpty();
}
//直接放到链表的尾部
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
}
2.3、阻塞出队
2.3.1、remove()方法
public abstract class AbstractQueue<E>
extends AbstractCollection<E>
implements Queue<E> {
public E remove()
// 调用poll()方法出队
E x = poll();
if (x != null)
// 如果有元素出队就返回这个元素
return x;
else
// 如果没有元素出队就抛出异常
throw new NoSuchElementException();
}
}
2.3.2、poll()方法
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public E poll() {
final AtomicInteger count = this.count;
//如果队列为空,直接返回空
if (count.get() == 0)
return null;
E x = null;
int c = -1;
// 获取take锁
final ReentrantLock takeLock = this.takeLock;
// 上锁
takeLock.lock();
try {
// 如果队列不空
if (count.get() > 0) {
//调用dequeue获取队列中的数据
x = dequeue();
// 阻塞队列数量减1
c = count.getAndDecrement();
// 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程
if (c > 1)
// 释放take锁
notEmpty.signal();
}
} finally {
// 解锁
takeLock.unlock();
}
// 如果c == capacity就是说队列中有一个空位,唤醒入队线程
if (c == capacity)
signalNotFull();
return x;
}
}
2.3.3、poll(long timeout, TimeUnit unit)方法
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
// 获取take锁
final ReentrantLock takeLock = this.takeLock;
// 上锁
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
// 如果队列空了,则进入take条件队列等待
// 且如果阻塞时间过期,那么将返回null
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
// 在超时时间内返回,则调用dequeue获取队列中的数据
x = dequeue();
// 阻塞队列数量减1
c = count.getAndDecrement();
// 如果c > 1,说明队列中还有节点元素,那么继续唤醒其他出队线程
if (c > 1)
notEmpty.signal();
} finally {
// 解锁
takeLock.unlock();
}
// 如果c == capacity就是说队列中有一个空位,唤醒入队线程
if (c == capacity)
signalNotFull();
return x;
}
}
2.3.4、take()方法
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
// 获取take锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果队列空了,则进入take条件队列等待
while (count.get() == 0) {
notEmpty.await();
}
// 获取到第一个节点,非哑节点
x = dequeue();
// 阻塞队列数量减1
c = count.getAndDecrement();
// 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程
if (c > 1)
notEmpty.signal();
} finally {
// 释放take锁
takeLock.unlock();
}
// 如果队列满了,再次获取take锁,然后唤醒等待时间最长的take线程
if (c == capacity)
signalNotFull();
return x;
}
//通过这个方法可以看出,链表的首节点的值是null,每次获取元素的时候
//先把首节点干掉,然后从第二个节点获取值
private E dequeue() {
Node<E> h = head;
// 获取第一个元素结点first
Node<E> first = h.next;
// 将头结点自引用,并被垃圾回收掉
h.next = h; // help GC
// 将头结点指向第一个元素结点first
head = first;
// 获取第一个元素结点的值
E x = first.item;
// 将第一个元素结点的值置为null,成为新的哑节点
first.item = null;
// 返回被移除的节点元素值
return x;
}
}
take和put操作如下图所示:
- 1、队列第一个节点为哑节点,占位用的;
- 2、put操作一直往链表后面追加节点;
- 3、take操作从链表头取节点;
三、ArrayBlockingQueue与LinkedBlockingQueue对比
队列 | 是否阻塞 | 是否有界 | 线程安全 | 适用场景 |
---|---|---|---|---|
ArrayBlockingQueue | √ | √ | 一把ReentrantLock锁 | 生产消费模型,平衡处理速度 |
LinkedBlockingQueue | √ | 可配置 | 两把ReentrantLock锁 | 生产消费模型,平衡处理速度 |
3.1、ArrayBlockingQueue
- 数据结构:数组,存储空间预先分配,无需动态申请空间,使用过程中内存开销较小;
3.2、LinkedBlockingQueue:
- 数据结构:单项链表,存储空间动态申请,会增加JVM垃圾回收负担;
- 两把锁,并发性能较好;
- 可设置为无界,吞吐量比较大,但是不稳定,入队速度太快有可能导致内存溢出。
参考:
https://www.itzhai.com/articles/graphical-blocking-queue.html
网友评论