今天学习一下Condition的使用和实现原理,自己水平有限,有错误和不当之处欢迎指正批评。

使用Condition实现生产者和消费者模式
/**
* Created by dumingwei on 2017/7/4.
*/
public class ThreadCommunicateTestUseLock {
private int queueSize = 10;
private Queue<Integer> queue = new ArrayDeque<>(queueSize);
private Lock lock = new ReentrantLock();
//标记是否可以生产,如果队列满了,则不能生产元素
private Condition canProduce = lock.newCondition();
//标记队列是否空了,如果队列空了,则不能消费元素
private Condition canConsume = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
ThreadCommunicateTestUseLock test = new ThreadCommunicateTestUseLock();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
consumer.start();
Thread.sleep(2000);
producer.start();
}
class Consumer extends Thread {
@Override
public void run() {
consume();
}
private void consume() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
try {
System.out.println("队列空,等待数据");
//队列已空,不能消费了,等一等
canConsume.await();
System.out.println("Consumer after await");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sleep(100);
queue.poll(); //每次移走队首元素
System.out.println("从队列取走一个元素,队列剩余" + queue.size() + "个元素");
//通知生产者生产数据
canProduce.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
class Producer extends Thread {
@Override
public void run() {
produce();
}
private void produce() {
while (true) {
lock.lock();
try {
while (queue.size() == queueSize) {
try {
System.out.println("队列满,等待有空余空间");
//队列已满,不能生产了,等一等
canProduce.await();
System.out.println("Producer after await");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sleep(100);
queue.offer(1); //每次插入一个元素
System.out.println("向队列取中插入一个元素,队列剩余空间:" + (queueSize - queue.size()));
//已经有数据了,通知消费者消费数据
canConsume.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
}
看了看注释,竟然是17年创建的类,也是时光荏苒了。
Condition的await()方法作用
- 让当前线程等待直到被唤醒或者被中断。
- 和
Condition
对象关联的锁被自动释放,当前线不能被线程调度,处于休眠状态,直到下面四种场景发生:
2.1 其他线程调用了该Condition
对象的signal
方法并且当前线程被选为被唤醒的线程。
2.2 其他线程调用了该Condition
对象的signalAll
方法。
2.3 其他线程中断了当前线程并且线程挂起是支持线程中断的。
2.4 假唤醒发生。
在所有的场景中,在该方法返回之前,当前线程必须重新获取了和该Condition
对象关联的锁。当该方法返回的时候确保了当前线程一定持有了和该Condition
对象关联的锁。
Condition是一个接口,我们看它的一个子类的实现。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
/** 条件队列中的第一个节点 */
private transient Node firstWaiter;
/** 条件队列中最后一个节点 */
private transient Node lastWaiter;
}
}
ConditionObject的await()方法
public final void await() throws InterruptedException {
if (Thread.interrupted())//如果被中断,直接抛出异常
throw new InterruptedException();
//注释1处,把当前线程加入到条件等待队列
Node node = addConditionWaiter();
//注释2处,释放锁,才知道为什么说调用await会释放锁了
int savedState = fullyRelease(node);
int interruptMode = 0;
//注释3处,这是一个while循环
while (!isOnSyncQueue(node)) {
//阻塞当前线程
LockSupport.park(this);
//如果被中断了,就跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//注释4处
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
//清理已经取消的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
//注释5处,根据在等待过程中是否被中断来抛出中断异常,或者重新中断当前线程,或者什么也不做
reportInterruptAfterWait(interruptMode);
}
注释1处,将当前线程加入到条件等待队列。
ConditionObject的addConditionWaiter()方法
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾节点取消等待了,遍历清除所有已经取消等待的节点。
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
//清除完以后,lastWaiter就是最后一个正在等待的节点或者为null。
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
//将尾节点指向新插入的节点
lastWaiter = node;
return node;
}
ConditionObject的await()方法的注释2处,释放当前线程持有的所有锁
AQS的fullyRelease(Node node)方法
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
//如果不能完全释放所有的锁,抛出异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
注意:这个方法如果不能完全释放所有的锁,抛出异常。释放了所有的锁之后,AQS的同步状态值 state==0
。
ConditionObject的await()方法的注释3处,这是一个while循环。
while (!isOnSyncQueue(node)) {
//挂起当前线程
LockSupport.park(this);
//线程恢复以后,判断一下在挂起过程中是否被中断过
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
退出while循环的条件是
- 当前节点已经进入到同步队列等待获取锁了,while条件不满足。
- 线程被唤醒后被中断了,跳出循环。
首先调用isOnSyncQueue(Node node)
判断当前线程是否在同步队列中等待获取锁,如果没有的话就挂起当前线程。
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
线程恢复以后(其他线程调用等待条件的notify
方法并且当前线程被选择为唤醒的线程,或者其他线程调用等待条件的notifyAll
方法),判断一下在挂起过程中是否被中断过,如果被中断了,就直接跳出循环,否则继续while循环。
ConditionObject的checkInterruptWhileWaiting(Node node) 方法
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
//如果被中断了,调用
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
AQS的transferAfterCancelledWait(Node node)方法
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
AQS的transferAfterCancelledWait(Node node)方法没有看懂,但是不影响主流程,只需要知道ConditionObject的checkInterruptWhileWaiting(Node node) 方法会根据线程是否被中断返回三种状态值REINTERRUPT = 1
,THROW_IE = -1
或者0 。
ConditionObject的await()方法的注释4处,如果条件满足将中断模式设置为REINTERRUPT
。但是我感觉注释4处的条件是没法同时满足的。
ConditionObject的await()方法的注释5处,根据中断模式interruptMode来决定是否抛出中断异常或者重新中断当前线程。
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
//抛出异常
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
//再次中断自己
selfInterrupt();
}
Condition的await()方法基本分析完毕,现在看一看Condition的signal()方法
Condition的signal()方法
/**
* 将在条件队列中等待时间最长的线程移动到同步队列中来获取锁。
*/
public final void signal() {
//如果当前线程没有持有写锁,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//唤醒第一个等待者
doSignal(first);
}
唤醒第一个等待者
private void doSignal(Node first) {
do {
//将firstWaiter赋值为firstWaiter的下一个等待节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
这个方法的逻辑就是从等待队列的头部开始向后遍历,直到将某一个节点从条件队列中移动到同步队列中去获得锁。
/**
* 把一个队列从条件队列中移动到同步队列中。
* 如果成功返回true。
* @param node the node
* @return 如果成功返回true。 (否则说明该节点在signal之前取消等待了。)
*/
final boolean transferForSignal(Node node) {
/*
* 注释1处,如果不能改变waitStatus,说明该节点取消等待了。返回false。
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//注释2处,加入队列
Node p = enq(node);
int ws = p.waitStatus;
//注释3处,直接唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
//返回true
return true;
}
注释1处,如果不能改变waitStatus,说明该节点取消等待了。返回false。
注释2处,将当前节点加入队列同步队列。
注释3处,如果当前线程已经取消等待了,或者无法将当前节点的等待状态设置为SIGNAL,就直接唤醒节点的线程。
Condition的signalAll()方法
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
该方法与signal的区别就在最后一行代码
private void doSignalAll(Node first) {
//将第一个条件等待线程和最后一个条件等待线程都置为null
lastWaiter = firstWaiter = null;
do {
//唤醒所有的线程
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
/**
* 把一个队列从条件队列中移动到同步队列中。
* 如果成功返回true。
* @param node the node
* @return 如果成功返回true。 (否则说明该节点在signal之前取消等待了。)
*/
final boolean transferForSignal(Node node) {
/*
* 注释1处,如果不能改变waitStatus,说明该节点取消等待了。返回false。
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//注释2处,加入队列
Node p = enq(node);
int ws = p.waitStatus;
//注释3处,直接唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
//返回true
return true;
}
总结一下:
-
ConditionObject的await()方法就是将一个线程加入到一个条件队列,等待条件满足就会从条件队列移动到锁的等待队列去竞争锁。
-
ConditionObject的notify()方法就是选取条件等待队列中的一个线程,将这个线程移动到锁的等待队列去竞争锁。ConditionObject的notifyAll()方法就是将条件等待队列里面的所有的线程都移动到锁的等待队列去竞争锁。
网友评论