Condition
利用锁可以让线程以同步的方式来执行一段代码,而Condition则是用来实现线程之间协作的。
功能概述:
多个线程之间协作 工作时,在获取锁之后,还可能需要判断一些条件是否满足,才可以继续执行,如果不满足则需要挂机等待直到条件满足,这里的条件就对应到Condition对象。
- 配合Lock使用,附属于Lock实例
- 一个Lock实例,可以创建多个Contion变量,根据自己的应用场景
方法介绍
1.Lock#newCondition :创建一个Condition实例
- Condition#await :将持锁的当前线程加入到condition变量的等待队列后,挂起当前线程
- Condition#sinal : 激活条件变量等待队列中,队首的线程,线程激活后抢占锁,抢不到就进入锁的等待队列中排队。
- Condition#sinalAll : 激活条件变量等待队列中,所有的线程。线程激活后抢占锁,抢不到就进入锁的等待队列中排队。
设计
-
ReentrantLock的Condition,如左图可以有多个条件变量(Condition)。
-
synchronize,如右图只有一个条件变量。
image.png -
每个Condition内都有独立的等待队列,线程在等待变量信号时排队;得到信号后从Condition的队列中移除,参与到Lock的同步队列中去竞争锁:
image.png
Demo
经典的生产者消费者实例,利用2个Condition:
- 不满就生产
- 不空就消费
BoundedBuffer
package com.rock.multithread.juc.lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
CakeFactory
package com.rock.multithread.juc.lock;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class CakeFactory {
private ReentrantLock lock = new ReentrantLock();
private Condition not_empty = lock.newCondition();
private Condition not_full = lock.newCondition();
private int queueSize = 5;
private Queue<Long> queue = new ArrayDeque<>(queueSize);
private long cakeCount = 1000;
private long counter =0;
public long makeCake() throws InterruptedException {
lock.lock();
try {
while (queue.size() == queueSize) {
not_full.await();
}
long no = counter++;
queue.add(no);
not_empty.signal();
return no;
} finally{
lock.unlock();
}
}
public long getCake(){
lock.lock();
try {
while (queue.isEmpty()){
try {
not_empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Long no = queue.poll();
not_full.signal();
return no;
} finally {
lock.unlock();
}
}
}
ConditionDemo
package com.rock.multithread.juc.lock;
import sun.invoke.empty.Empty;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionDemo {
public static BoundedBuffer boundedBuffer = new BoundedBuffer();
public static CakeFactory cakeFactory = new CakeFactory();
public static long counter =0;
public static void main(String[] args) throws InterruptedException {
//Thread makeThread = new Thread(ConditionDemo::put);
// Thread eatThread = new Thread(ConditionDemo::poll);
Thread makeThread = new Thread(ConditionDemo::makeCake);
Thread eatThread = new Thread(ConditionDemo::eatCake);
makeThread.start();
TimeUnit.SECONDS.sleep(1);
eatThread.start();
makeThread.join();
eatThread.join();
}
public static void put(){
while (true){
try {
long no = counter ++;
System.out.println("put " + no);
boundedBuffer.put(no);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void poll(){
while (true){
try {
Object take = boundedBuffer.take();
System.out.println("take " + take);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void makeCake() {
while (true) {
try {
long cakeNo = cakeFactory.makeCake();
System.out.println("make cake" + cakeNo);
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void eatCake(){
while (true){
long cakeNo = cakeFactory.getCake();
System.out.println("eat cake" + cakeNo);
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
网友评论