type | Class | 存储结构 | 数据组织算法 | 备注 |
---|---|---|---|---|
非阻塞 | LinkedList | 双端链表 | 双端链表 | |
非阻塞 | PriorityQueue | 数组 | 最小堆 | |
非阻塞 | ConcurrentListedQueue | 双端链表 | 双端链表 | |
阻塞 | ArrayBlockingQueue | 数组 | 头尾指针标记 | 数组大小固定,空间循环使用 |
阻塞 | LinkedBlockingQueue | 单向队列 | 单向队列 | 通过capacity字段控制队列容量 |
阻塞 | DelayQueue | PriorityQueue | PriorityQueue | |
阻塞 | PriorityBlockingQueue | 数组 | 最小堆 |
Queue
public interface Queue<E> extends Collection<E> {
/**
* 同offer,入队失败抛出异常
*/
boolean add(E e);
/**
* 入队,失败返回false
*/
boolean offer(E e);
/**
* 同poll,出队失败抛出异常
*/
E remove();
/**
* 出队,失败返回null
*/
E poll();
/**
* 同peek,head为空抛出异常
*/
E element();
/**
* 访问head,head为空返回null
*/
E peek();
}
存储原理
LinkedList
private static class Node<E> {
E item;
Node<E> next;
Node<E> prev;
Node(Node<E> prev, E element, Node<E> next) {
this.item = element;
this.next = next;
this.prev = prev;
}
}
PriorityQueue
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
transient Object[] queue; // non-private to simplify nested class access
ConurrentLinkedQueue
static final class Node<E> {
volatile Node<E> prev;
volatile E item;
volatile Node<E> next;
Node() { // default constructor for NEXT_TERMINATOR, PREV_TERMINATOR
}
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext or casPrev.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
void lazySetPrev(Node<E> val) {
UNSAFE.putOrderedObject(this, prevOffset, val);
}
boolean casPrev(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long prevOffset;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
prevOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("prev"));
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
BlockingQueue
public interface BlockingQueue<E> extends Queue<E> {
/**
* 当阻塞队列容量已经满时,往阻塞队列插入数据的线程会被阻塞,直至阻塞队列已经有空余的容量可供使用;
*/
void put(E e) throws InterruptedException;
/**
* 若阻塞队列已经满时,同样会阻塞插入数据的线程,直至阻塞队列已经有空余的地方,与 put
* 方法不同的是,该方法会有一个超时时间,若超过当前给定的超时时间,插入数据的线程会退出;
*/
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 当阻塞队列为空时,获取队头数据的线程会被阻塞;
*/
E take() throws InterruptedException;
/**
* 从BlockingQueue取出一个队首的对象,如果在指定时间内,
* 队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
* 通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
*/
int drainTo(Collection<? super E> c);
/**
* 一次性从BlockingQueue获取最多maxElements个可用的数据对象(还可以指定获取数据的个数),
* 通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
*/
int drainTo(Collection<? super E> c, int maxElements);
}
存储原理
ArrayBlockingQueue
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
/*take指针指向队列头部,每取一个元素,takeIndex++,如果到了数组最后一个元素,则从0重新开始*/
int takeIndex;
/** items index for next put, offer, or add */
/*put指针指向队列尾部,每添加一个元素,putIndex++,如果到了数组最后一个元素,则从0重新开始**/
int putIndex;
LinkedBlockingQueue
/**
* Linked list node class
*/
static class Node<E> {
E item;
/**
* 下一个节点,单向队列
*/
Node<E> next;
Node(E x) { item = x; }
}
/** 队列容量 */
private final int capacity;
/** 当前队列大小 */
private final AtomicInteger count = new AtomicInteger();
/** 队列头节点,head节点是一个虚拟的节点,真正有意义的元素从head->next开始 */
transient Node<E> head;
/** 队列尾节点 */
private transient Node<E> last;
DelayQueue
/**使用优先级队列存储,优先级就是延迟的时间,take取延迟最近的元素*/
private final PriorityQueue<E> q = new PriorityQueue<E>();
PriorityBlockingQueue
/**元素数组,最小堆算法组织结构*/
private transient Object[] queue;
/**优先级比较器*/
private transient Comparator<? super E> comparator;
网友评论