生产者消费者实例:
public class ProductorCustom<T> {
private final ReentrantLock lock = new ReentrantLock();
private Condition putButFull = lock.newCondition();
private Condition tackButEmpty = lock.newCondition();
private int head, tail, count;
private final T[] items;
public ProductorCustom() {
this(10);
}
public ProductorCustom(int maxSize) {
items = (T[]) new Object[maxSize];
}
public void put(T item) throws InterruptedException {
lock.lock();
try {
while(count == items.length) {
putButFull.await();
}
items[tail] = item;
if (++tail == items.length) {
tail = 0;
}
++count;
tackButEmpty.signal();
}
finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
tackButEmpty.await();
}
T item = items[head];
if (++head == items.length) {
head = 0;
}
--count;
putButFull.signal();
return item;
}
finally {
lock.unlock();
}
}
}
Condition是Object的wait/notify的替代,更灵活;Condition接口定义的方法,await对应于Object.wait,signal对应于Object.notify,signalAll对应于Object.notifyAll。
ReentrantLock 的newCondition():
public Condition newCondition() {
return sync.newCondition();
}
最终调用的是AQS中的内部类ConditionObject,它实现了Condition接口。
概述
AQS维护了一个锁队列;ConditionObject同样维护了一个条件队列,该队列里的Node等待着signal信号;两个队列间的关系是这样的:
- 比如节点A是锁队列的head,acquire成功,B等待在A后;
- A线程执行await,会把A移到条件队列中,release唤醒B
- B执行signal,会将A放回到锁队列,但并没有被唤醒;
大致上就是一个线程正在运行,但是因为某些条件未满足需要等待,我们就把他放到条件队列中等待,其他线程就有机会执行,当条件满足就从条件队列中取出一个或全部等待的线程,把他放回锁队列中等待获取执行资格;
接下来看看实现细节:
ConditionObject
AQS里的内部类,实现了Condition,ReentrantLock 调用的就是它;
所以才说AQS是JUC包之基;
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 条件队列的头节点. */
private transient Node firstWaiter;
/**条件队列的尾节点. */
private transient Node lastWaiter;
await
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//创建当前线程的Node节点,放入条件队列
Node node = addConditionWaiter();
//锁队列中的当前线程release,唤醒后面的线程
int savedState = fullyRelease(node);
int interruptMode = 0;
//isOnSyncQueue检测节点是否在锁队列;
//这里的逻辑是signal会将node放回到锁队列,如果node不在锁队列,
//说明条件没满足即没有收到signal,需要继续等待
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//唤醒后,执行acquireQueued,尝试获取锁即改变同步状态,失败就在锁队列中等待
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); //清理waitStatus为CONDITION的节点
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
如上面注释,一个原本锁队列中正在运行的HEAD节点,执行await,会在条件队列中创建它的Node节点,锁队列中的它执行release操作,之后就LockSupport.park挂起等待signal唤醒
signal
public final void signal() {
//返回true代表当前线程正式获取锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//这里利用循环直到将firstWaiter放回锁队列为止
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); //将节点加入到锁队列
int ws = p.waitStatus;
//如果该结点的状态为cancel 或者修改waitStatus失败,则直接唤醒。
//这里一般ws=0,所以会执行更改节点状态,改为SIGNAL,更改成功放回锁队列
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signal主要逻辑就是将条件队列的第一个节点firstWaiter加入到锁队列。
超时方法分析在我的CyclicBarrier文章里
网友评论