1. LinkedBlockingQueue
- 基于
单向链表
实现的阻塞队列 - 属于有界阻塞队列
1.1 生产-消费模型
生产者生产数据到队列,队列满时需要阻塞线程,停止往队列生产
消费者消费队列,队列为空时阻塞线程停止消费
2. 源码分析
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
}
2.1 成员变量
- capacity 链表长度
- count 队列元素数量
- head / last 链表的头结点和尾节点(头结点出队,尾节点入队)
- 锁
- takeLock 消费锁,出队阻塞
- putLock 生产锁,入队阻塞
- notEmpty 和 notFull 是条件变量
- notEmpty 保证消费等待,数据队列空了线程进入等待
- notFull 保证生产等待,数据队列满了线程进入等待
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
}
2.2 链表节点
- 元素 item
- 后继节点next
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
3. 出队和入队
方法 | 抛异常 | 有返回值 | 阻塞 | 阻塞特定时间 |
---|---|---|---|---|
入队 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
出队 | remove() | poll() | take() | poll(time, unit) |
获取队首元素 | element() | peek() | 不支持 | 不支持 |
3.1 阻塞入队 put()
向队列尾部插入一个元素,如果队列中有空闲则插入后直接返回,如果队列已满则阻塞当前线程
,直到队列有空闲插入成功后返回
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* 若队列满则等待(阻塞当前线程)
* 当前线程被加入 notFull 条件的等待队列中
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal(); // 唤醒一个等待入队的线程
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
3.2 非阻塞入队 offer()
- boolean offer(E e)
- 队列中有空闲则插入成功后返回true,如果队列已满则
丢弃
当前元素然后返回false
- 队列中有空闲则插入成功后返回true,如果队列已满则
- boolean offer(E e, long timeout, TimeUnit unit)
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning true upon success and false if this queue is full.
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary up to the specified wait time for space to become available.
*
* @return true if successful, or false if
* the specified waiting time elapses before space is available
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException { // ... }
3.3 阻塞出队 take()
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 加锁
takeLock.lockInterruptibly();
try {
// 阻塞挂起当前线程,并把当前线程放入 notEmpty 条件队列
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
3.4 非阻塞出队 poll()
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
3.5 出队 remove()
public boolean remove(Object o) {
if (o == null) return false;
// (1) 双重加锁
fullyLock();
try {
// (2) 遍历队列
for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
if (o.equals(p.item)) {
// (3)
unlink(p, trail);
return true;
}
}
return false;
} finally {
// (4) 释放锁
fullyUnlock();
}
}
网友评论