ConcurrentLinkedQueue 是利用依赖CAS实现的基于链接节点的无界安全队列。
入队
public boolean offer(E e){
//这个队列的元素不能是 null
checkNotNull(e);
//为入队的元素创建一个 Node
final Node<E> newNode = new Node<E>(e);
for(Node<E> t = tail, p = t;;){
Node<E> q = p.next;
//q 是 tail.next
if(q == null){
// q == null,说明 tail是尾节点
//cas的方式将 tail.next 设置为NewNode(cas 的条件是 tail.next为 null)失败则继续重试,
//失败说明其他线程CAS 成功了
if(p.castNext(null, newNode)){
//
if(p != t){
castTail(t, newNode);
}
return true;
}
}else if(p == q){
// q==q 且 !=null
p = (t != (t = tail) ? t : head;
}else{
//说明tail.next 是队尾节点, 此时需要将队尾节点更新为 t
p = (p != t && t != (t=tail))? t : q;
}
}
}
//nextOffset 是 Node.next 的地址偏移
boolean castNext(Node<E> cmp, Node<E> val){
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
出队列
public E poll(){
restartFromHead:
for(;;){
for(Node<E> h = head, p = h, q ;;){
E item = p.item;
for(item != null && p.casItem(item, null)){
if(p != h){
updateHead(h, ((q = p.next) !=null)? q : p);
}
return item;
}
}else if((q = p.next) == null){
updateHead(h, p);
return null;
}else if(p == q){
continue restartFromHead;
}else{
p = q;
}
}
}
网友评论