动机起源:来自于有人问这两个堵塞队列哪个效率高一点。
先说答案:LinkedBlockQueue高一点。而且,这两个东西有很大的不同
一、先看LinkedBlockQueue
LInkedBlockQueue有两把锁,take锁和put锁,所以take和put可以并发。
因为:linkedBlockQueque维护的是单链表,添加的时候,只添加到尾部,获取的时候从头部获取,所以take和put的入队和出队操作不会有并发错误:
入队的时候,分两步:1、last.next = node,这一步对出队操作没有任何影响。
2、last = last.next,这个操作,对出队也没有影响。
所以入队只操作tail和tail的next,出队的时候只操作head,而且不是移除head的下一个元素,而是移除head节点,然后把head的下一个元素变成head的节点,所以入队出队互相没有并发影响。
这里假设,如果是双向链表,则肯定会有并发操作,因为,会操作两边的链表,入队和出队有交叉,所以有并发操作。
如果是单向链表,移除的是head的下一个元素也会存在并发问题。
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
研究其put和take方法的源代码,代码里附有注释。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
// put 锁:private final ReentrantLock putLock = new ReentrantLock();
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 自旋请求lock,lock和lockInterruptibly的取别是,lockInterruptibly可线程中断,如果线程Interruptibly,则不再block等待
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
// 容量已经满了
// 这里为什么要while?因为被唤醒,不一定就立即获得锁
// 如果被唤醒没有获得锁,被别的put线程获得锁了,然后
// 马上添加了一个元素,则被唤醒的线程,获得锁之后,则
// 容量已经满了,所以需要while继续等待
// 所以对于多线程wait操作,一定不要忘了wait操作用while循环做
// private final Condition notFull = putLock.newCondition(); condition用于并发锁实现await,sign的功能
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// 获取当前大小,并且当前大小加1,注意这个c是入队前的大小
c = count.getAndIncrement();
if (c + 1 < capacity)
// 还有空余位置, 则唤醒其他线程,注意,put操作会唤醒put操作,而不是我们以为的take操作唤醒put操作,当然,如果put满了,则是take操作唤醒put操作
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
// 之前队列大小为0,现在添加了数据,有数据,则唤醒take锁
if (c == 0)
signalNotEmpty();
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
// take使用的是take锁,也就是take和put是可以并发的
final ReentrantLock takeLock = this.takeLock;
// 自旋,获取take锁
takeLock.lockInterruptibly();
try {
// 没有数据,则等待,如put操作
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// 还有数据,则唤醒其他线程take数据
notEmpty.signal();
} finally {
// 释放锁
takeLock.unlock();
}
// 之前数据已经饱和,现在取出了一个数据,所以唤醒put锁
if (c == capacity)
signalNotFull();
return x;
}
移除指定位置的元素,自然元素前后指针都会受到影响,前指针跟get有关,后指针跟put有关,所以,锁两把锁
// 移除操作,为什么是锁两把锁,因为移除元素操作对put和take操作都有影响。
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
总结:put操作的时候,如果还有线程堵塞,并且还有空余位置,则put操作会唤醒put操作,如此递归,直到所有put操作都执行了。
take操作也是如此,如果还有现车给堵塞take操作,并且还有值,则take操作会唤醒take操作,如此递归,直到所有take操作都执行完了。
当然,当put操作满了之后,put操作也会唤醒take操作,。
同理,当take操作空了之后,也会唤醒put操作。
二、再来看linkedBlockDequeue
同样分析其放和取的方法
public void push(E e) {
addFirst(e);
}
public void addFirst(E e) {
// 这里可以可能到,如果offerFisrt方法调用返回false,则直接报deque 已经满了的错误,而offerFirst方法是没有await操作的。
if (!offerFirst(e))
throw new IllegalStateException("Deque full");
}
public boolean offerFirst(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
// final ReentrantLock lock = new ReentrantLock();
// 和linkedBlockQueue不同的,linkedBlockQueue有两把锁,putlock,tacklock,linkedBlockDeque只有一把锁
//
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkFirst(node);
} finally {
lock.unlock();
}
}
private boolean linkFirst(Node<E> node) {
// assert lock.isHeldByCurrentThread();
// 如果已经满了,直接返回false,不wait,可见这个东西并不堵塞,满了直接报错。
if (count >= capacity)
return false;
Node<E> f = first;
node.next = f;
first = node;
if (last == null)
last = node;
else
f.prev = node;
++count;
notEmpty.signal();
return true;
}
其他的方法比如add() addLast 等方法,都是类似,不堵塞,直接报错。
// 可以看到,getfirst方法,同样,不堵塞,直接报错
public E getFirst() {
E x = peekFirst();
if (x == null) throw new NoSuchElementException();
return x;
}
public E peekFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (first == null) ? null : first.item;
} finally {
lock.unlock();
}
}
// 顺便看看clear方法,锁住之后,一个一个元素,前指针和后指针置为空,其实双向链表结构,头和尾指向空,就能达到同样的效果,而他这样做,只是为了便于gc快速回收
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Node<E> f = first; f != null; ) {
f.item = null;
Node<E> n = f.next;
f.prev = null;
f.next = null;
f = n;
}
first = last = null;
count = 0;
notFull.signalAll();
} finally {
lock.unlock();
}
}
三、总结、linkedBlcokQueue,是一个堵塞队列,fifo,有两把锁,takelock, putlock。有两把锁,意味着,其take和put操作,当每到临界值的时候,是并行的,思想上因为:不到临界值,put和take其实是互不相干的。
而linkedBlockDeque,一把锁,则是串行的。故,单纯论效率,linkedBlcokQueue比linkedBlockDeque高。
linekdBlockDeQue,不是一个堵塞队列,但是也是一个线程安全的队列,队列满了,put直接报错,队列空了,get也直接报错。
网友评论