介绍
ConcurrentLinkedQueue是采用链表实现的无界非阻塞线程安全队列。注意非阻塞采用的是cas实现的。
例子
public class ConcurrentLinkedQueueTest {
private static ConcurrentLinkedQueue<Ticket> queue = new ConcurrentLinkedQueue<Ticket>();
public static void main(String[] args) {
Thread t1 = new Thread(new Runnable() {
Ticket ticket = new Ticket();
@Override
public void run() {
ticket.setName("车票1");
queue.offer(ticket);
System.out.println(Thread.currentThread().getName() + " provider " + ticket);
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
if (!queue.isEmpty()) {
Ticket ticket = queue.poll();
System.out.println(Thread.currentThread().getName() + " consumer " + ticket);
}
}
});
t1.start();
try {
t1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
}
}
结果:
data:image/s3,"s3://crabby-images/d515e/d515e04476f5f1280aae4b55523f7bce7669f504" alt=""
源码分析
继承与实现关系
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable
链队列中节点Node源码
private static class Node<E> {
volatile E item;
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
解释:
可以发现采用的是单向链表实现的,里面item是存放的元素,next是指向后继节点的。采用原子的方式进行操作这个Node节点。
构造器源码
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
解释:构造一个节点,作为头节点和尾节点。
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
// 遍历集合
for (E e : c) {
// 检查对象是否为null
checkNotNull(e);
// 将集合中的值封装成Node节点
Node<E> newNode = new Node<E>(e);
// 如果头节点为null
if (h == null)
// 创建一个Node节点,初始化头节点,尾节点
h = t = newNode;
else {
// 插入节点
t.lazySetNext(newNode);
// 更新尾节点
t = newNode;
}
}
// 如果头节点为null,说明集合里面没元素
if (h == null)
// 创建一个Node节点初始化头节点,尾节点
h = t = new Node<E>(null);
head = h;
tail = t;
}
offer方法源码
data:image/s3,"s3://crabby-images/fb5db/fb5db31549988e5bdb78adca7dc17ae53a79c0fa" alt=""
public boolean offer(E e) {
// 检查元素是否为空,为空就抛出空指针异常
checkNotNull(e);
// 将元素e包装成节点Node
final Node<E> newNode = new Node<E>(e);
// 从尾节点开始遍历
for (Node<E> t = tail, p = t;;) {
// 获取尾节点的后继节点
Node<E> q = p.next;
// 如果尾节点后继节点为空,说明p已经是最后的节点了。
if (q == null) {
// cas的方式更新p的后继节点为新节点,否则后继节点为null
if (p.casNext(null, newNode)) {
// 如果p不是尾节点了,说明新节点已经是尾节点了
if (p != t) // hop two nodes at a time
// cas方式更新尾节点为新节点
casTail(t, newNode); // Failure is OK.
// 返回true
return true;
}
// cas竞争失败,再次尝试
}
else if (p == q) // q不为空,p已经不是真正的尾节点了,说明有其他线程将元素出队了,p和q都指向了尾节点tail
/**
* t != t 说明尾节点已经被其他节点修改了。
* 如果尾节点已经被修改了,则使用t赋值p,否则使用头结点head赋值p,继续遍历。
*/
p = (t != (t = tail)) ? t : head;
else // 检查tail是否已经修改
/**
* 如果p不等于t了,那么说明p已经被修改了。如果t不等于tail了,那么说明为节点被修改了。
* 如果两者都被修改了,就使用最新的tail。
* 如果有一个没有被修改,那么就使用头结点作为p。
* 继续遍历。
*/
p = (p != t && t != (t = tail)) ? t : q;
}
}
poll方法源码
data:image/s3,"s3://crabby-images/ad388/ad3884a8a11b1ceaffca5c9c19c79e0b4e0d0f9d" alt=""
public E poll() {
restartFromHead:
// 死循环
for (;;) {
// 从头节点开始遍历,创建一个头节点的副本p
for (Node<E> h = head, p = h, q;;) {
// 获取p的值
E item = p.item;
// p的值不为空并且使用CAS设置p节点引用的元素为null
if (item != null && p.casItem(item, null)) {
// p已经不是头节点了
if (p != h)
// 如果p是尾节点,那么更新头结点为p
// 如果p不是尾节点,那么更新头节点为q
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {// 说明p为尾节点
// 更新头节点为p
updateHead(h, p);
return null;
}
else if (p == q) // q不为空,有其他线程将元素入队,导致p和q都指向了tail节点
continue restartFromHead;
else // 继续遍历
p = q;
}
}
}
// 更新头结点
final void updateHead(Node<E> h, Node<E> p) {
// 如果头结点不等于p并且cas方式更新头节点指向新节点
if (h != p && casHead(h, p))
// 让头结点指向自己,断开与后继节点之间的关系
h.lazySetNext(h);
}
size方法源码
public int size() {
int count = 0;
// 从头结点开始遍历
for (Node<E> p = first(); p != null; p = succ(p))
// 如果节点不为空
if (p.item != null)
// 就让数量加1,如果达到了Integer.MAX_VALUE,就中断循环
if (++count == Integer.MAX_VALUE)
break;
return count;
}
总结
- ConcurrentLinkedQueue数据操作的锁粒度是队列中的Node节点。
- ConcurrentLinkedQueue的计数从头结点开始遍历,对于null值不计数,如果达到最大值也只能是Integer.MAX_VALUE。
- ConcurrentLinkedQueue在并发线程读写中采用cas方式进行读写更新等。
网友评论