转自 公众号【彤哥说源码】https://mp.weixin.qq.com/s/y6PoK3UbVLwdZoauLX8nsQ
1 前言
LinkedBlockingQueue是java并发包下一个以单链表实现的阻塞队列,它是线程安全的。
2 源码分析
2.1 重要属性
/** The capacity bound, or Integer.MAX_VALUE if none */
// 容量
private final int capacity;
/** Current number of elements */
// 元素数量
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
// 链表头
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
// 链表尾
private transient Node<E> last;
/** Lock held by take, poll, etc */
// take 锁
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */、
// notEmpty条件
// 当队列没有元素时,take锁会阻塞在notEmpty条件上,等待其他线程唤醒
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
// put 锁
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
// notFull条件
// 当队列满了,put锁会阻塞在notFull上,等待其他线程唤醒
private final Condition notFull = putLock.newCondition();
(1)capacity,有容量,可以理解为LinkedBlockingQueue是有界队列
(2)head, last,链表头、链表尾指针
(3)takeLock,notEmpty,take锁及其对应的条件
(4)putLock, notFull,put锁及其对应的条件
(5)入队、出队使用两个不同的锁控制,锁分离,提高效率
内部类
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
只有next指针,是单链表结构。
2.2 构造方法
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化head和last指针为空值节点
last = head = new Node<E>(null);
}
如果没有传容量,就会将容量初始化为最大int值
2.3 入队之put方法
public void put(E e) throws InterruptedException {
// 不允许null元素
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
// 新建一个节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 使用put锁加锁
putLock.lockInterruptibly();
try {
// 如果队列满了,就阻塞在notFull条件上
// 等待被其他线程唤醒
while (count.get() == capacity) {
notFull.await();
}
// 队列不满,入队
enqueue(node);
// 队列长度加1
c = count.getAndIncrement();
// 如果现队列长度小于容量,就再唤醒一个阻塞在notFull条件上的线程
// 这里唤醒是因为可能有很多线程阻塞在notFull上,直接在这里signal()不会重复加锁
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
这里只有在put之前,队列里面没有元素,才会调用signalNotEmpty方法
enqueue方法
直接加到 last 后面
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
2.4 出队之take方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 使用takeLock加锁
takeLock.lockInterruptibly();
try {
// 如果队列无元素,则阻塞在notEmpty条件上
while (count.get() == 0) {
notEmpty.await();
}
// 否则出队
x = dequeue();
// 获取出队前队列的长度
c = count.getAndDecrement();
// 如果取之前队列长度大于1,则唤醒notEmpty
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果取之前队列长度等于容量
// 则唤醒notFull
if (c == capacity)
signalNotFull();
return x;
}
如下图所示,在take()方法中,唤醒notFull线程的条件很苛刻,就是在取元素之前,队列中元素个数是与容量capacity
一致,其余的情况就不会再去signalNotFull
。
那么为什么不去调用呢?因为使用signalNotFull
方法需要获取putLock
,减少锁的次数。
3 总结
(1)LinkedBlockingQueue采用单链表的形式实现;
(2)LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞;
(3)LinkedBlockingQueue是有界队列,不传入容量时默认为最大int值;
面试问题
1.LinkedBlockingQueue与ArrayBlockingQueue对比?
a)后者入队出队采用一把锁,导致入队出队相互阻塞,效率低下;
b)前者入队出队采用两把锁,入队出队互不干扰,效率较高;
c)二者都是有界队列,如果长度相等且出队速度跟不上入队速度,都会导致大量线程阻塞;
d)前者如果初始化不传入初始容量,则使用最大int值,如果出队速度跟不上入队速度,会导致队列特别长,占用大量内存;
网友评论