synchronized有个重要的功能,可以通过object中的wait()和 notify()方法实现生产者/消费者。ReentrantLock基于Condition也同样可以实现,而且相对于synchronized的无差别通知,ReentrantLock可以选择性的通知,减少了很多无用的线程竞争。本文主要就是分析下ReentrantLock是怎么通过Condition实现生产者/消费者模式的。
public class ProducerCustomerWithLock {
Executor pool = Executors.newFixedThreadPool(5);
private List<String> storeList = new LinkedList<>();//仓库
//仓库容量
private int MAX_VALUE = 5;
//仓库为空
private int MIN_VALUE = 0;
// 线程锁
private Lock lock = new ReentrantLock();
//仓库满了,绑定生产者线程
private Condition full = lock.newCondition();
//仓库为空,绑定消费者线程
private Condition empty = lock.newCondition();
//生产者
private class producer implements Runnable {
@Override
public void run() {
while (true) {
produce();
}
}
private void produce() {
System.out.println(Thread.currentThread().getName() + "进入仓库,准备生产!");
try {
lock.lock();
if (storeList.size() == MAX_VALUE) {
System.out.println("仓库已满,等待消费");
Thread.sleep(1000);
full.await(); //当前线程等待,让其他线程继续执行,可以看出wait是释放锁的
}
if (storeList.size() < MAX_VALUE) {
String product = "产品" + new Random().nextInt();
storeList.add(product);
System.out.println(Thread.currentThread().getName() + "往仓库中生产了一个产品!" + product);
}
Thread.sleep(1000);
empty.signalAll();//唤醒消费者线程
} catch (InterruptedException e) {
System.out.println("中断异常");
// e.printStackTrace();
}
finally {
lock.unlock();
}
}
}
private class consumer implements Runnable {
@Override
public void run() {
while (true) {
consume();
}
}
private void consume() {
System.out.println(Thread.currentThread().getName() + "进入仓库,准备消费!");
try {
lock.lock();
if (storeList.size() == MIN_VALUE) {
System.out.println("仓库已空,等待生产");
Thread.sleep(1000);
empty.await(); //当前线程等待,让其他线程继续执行,可以看出wait是释放锁的
}
if (storeList.size() > MIN_VALUE) {
System.out.println(Thread.currentThread().getName() + "从仓库取得产品:" + storeList.remove(0));
}
Thread.sleep(1000);
full.signalAll();//唤醒生产者线程
} catch (InterruptedException e) {
System.out.println("中断异常");
// e.printStackTrace();
}
finally {
lock.unlock();
}
}
}
//启动生产者和消费者线程
public void start() {
for (int i = 1; i < 5; i++) {
pool.execute(new producer());
pool.execute(new consumer());
}
}
public static void main(String[] args) {
ProducerCustomerWithLock pc = new ProducerCustomerWithLock();
pc.start();
}
}
打印结果如下,生产者消费者交替运行:
pool-1-thread-1进入仓库,准备生产!
pool-1-thread-2进入仓库,准备消费!
pool-1-thread-3进入仓库,准备生产!
pool-1-thread-1往仓库中生产了一个产品!产品-885144207
pool-1-thread-4进入仓库,准备消费!
pool-1-thread-5进入仓库,准备生产!
pool-1-thread-1进入仓库,准备生产!
pool-1-thread-2从仓库取得产品:产品-885144207
pool-1-thread-2进入仓库,准备消费!
pool-1-thread-3往仓库中生产了一个产品!产品-1433422526
pool-1-thread-3进入仓库,准备生产!
pool-1-thread-4从仓库取得产品:产品-1433422526
pool-1-thread-4进入仓库,准备消费!
pool-1-thread-5往仓库中生产了一个产品!产品-137356193
pool-1-thread-5进入仓库,准备生产!
pool-1-thread-1往仓库中生产了一个产品!产品831540660
...
接下来就直接进入正题吧,看下ConditionObject.await()方法。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
先判断线程状态是否被中断,如果被中断则抛出异常。然后开始构建Condition队列,看下addConditionWaiter()方法。
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
先判断condition尾节点的waitStatus是否是condition状态。如果不是只能是cancel状态,则把队列中是cancel状态的节点移除。找到第一个是condition状态的尾节点。
新建一个Node节点,模式是CONDITION,如果之前不存在尾节点。则把当前节点作为头结点和尾节点,否则把当前节点置为尾节点。然后执行fullyRelease(node)方法。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
首先进入到await的方法的前提条件是获取到锁,所以这一步是释放锁并且唤醒AQS队列头结点的后置节点。这里为什么要释放锁呢,很简单await方法会阻塞当前线程。当前线程如果不释放锁就会导致后面的线程获取不到锁从而阻塞。极大的影响性能还可能造成死锁。所以await方法是会释放锁的。
再看isOnSyncQueue(node)是判断当前节点是否在AQS队列中,如果不在则阻塞当前节点所在线程。直到signal方法唤醒该线程。那么先看下signal():
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
首先把当前节点waitStatus状态CAS操作为0。如果设置失败,且该节点为condition头结点,则把该节点排除出队列。
如果设置成功,则把该节点插入到AQS队列中。也就是说Signal()唤醒后不是立即执行的。而是进入到AQS队列中排队。
如果该节点被取消了或者已经被设置成了SIGNAL,则取消阻塞该节点所在线程。其他情况由AQS头节点唤醒。
再回过头看await方法。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
之前睡眠的线程被唤醒了,改节点已经加入到了AQS队列可以退出循环了,然后主要就是去获取锁并返回线程的中断状态。
网友评论