public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private transient volatile Transferer<E> transferer;
// No-argument constructor.
// NOTE(review): its body is not present in this excerpt.
public SynchronousQueue() {
/**
 * Creates a queue and selects the transfer strategy.
 *
 * @param fair if true a TransferQueue is installed as the transferer,
 *             otherwise a TransferStack is installed
 */
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
/**
 * Hands {@code e} to the transferer in untimed mode (timed=false, nanos=0).
 *
 * @param e the element to hand off; must not be null
 * @throws NullPointerException if {@code e} is null
 * @throws InterruptedException if the transfer returns null
 */
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// A null result from an untimed transfer is reported as an interruption.
if (transferer.transfer(e, false, 0) == null) {
throw new InterruptedException();
/**
 * Attempts to hand off {@code e} using timed mode with a zero timeout.
 * With these arguments TransferQueue.transfer returns null instead of
 * enqueueing a waiting node (see its "can't wait" check).
 *
 * @param e the element to hand off; must not be null
 * @return true if the transfer returned a non-null result, false otherwise
 * @throws NullPointerException if {@code e} is null
 */
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
/**
 * Attempts to hand off {@code e}, passing the timeout (converted to
 * nanoseconds) to the transferer in timed mode.
 *
 * @param e       the element to hand off; must not be null
 * @param timeout how long to wait, in units of {@code unit}
 * @param unit    the time unit of {@code timeout}
 * @return true if the transfer returned a non-null result; false if it
 *         returned null and the current thread was not interrupted
 * @throws NullPointerException if {@code e} is null
 * @throws InterruptedException if the transfer returned null and the
 *         current thread's interrupt flag was set
 */
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
// Thread.interrupted() also clears the interrupt flag, so this check
// distinguishes a plain timeout (false) from an interruption (throw).
if (!Thread.interrupted())
return false;
throw new InterruptedException();
/**
 * Requests an element from the transferer in untimed mode
 * (null item, timed=false, nanos=0).
 *
 * @return the non-null element produced by the transfer
 * @throws InterruptedException if the transfer returns null
 */
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
// A null result from an untimed transfer is reported as an interruption.
throw new InterruptedException();
/**
 * Requests an element using timed mode with a zero timeout.
 *
 * @return whatever the transfer returns: an element, or null if none
 *         was handed over
 */
public E poll() {
return transferer.transfer(null, true, 0);
/**
 * Requests an element, passing the timeout (converted to nanoseconds)
 * to the transferer in timed mode.
 *
 * @param timeout how long to wait, in units of {@code unit}
 * @param unit    the time unit of {@code timeout}
 * @return the element received, or null if the transfer returned null
 *         and the current thread was not interrupted
 * @throws InterruptedException if the transfer returned null and the
 *         current thread's interrupt flag was set
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = transferer.transfer(null, true, unit.toNanos(timeout));
// e != null means an element was successfully taken. Otherwise the
// interrupt flag (cleared by Thread.interrupted()) decides between
// returning null and throwing.
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
/**
 * Always reports the queue as empty.
 *
 * @return true, unconditionally
 */
public boolean isEmpty() {
return true;
/**
 * Always reports a size of zero.
 *
 * @return 0, unconditionally
 */
public int size() {
return 0;
/**
 * Common base for the two transfer strategies selected in the
 * SynchronousQueue(boolean) constructor (TransferQueue and TransferStack).
 */
abstract static class Transferer<E> {
/**
 * Performs a put or a take.
 *
 * @param e     the item to hand off when called from put/offer; null when
 *              called from take/poll
 * @param timed true when the caller passes a timeout (offer/poll variants)
 * @param nanos the timeout in nanoseconds; the callers pass 0 when untimed
 * @return the item received or handed off; the public methods treat a
 *         null result as failure (timeout or interruption)
 */
abstract E transfer(E e, boolean timed, long nanos);
static final class TransferQueue<E> extends Transferer<E> {
/**
 * Node of the TransferQueue linked list. Each node carries an item slot,
 * a link to the next node, the waiting thread and a flag recording
 * whether it was created with isData set.
 * NOTE(review): this excerpt appears abridged (closing braces and parts of
 * the static initializer are not present).
 */
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
/** Stores the initial item and the data/request mode of this node. */
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
/**
 * Atomically replaces {@code next} with {@code val} if it equals {@code cmp}.
 * The plain read first skips the CAS when it cannot succeed.
 */
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
U.compareAndSwapObject(this, NEXT, cmp, val);
/**
 * Atomically replaces {@code item} with {@code val} if it equals {@code cmp}.
 * The plain read first skips the CAS when it cannot succeed.
 */
boolean casItem(Object cmp, Object val) {
return item == cmp &&
U.compareAndSwapObject(this, ITEM, cmp, val);
/**
 * Tries to mark this node as cancelled by CAS'ing {@code item} from
 * {@code cmp} to the node itself; the result of the CAS is ignored.
 */
void tryCancel(Object cmp) {
U.compareAndSwapObject(this, ITEM, cmp, this);
/** Returns true if {@code item} refers to this node (the cancelled marker). */
boolean isCancelled() {
return item == this;
/** Returns true if {@code next} refers to this node itself. */
boolean isOffList() {
return next == this;
// Unsafe mechanics
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
// Field offsets used as the ITEM and NEXT arguments of the CAS calls above.
private static final long ITEM;
private static final long NEXT;
static {
try {
// NOTE(review): the arguments to objectFieldOffset are cut off in this
// excerpt; presumably they look up the "item" and "next" fields.
ITEM = U.objectFieldOffset
NEXT = U.objectFieldOffset
} catch (ReflectiveOperationException e) {
throw new Error(e);
transient volatile QNode head;
transient volatile QNode tail;
/**
 * Reference to a cancelled node that might not yet have been
 * unlinked from queue because it was the last inserted node
 * when it was cancelled.
 */
transient volatile QNode cleanMe;
/**
 * Creates an empty queue: head and tail both point at one dummy node
 * (null item, isData=false).
 */
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
/**
 * Puts or takes an item. A non-null {@code e} makes this a data (put)
 * operation, a null {@code e} a request (take) operation.
 *
 * Each loop iteration reads head and tail and then takes one of two paths:
 * if the queue is empty or the tail node has the same mode, a node is
 * appended and the caller waits in awaitFulfill; otherwise the node after
 * head is in the complementary mode and this call tries to fulfill it by
 * CAS'ing its item.
 *
 * NOTE(review): this excerpt appears abridged. Closing braces and some
 * statements (for example after the "inconsistent read" and "failed to
 * link in" checks) are not present, so the control flow cannot be read
 * literally from the lines below.
 *
 * @param e     the item to hand off, or null to request an item
 * @param timed whether the wait is bounded by {@code nanos}
 * @param nanos the timeout in nanoseconds, used only when {@code timed}
 * @return the item received (for a request) or {@code e} itself (for a
 *         put); null if the wait was cancelled or waiting was not allowed
 */
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
if (tn != null) { // lagging tail
advanceTail(t, tn);
// Zero or negative timeout in timed mode: give up without enqueueing.
if (timed && nanos <= 0L) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
// When the parked thread is interrupted or the timeout has elapsed,
// awaitFulfill returns the node itself, so x == s is true.
if (x == s) { // wait was cancelled
clean(t, s);
return null;
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
// A request returns the item it received; a put returns its own item.
return (x != null) ? (E)x : e;
else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
advanceHead(h, m); // successfully fulfilled
return (x != null) ? (E)x : e;
/**
 * Waits until the item of node {@code s} no longer equals {@code e},
 * i.e. until the node has been fulfilled by another thread or cancelled.
 *
 * NOTE(review): this excerpt appears abridged. The spin-count expression,
 * the statement guarded by the interrupt check, the timeout handling and
 * the untimed park are not present in the lines below.
 *
 * @param s     the node this thread is waiting on
 * @param e     the item originally stored in {@code s}, used for comparison
 * @param timed whether the wait is bounded
 * @param nanos the timeout in nanoseconds, used only when {@code timed}
 * @return the new value of {@code s.item}; transfer treats a result equal
 *         to {@code s} as a cancelled wait
 */
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
// Absolute deadline computed once, so each loop pass can recompute the
// remaining time.
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// The spin count is non-zero only when s is the node right after head.
int spins = (head.next == s)
: 0;
for (;;) {
if (w.isInterrupted())
Object x = s.item;
// x != e means the item slot was changed: either another thread fulfilled
// the node, or it was cancelled via tryCancel after a timeout or interrupt.
if (x != e)
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
if (spins > 0)
// Record the waiting thread in the node before parking.
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
// parkNanos suspends the thread in a timed wait; it resumes on its own
// once the time elapses, without needing another thread to unpark it.
LockSupport.parkNanos(this, nanos);
- 通过上述源码可以看出,SynchronousQueue本身没有容量,不存储元素(isEmpty()始终返回true,size()始终返回0),它是通过管理执行入队/出队操作的等待线程队列来实现阻塞队列的。
- SynchronousQueue可以按先进先出的顺序管理等待线程(公平模式),也就是先被挂起的线程先被唤醒,其内部是通过链表来实现的。SynchronousQueue默认(非公平模式)不保证唤醒的顺序。
- SynchronousQueue的不带超时时间的offer和poll方法不会挂起线程,而take和put方法可能会挂起线程。
- SynchronousQueue的一个典型应用场景是线程池newCachedThreadPool。从上面的源码可以看出,队列本身不缓存元素,因此如果入队操作和出队操作的处理速度相差比较大,线程池有可能会创建大量线程,有耗尽内存的风险。