- Java-AQS同步器 源码解读<一>独占锁加锁
- Java-AQS同步器 源码解读<二>独占锁解锁
- Java-AQS同步器 源码解读<三>共享锁
- Java-AQS同步器 源码解读<四>-条件队列上
- Java-AQS同步器 源码解读<五>-条件队列下
前文
AQS的源码分析已经写了篇了,
第一篇 我大概描述了下同步队列的独占模式的加锁,
第二篇我带大家分析了下独占模式的解锁
第三篇和大家分析了同步队列的共享模式,有的小伙伴反应前面的3篇文章写的有点儿干,可能我没注意自己的写作方法和方式,今天这篇我就改变下风格。
好的,废话不多说,我们进入正文,今天给大家看的是条件队列,是AQS中另外的一个队列-Condition,大家应该还记得之前我们说过node类,node类可以构成Sync Queue 和Condition Queue 第一篇文章说过,由于条件队列和前面比起来 复杂一点 我会分上下2篇文章聊
- 上篇 我会带大家了解下为什么要有Condition,和Condition在JDK中的使用,最后和大家聊下Lock和Condition这2个接口,最后简单说下Object监视器锁和Condition
- 下篇 我会和大家进入源码分析下AQS中 Condition的具体实现 最后做个全文总结
为什么需要条件队列Conditon Queue
举个小例子
之前看很多文章或者看到很多博客描述AQS的时候,很多小伙伴都不清楚为什么要存再一个Condition Queue 明明Sync Queue 就可以了呀,
那我就举一个小例子 大家带入一下 看看条件队列存在的意义:
话说小明中午放学去食堂排队吃饭,但是小明就喜欢吃红烧肉,如果没红烧肉就吃不下去饭,这个时候小明排队排到了窗口,点菜的时候发现没有红烧肉了,咋办呢 小明就在那边干等着,但是食堂打饭的大叔不乐意了
- 小明:大叔 我要一份红烧肉加一碗米饭
- 食堂大叔:哎呀,同学对不住啦,今天红烧肉太好卖了,卖完了,要不您换个菜
- 小明:不行不行,我一顿不吃红烧肉就吃不下饭
- 食堂大叔:那好的 你先站旁边 单独排个对,让后面的同学先来
- 小明:好的 可以的 那红烧肉有了 你通知下我哈 我重新去排队
- 食堂大叔:嗯好的 好的可以的 等有了 我叫你一下
- 过了1分钟。。。
- 小强:大叔今天红烧肉 没有了么,我没看到呀
- 食堂大叔:小伙子 你先站旁边 站在小明后面排队,红烧肉一会儿就上
- 小强:跑过去 排在了小明的后面
- 过了2分钟。。。
- 食堂大叔:哎 小伙子们 红烧肉来了
- 小明,小强:好的 马上去吃饭的队伍里面去排队
- 排了1分钟后。。。。
- 小明和小强总于点到了心爱的红烧肉
不知道 大家看了上面的例子 你有什么感触,食堂排队吃饭 就相当于一个Sync Queue 一样,小明和小强排队等红烧肉就是一个Conditin Queue,如果没有这个条件队列 那小明和小强后面排队的人都要在那边干等着,有了条件队列以后整个排队吃饭的程序就没问题了,当然条件队列可能有很多,就像有的人喜欢吃红烧肉,有的人喜欢吃大煮干丝一样,如果没有就自己重写在旁边去排队,等待唤醒。
分析
从上面的例子中,我们应该可以知道条件队列中的node 一开始是要去SyncQueue中去争取锁的,但是获取到锁以后 发现条件不满足,就会释放掉已获取到锁转儿进入条件队列等待,再得到Signal信号后唤醒,又进入了Sync Queue 中去争取锁,获取到以后 最终执行后面的方法,在有些场景下单纯的Sync Queue 是不能解决问题的 这个时候就需要Condition Queue 去配合使用
怎么使用条件队列
写个小Demo
package org.example;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @ClassName ConditionDemo
* @Auther burgxun
* @Description: 一个使用AQS条件队列的Demo
* @Date 2020/3/31 21:43
**/
public class ConditionDemo {
final ReentrantLock lock;//重入锁
private final Condition notEmpty;//一个非空的Condition
private final Condition notFull;//一个非满的Condition
final String[] stringItems;//存储元素的集合
int count;//数组当前的容量
int putIndex;//取的时候下标
int getIndex;//获取的是下标
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:MM:ss SSS");
public ConditionDemo(int capacity) {
if (capacity < 0) {
throw new IllegalArgumentException();
}
this.stringItems = new String[capacity];
this.lock = new ReentrantLock();
this.notEmpty = this.lock.newCondition();
this.notFull = this.lock.newCondition();
}
public void Put(String item) {
this.lock.lock();
try {
while (this.count == stringItems.length) {//如果当前的数组已经满了 那就让线程等待 非满的条件
System.out.println(String.format("NowDate:%s %s:当前集合已满,需要等待数据取出",
simpleDateFormat.format(new Date()),
Thread.currentThread().getName()));
notFull.await();
}
stringItems[putIndex] = item;
System.out.println(String.format("NowDate:%s %s:集合中新增一条数据- %s", simpleDateFormat.format(new Date()),
Thread.currentThread().getName(),
item));
if (++putIndex == stringItems.length) {//如果已经是放到最后一位了 那就从头开始放 相当于一个循环数组
putIndex = 0;
}
this.count++;
notEmpty.signal();//因为这边已经新增加一个了 所以要唤醒等待非空的条件的线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
this.lock.unlock();
}
}
public String get() throws InterruptedException {
this.lock.lock();
try {
while (count == 0) {//如果获取元素的时候 发现当前集合中的元素个数为0了 那就等待一个非空的条件 线程进入循环
System.out.println(String.format("NowDate:%s %s:集合中已空,需要等待新增数据",
simpleDateFormat.format(new Date()),
Thread.currentThread().getName()));
notEmpty.await();
}
String item = stringItems[getIndex];
stringItems[getIndex] = null;
System.out.println(String.format("NowDate:%s %s:集合中取出一条数据- %s",
simpleDateFormat.format(new Date()),
Thread.currentThread().getName(),
item));
if (++getIndex == stringItems.length) {//如果获取的时候 已经到了尾部,那就从头获取
getIndex = 0;
}
count--;
notFull.signal();
return item;
} finally {
this.lock.unlock();
}
}
public static void main(String[] args) {
ConditionDemo conditionDemo = new ConditionDemo(3);
ThreadPoolExecutor executorService = new ThreadPoolExecutor(4, 10, 10,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(5));
executorService.prestartCoreThread();
try {
/*设置5个生产者线程*/
for (int i = 0; i < 3; i++) {
executorService.submit(() -> {
for (int j = 0; j < 2; j++) {
try {
Thread.sleep(100);
conditionDemo.Put("Add index" + j);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
/*设置一个消费者线程*/
executorService.submit(() -> {
try {
for (int i = 0; i < 50; i++) {
Thread.sleep(100);
String item = conditionDemo.get();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
运行结果:
结果
看完了 这个小demo 你是否对Condition 有了点儿了解
JDK中是怎么使用的
下面 我们去看下 JDK里面 怎么去用这个Condition的
我们进入到 ArrayBlockingQueue的源码中看下:
首先我们看下默认的构造函数
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();//非空Condition
notFull = lock.newCondition();//非满Condition
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();//如果当前集合已经满了 那线程就进入大非满的Condition中等待
enqueue(e);
} finally {
lock.unlock();
}
}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();//因为填加了一个元素 所以要唤醒一个等待在非空的Condition线程
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();//这边也是一样的 如果当先集合没有元素了,那就线程进入非空的Condition 等待
return dequeue();
} finally {
lock.unlock();
}
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//因为集合已经取走一个元素了 那一定是非满了,那就唤醒在非满上等待的线程
return x;
}
相信大家看了我截取的部分ArrayBlockingQueue的部分源码 也能明白了很多,其实我上面的demo 就是按照这个写的~
Lock和Condition
Lock
那首先 我什么要聊lock 这个接口呢 我们看看代码再说
**
* @ClassName Lock
* @Auther burgxun
* @Date 2020/3/28 19:43
**/
public interface Lock {
/**
* 获取锁
*/
void lock();
/**
* 获取锁 会抛出中断异常
*/
void lockInterruptibly() throws InterruptedException;
/**
* 非阻塞式的获取锁 尝试过去一次 成功就返回true 失败就返回false
*/
boolean tryLock();
/**
* 尝试获取锁 带超时时间 可中断的 如果获取不到锁 线程会在阻塞队列中 线程休眠 不参与线程调度
* 直到下面三种情况 才被唤醒
* 第一种 情况是 获取到了锁
* 第二种 情况是 其他线程中断了当前线程 这边抛出异常 或者在调用方法的开始 线程的中断标识已经设定了true 这个在子类的实现中 我们会看到
* 第三种 情况是 设定的获取锁时间到了 自动退出争抢 线程唤醒
*
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 释放锁
*/
void unlock();
/**
* 返回一个Condition对象
*/
Condition newCondition();
}
header 1 | header 2 |
---|---|
lock | 阻塞 |
lockInterruptibly | 阻塞 可中断 |
tryLock | 非阻塞 |
tryLock(long time, TimeUnit unit) | 超时机制 可中断 |
上面是我对Lock 接口各个功能的描述,我们看最后一个方法的时候,发现Lock有一个newCondition的方法 而且返回了一个Condition对象 说明我们的Condition是和lock关联的,而且是lock对象创建了Condition,从我们之前看的ArrayBlockingQueue中也能看到这个关系
那我去源码中去找下是怎么实现lock的newCondition方法的
下面是我提取了ReentrantLock的几个方法 如下:
/**
* @ClassName ReentrantLock
* @Auther burgxun
* @Description:
* @Date 2020/3/28 19:44
**/
public class ReentrantLock implements Lock {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
final ConditionObject newCondition() {
return new ConditionObject();
}
}
//非公平锁
static final class NonfairSync extends Sync {
@Override
void lock() {
}
}
//公平锁
static final class FairSync extends Sync {
@Override
void lock() {
//acquire(1);
}
}
//默认实现非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
从代码中 我们可以看到 最终创建的对象还是AQS中的ConditionObject对象
Condition
Condition接口:
/**
* @ClassName Condition
* @Auther burgxun
* @Date 2020/3/28 18:10
**/
public interface Condition {
/**
* 线程等待 进入条件队列
*/
void await() throws InterruptedException;
/**
* 线程等待 不会抛出中断异常 只会记录中断标记 做出线程中断
*/
void awaitUninterruptibly();
/**
* 线程等待 设置等待的时间 单位是毫秒 这个方法和await(long time, TimeUnit unit)方法比起来 此方法看上去 比较鸡肋
* 但是需要注意的是 2个方法放回值 是不同的 这个方法返回的是剩余等待的时间
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* 线程进入阻塞等待 3中情况 会唤醒 一是获取到 signalled的信号,二是线程发生了中断,三是等待的唤醒的时间已经到了
* 如果返回是true 则说明是超时了返回 如果是false 说明是因为超时返回的
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* 线程等待 指定一个固定的等待截止时间deadline
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* 唤醒一个等待的线程
*/
void signal();
/**
* 唤醒所有等待的线程
*/
void signalAll();
}
接口定义如上 看到Condition的接口定义 你是否想到了Object中的wait/notify,
那看下object
public class Object {
public final native void notify();
public final native void notifyAll();
public final native void wait(long timeout) throws InterruptedException;
public final void wait(long timeout, int nanos) throws InterruptedException {
if (timeout < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (nanos < 0 || nanos > 999999) {
throw new IllegalArgumentException(
"nanosecond timeout value out of range");
}
if (nanos > 0) {
timeout++;
}
wait(timeout);
}
public final void wait() throws InterruptedException {
wait(0);
}
}
是不是很像
其实出现了await/signal就是为了补充wait/notify 监视器锁在同步代码块中的不足的功能,
大家可想一下 所有调用wait的线程,都会在同一个对象监视器锁中等待,notify唤醒的时候不能根据条件唤醒,可能有时候唤醒的根本不满足自己的条件 还是要使用wait继续阻塞,如果能在某一个条件下唤醒 那就OK了 于是Condition产生了!
Conditon 可以在同一个锁上 设置不同的条件 当满足一定的条件后 就唤醒等待这个条而阻塞的线程
objec方法 | Condition方法 | 区别 |
---|---|---|
void wait | void await() | |
void wait(long timeout) | long awaitNanos(long nanosTimeout) | 返回值 参数单位 |
wait(long timeout, int nanos) | boolean await(long time, TimeUnit unit) | 返回值 参数类型 |
void notify() | void signal() | |
void notifyAll() | void signalAll() | |
X | boolean awaitUntil(Date deadline) | Condition 有 |
X | void awaitUninterruptibly() | Condition 有 |
Sync-Queue和Conditian-Queue
下一篇见
AQS Condition的实现
条件队列-await源码分析
下一篇见
条件队列-signal源码分析
下一篇见
总结
下一篇见
网友评论