by shihang.mai
使用
public class ConTest {
final Lock lock = new ReentrantLock();
final Condition condition = lock.newCondition();
public static void main(String[] args) {
// TODO Auto-generated method stub
ConTest test = new ConTest();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
consumer.start();
producer.start();
}
class Consumer extends Thread{
@Override
public void run() {
consume();
}
private void consume() {
try {
lock.lock();
System.out.println("我在等一个新信号"+this.currentThread().getName());
condition.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
System.out.println("拿到一个信号"+this.currentThread().getName());
lock.unlock();
}
}
}
class Producer extends Thread{
@Override
public void run() {
produce();
}
private void produce() {
try {
lock.lock();
System.out.println("我拿到锁"+this.currentThread().getName());
condition.signalAll();
System.out.println("我发出了一个信号:"+this.currentThread().getName());
} finally{
lock.unlock();
}
}
}
}
/*
结果:
我在等一个新信号Thread-1
我拿到锁Thread-0
我发出了一个信号:Thread-0
拿到一个信号Thread-1
*/
源码分析
Lock接口中还有一个方法newCondition,创建一个条件队列。ReentrantLock的内部类Sync如下:
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
而ConditionObject是AQS中的类
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
public ConditionObject() { }
//......
}
- 看上面使用的例子,Condition是需要配合Lock使用的,而Lock原理又依赖AQS,所以ConditionObject定义在AQS中。
- condition内部维护了一个单向的等待队列,所有调用condition.await方法的线程会加入到等待队列中,并且线程状态转换为等待状态
- 看上面ConditionObject,有两个属性firstWaiter和lastWaiter。它们都复用了AQS中CLH队列中的Node对象,不过只用了Node中的nextWaiter属性。当然一个Lock可以new多次Condition,形成多个Condition队列,如下图
await
public class ConditionObject implements Condition, java.io.Serializable {
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
}
- 先将当前节点封装为Conditon Node, firstWaiter和lastWaiter都指向它,如果不是第一个,直接将上一个的nextWaiter指向自身,并将lastWaiter指向自身。即形成单链表
- 释放AQS锁,并且唤醒在同步队列中头结点的后继节点引用的线程。
- 当线程第一次调用condition.await()方法时,会进入到这个while()循环中,挂起,当 当前线程被中断或者其他线程调用condition.signal/condition.signalAll方法将当前节点移动到了同步队列后,就会退出循环
- 退出循环后,调用acquireQueued放到CLH队列(ReentranLock中详细说了,这里不说)
主逻辑需要调用的AQS代码如下:
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
signal
public class ConditionObject implements Condition, java.io.Serializable {
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
}
主逻辑中AQS的辅助代码如下:
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
- 将等待队列中的头节点Condition Node(waitState=-2)改为waitState=0
- 将节点放到CLH队列
singleAll也就将整个等待队列一个个往CLH队列放,不详细说.
网友评论