ConcurrentLinkedQueue:非阻塞线程安全队列,适用于高并发场景下的队列,性能高于BlockingQueue,基于链表的无界线程安全队列
内部数据结构
因为内部不用锁,所以都是通过CAS+volatile来保证原子性以及可见性,所以Node的成员变量都是volatile的,而且添加更新都是通过UNSAFE操作。
private static class Node<E> {
volatile E item;
volatile ConcurrentLinkedQueue.Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(ConcurrentLinkedQueue.Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(ConcurrentLinkedQueue.Node<E> cmp, ConcurrentLinkedQueue.Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentLinkedQueue.Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
//初始化的时候head=tail=new Node(null)
private transient volatile ConcurrentLinkedQueue.Node<E> head;
private transient volatile ConcurrentLinkedQueue.Node<E> tail;
offer
因为没有锁,所以在高并发情况下,什么情况都会发生;
初始化之后head = tail = new Node(null),所以开始head和tail都指向一个新的node节点,但是tail节点并不是真正的尾节点;因为多线程竞争的时候,一个线程更新当前节点为尾结点,但是另一个线程会更新另一个节点为尾结点,这时前面的线程的尾结点就会暂停,去找到正确的尾节点更新。
public boolean offer(E e) {
checkNotNull(e);
final ConcurrentLinkedQueue.Node<E> newNode = new ConcurrentLinkedQueue.Node<E>(e);
//拿到tail节点,但是这个tail节点不一定是尾节点
for (ConcurrentLinkedQueue.Node<E> t = tail, p = t;;) {
ConcurrentLinkedQueue.Node<E> q = p.next;
//如果q==null表示p是真正的尾节点,那么就将新节点CAS更新为p的next节点;
//如果casNext成功,则返回true;如果不成功表示他的next节点已经被更新了
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
// 如果p != t,则将入队节点设置成tail节点,
//更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
// 多线程操作时候,由于poll时候会把旧的head变为自引用,然后将head的next设置为新的head
// 所以这里需要重新找新的head,因为新的head后面的节点才是激活的节点
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// 寻找尾节点
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
为什么不是每次都更新tail,因为每次都更新,那么高并发情况下,CAS更新tail会非常多,性能CPU都会受到影响;那么现在不及时更新tail,带来的唯一不好就是需要循环去找到真正的tail节点,从本质上来看它通过增加对volatile变量的读操作来减少了对volatile变量的写操作,而对volatile变量的写操作开销要远远大于读操作,所以入队效率会有所提升。
poll
正常情况下,出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。
并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。采用这种方式也是为了减少使用CAS更新head节点的消耗,从而提高出队效率
public E poll() {
restartFromHead:
for (;;) {
for (ConcurrentLinkedQueue.Node<E> h = head, p = h, q;;) {
E item = p.item;
//拿到head,如果head的item不为null,就直接CAS更新这个item为null,并返回item
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
//这里也不是每次都会更新head
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。
// 那么获取p节点的下一个节点,如果p节点的下一节点为null,则表明队列已经空了
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// p == q,则使用新的head重新开始
else if (p == q)
continue restartFromHead;
// 如果(q = p.next)下一个元素不为空,则将头节点的下一个节点设置成头节点
else
p = q;
}
}
}
按正常想法,head应该始终为Node(null)的,不应该有值,但是ConcurrentLinkedQueue不是这样设计的,head可能会变成实际节点上。
可以看看peek方法,会移动head到实际节点上
peek
public E peek() {
restartFromHead:
for (;;) {
//循环拿到head
for (ConcurrentLinkedQueue.Node<E> h = head, p = h, q;;) {
E item = p.item;
//如果head的item不为空,则更新head到这个节点,返回item
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
//如果p==q,意味着p.next == p,重新拿head来进行
else if (p == q)
continue restartFromHead;
//将p向后移动一位
else
p = q;
}
}
}
网友评论