目录
最近在阅读Spark源码的过程中,又重新接触到了一些Java并发方面的知识,于是就见缝插针地将它们记录下来,当做复习与备忘。
阻塞队列简介
阻塞队列的定义
根据Doug Lea在JavaDoc中的解释,所谓阻塞队列,就是在普通队列的基础之上,支持以下两种操作的队列:
- 当某线程从队列获取元素时,如果队列为空,就等待(阻塞)直至队列中有元素;
- 当某线程向队列插入元素时,如果队列已满,就等待(阻塞)直至队列中有空间。
也就是说,阻塞队列是自带同步机制的队列。它最常用来解决线程同步中经典的生产者-消费者问题,前面讲过的Spark Core异步事件总线中,就采用阻塞队列作为事件存储。
Java中的阻塞队列
Java中阻塞队列的基类是j.u.c.BlockingQueue接口,它继承自Queue接口,并且定义了额外的方法实现同步:
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
上述put()与offer()方法用于向队列插入元素,take()与poll()方法则是从队列获取元素。不同的是,put()与take()方法在插入/获取时,如果必须等待,就会一直阻塞下去;而offer()与poll()方法可以指定阻塞的时间长度。
以BlockingQueue接口为中心的继承关系如下图所示。
平时开发中比较常用的阻塞队列是基于数组实现的ArrayBlockingQueue,与基于单链表实现的LinkedBlockingQueue。本文选择后者来深入看一下阻塞队列的实现细节,因为它的性能在多数情况下更优,可以自行写benchmark程序来测测。
LinkedBlockingQueue
LinkedBlockingQueue(以下简称LBQ)是基于单链表实现的,先进先出(FIFO)的有界阻塞队列。
单链表定义
LBQ的单链表结点数据结构定义在静态内部类Node中。
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
在类的内部还定义了单链表的头结点与尾结点。
transient Node<E> head;
private transient Node<E> last;
head始终指向链表的第一个结点,该结点是哨兵结点,不存储数据,只标记链表的开始,即head.item == null
。这样可以避免只有一个结点时造成混乱。
tail始终指向链表的最后一个结点,该结点是有数据的,并满足last.next == null
。
LBQ在队头获取及弹出元素,在队尾插入元素。
锁和等待队列
LBQ采用双锁机制保证入队和出队可以同时进行,互不干扰。
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
可见定义有两个ReentrantLock,takeLock用于控制出队,putLock用于控制入队。另外,还有这两个锁分别对应的条件变量notEmpty和notFull,分别维护出队和入队线程的等待队列。ReentrantLock和Condition都是Java AQS机制的重要组成部分,之后也会细说。
值得注意的是,在某些方法中需要同时对takeLock与putLock加锁与解锁,所以LBQ内部也提供了这样的方法。
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
这两个方法总会成对调用,保证所有需要同时加锁和解锁的地方,其顺序都一致并且不可中断,也防止了前一个锁操作成功执行,后一个锁操作被打断导致死锁的风险。
另外,LBQ也对条件变量的Condition.signal()方法进行了简单封装,分别用来唤醒阻塞的出队操作线程和入队操作线程。
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
容量和计数
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
capacity是LBQ的最大容量,可以在构造方法中随同名参数传入,默认值是Integer.MAX_VALUE。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
count则是LBQ内当前元素的计数,由于入队和出队动作可以并发执行,所以要用原子类型AtomicInteger保证线程安全。
入队操作
由于put()和offer()方法的逻辑基本相同,所以只看offer()方法就好了。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
在入队时,首先将putLock加锁,然后用卫语句count.get() == capacity
判断队列是否已满,若已满,则进入等待循环。当阻塞的时间超时后,判定入队操作失败,并返回false。
如果队列未满,或者在超时时间未到时有了空间,就调用enqueue()方法在队尾插入元素,并将计数器自增。入队后若还有更多的剩余空间,则唤醒其他等待的入队线程。
最后将putLock解锁,并检查由count.getAndIncrement()返回的值是否为0。如果为0,表示队列刚刚由空变为非空状态,因此也要唤醒等待的出队线程。
出队操作
同理,只看poll()方法。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
将讲解入队方法时的描述反着说一遍就行了:
在出队时,首先将takeLock加锁,然后用卫语句count.get() == 0
判断队列是否为空,若为空,则进入等待循环。当阻塞的时间超时后,判定出队操作失败,并返回false。
如果队列不为空,或者在超时时间未到时进了新元素,就调用dequeue()方法弹出队头元素,并将计数器自减。出队后若还有更多的剩余元素,则唤醒其他等待的出队线程。
最后将takeLock解锁,并检查由count.getAndDecrement()返回的值是否为capacity。如果为capacity,表示队列刚刚由满变为不满状态,因此也要唤醒等待的入队线程。
需要操作双锁的情况
以remove()方法为例。
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
由于单链表删除结点涉及到对链表的遍历,以及对前驱和后继结点的断链和补链,因此必须将两个锁都加上,禁止一切修改。待删除成功后才能解锁,继续正常的入队和出队操作。
生产者-消费者问题示例
生产者-消费者问题的解决方法用操作系统理论中的信号量PV(wait-signal)原语描述如下:
semaphore filled = 0;
semaphore empty = BUF_CAPACITY;
mutex_semaphore mutex = 1;
procedure producer() {
while (true) {
item = produce();
wait(empty);
wait(mutex);
buffer.put(item);
signal(mutex);
signal(filled);
}
}
procedure consumer() {
while (true) {
wait(filled);
wait(mutex);
item = buffer.get();
signal(mutex);
signal(empty);
consume(item);
}
}
利用阻塞队列可以免去自己实现同步机制的麻烦,从而非常方便地实现。一个极简的示例如下:
public class ProducerConsumerExample {
private static final int BUF_CAPACITY = 16;
public static void main(String[] args) {
BlockingQueue<Long> blockingQueue = new LinkedBlockingQueue<>(BUF_CAPACITY);
Thread producerThread = new Thread(() -> {
try {
while (true) {
long value = System.currentTimeMillis() % 1000;
blockingQueue.put(value);
Thread.sleep(value);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "producer");
Thread consumerThread = new Thread(() -> {
try {
while (true) {
System.out.println(blockingQueue.take());
Thread.sleep(System.currentTimeMillis() % 1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "consumer");
producerThread.start();
consumerThread.start();
}
}
一个小(?)问题
在上面的代码(以及j.u.c包中很多类的代码)的方法体中,经常能看到类似以下的语句:
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
也就是有些类中定义的字段,在方法中使用时会先赋值给一个局部变量。这样做到底是为了什么?以目前我所了解到的而言,还没有特别确切的答案,但可以确定是一个非常微小的优化,与JVM及缓存有关。
以下是reference传送门:
- https://stackoverflow.com/questions/8019831/java-lock-variable-assignment-before-use-why
- https://stackoverflow.com/questions/2785964/in-arrayblockingqueue-why-copy-final-member-field-into-local-final-variable
- http://cs.oswego.edu/pipermail/concurrency-interest/2013-February/010768.html
顺便,StackOverflow最近(不知道是哪一天)改版成了1998年的样式,满满的怀旧感。上面concurrency-interest邮件列表中关于这个问题也是众说纷纭,如果仔细爬楼还会发现Doug Lea本人的回复,不过有些令人费解。
网友评论