美文网首页
带你看看Java-AQS同步器 源码解读<四>条件队列Condi

带你看看Java-AQS同步器 源码解读<四>条件队列Condi

作者: burgxun | 来源:发表于2020-04-01 11:08 被阅读0次
  1. Java-AQS同步器 源码解读<一>独占锁加锁
  2. Java-AQS同步器 源码解读<二>独占锁解锁
  3. Java-AQS同步器 源码解读<三>共享锁
  4. Java-AQS同步器 源码解读<四>-条件队列上
  5. Java-AQS同步器 源码解读<五>-条件队列下

前文

AQS的源码分析已经写了篇了,
第一篇 我大概描述了下同步队列的独占模式的加锁,
第二篇我带大家分析了下独占模式的解锁
第三篇和大家分析了同步队列的共享模式,有的小伙伴反应前面的3篇文章写的有点儿干,可能我没注意自己的写作方法和方式,今天这篇我就改变下风格。

好的,废话不多说,我们进入正文,今天给大家看的是条件队列,是AQS中另外的一个队列-Condition,大家应该还记得之前我们说过node类,node类可以构成Sync Queue 和Condition Queue 第一篇文章说过,由于条件队列和前面比起来 复杂一点 我会分上下2篇文章聊

  • 上篇 我会带大家了解下为什么要有Condition,和Condition在JDK中的使用,最后和大家聊下Lock和Condition这2个接口,最后简单说下Object监视器锁和Condition
  • 下篇 我会和大家进入源码分析下AQS中 Condition的具体实现 最后做个全文总结

为什么需要条件队列Conditon Queue

举个小例子

之前看很多文章或者看到很多博客描述AQS的时候,很多小伙伴都不清楚为什么要存再一个Condition Queue 明明Sync Queue 就可以了呀,
那我就举一个小例子 大家带入一下 看看条件队列存在的意义:

话说小明中午放学去食堂排队吃饭,但是小明就喜欢吃红烧肉,如果没红烧肉就吃不下去饭,这个时候小明排队排到了窗口,点菜的时候发现没有红烧肉了,咋办呢 小明就在那边干等着,但是食堂打饭的大叔不乐意了

  • 小明:大叔 我要一份红烧肉加一碗米饭
  • 食堂大叔:哎呀,同学对不住啦,今天红烧肉太好卖了,卖完了,要不您换个菜
  • 小明:不行不行,我一顿不吃红烧肉就吃不下饭
  • 食堂大叔:那好的 你先站旁边 单独排个对,让后面的同学先来
  • 小明:好的 可以的 那红烧肉有了 你通知下我哈 我重新去排队
  • 食堂大叔:嗯好的 好的可以的 等有了 我叫你一下
  • 过了1分钟。。。
  • 小强:大叔今天红烧肉 没有了么,我没看到呀
  • 食堂大叔:小伙子 你先站旁边 站在小明后面排队,红烧肉一会儿就上
  • 小强:跑过去 排在了小明的后面
  • 过了2分钟。。。
  • 食堂大叔:哎 小伙子们 红烧肉来了
  • 小明,小强:好的 马上去吃饭的队伍里面去排队
  • 排了1分钟后。。。。
  • 小明和小强总于点到了心爱的红烧肉

不知道 大家看了上面的例子 你有什么感触,食堂排队吃饭 就相当于一个Sync Queue 一样,小明和小强排队等红烧肉就是一个Conditin Queue,如果没有这个条件队列 那小明和小强后面排队的人都要在那边干等着,有了条件队列以后整个排队吃饭的程序就没问题了,当然条件队列可能有很多,就像有的人喜欢吃红烧肉,有的人喜欢吃大煮干丝一样,如果没有就自己重写在旁边去排队,等待唤醒。

分析

从上面的例子中,我们应该可以知道条件队列中的node 一开始是要去SyncQueue中去争取锁的,但是获取到锁以后 发现条件不满足,就会释放掉已获取到锁转儿进入条件队列等待,再得到Signal信号后唤醒,又进入了Sync Queue 中去争取锁,获取到以后 最终执行后面的方法,在有些场景下单纯的Sync Queue 是不能解决问题的 这个时候就需要Condition Queue 去配合使用

怎么使用条件队列

写个小Demo

package org.example;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @ClassName ConditionDemo
 * @Auther burgxun
 * @Description: 一个使用AQS条件队列的Demo
 * @Date 2020/3/31 21:43
 **/
public class ConditionDemo {

    final ReentrantLock lock;//重入锁

    private final Condition notEmpty;//一个非空的Condition

    private final Condition notFull;//一个非满的Condition

    final String[] stringItems;//存储元素的集合

    int count;//数组当前的容量
    int putIndex;//取的时候下标
    int getIndex;//获取的是下标

    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:MM:ss SSS");

    public ConditionDemo(int capacity) {
        if (capacity < 0) {
            throw new IllegalArgumentException();
        }
        this.stringItems = new String[capacity];
        this.lock = new ReentrantLock();
        this.notEmpty = this.lock.newCondition();
        this.notFull = this.lock.newCondition();
    }

    public void Put(String item) {
        this.lock.lock();
        try {
            while (this.count == stringItems.length) {//如果当前的数组已经满了 那就让线程等待 非满的条件
                System.out.println(String.format("NowDate:%s  %s:当前集合已满,需要等待数据取出",
                        simpleDateFormat.format(new Date()),
                        Thread.currentThread().getName()));
                notFull.await();
            }
            stringItems[putIndex] = item;
            System.out.println(String.format("NowDate:%s  %s:集合中新增一条数据- %s", simpleDateFormat.format(new Date()),
                    Thread.currentThread().getName(),
                    item));
            if (++putIndex == stringItems.length) {//如果已经是放到最后一位了 那就从头开始放 相当于一个循环数组
                putIndex = 0;
            }
            this.count++;
            notEmpty.signal();//因为这边已经新增加一个了 所以要唤醒等待非空的条件的线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            this.lock.unlock();
        }
    }

    public String get() throws InterruptedException {
        this.lock.lock();
        try {
            while (count == 0) {//如果获取元素的时候 发现当前集合中的元素个数为0了 那就等待一个非空的条件 线程进入循环
                System.out.println(String.format("NowDate:%s  %s:集合中已空,需要等待新增数据",
                        simpleDateFormat.format(new Date()),
                        Thread.currentThread().getName()));
                notEmpty.await();
            }
            String item = stringItems[getIndex];
            stringItems[getIndex] = null;
            System.out.println(String.format("NowDate:%s  %s:集合中取出一条数据- %s",
                    simpleDateFormat.format(new Date()),
                    Thread.currentThread().getName(),
                    item));
            if (++getIndex == stringItems.length) {//如果获取的时候 已经到了尾部,那就从头获取
                getIndex = 0;
            }
            count--;
            notFull.signal();
            return item;
        } finally {
            this.lock.unlock();
        }
    }


    public static void main(String[] args) {
        ConditionDemo conditionDemo = new ConditionDemo(3);
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(4, 10, 10,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(5));
        executorService.prestartCoreThread();
        try {
            /*设置5个生产者线程*/
            for (int i = 0; i < 3; i++) {
                executorService.submit(() -> {
                    for (int j = 0; j < 2; j++) {
                        try {
                            Thread.sleep(100);
                            conditionDemo.Put("Add index" + j);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            /*设置一个消费者线程*/
            executorService.submit(() -> {
                try {
                    for (int i = 0; i < 50; i++) {
                        Thread.sleep(100);
                        String item = conditionDemo.get();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

运行结果:


结果

看完了 这个小demo 你是否对Condition 有了点儿了解

JDK中是怎么使用的

下面 我们去看下 JDK里面 怎么去用这个Condition的

我们进入到 ArrayBlockingQueue的源码中看下:
首先我们看下默认的构造函数

  public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();//非空Condition
        notFull =  lock.newCondition();//非满Condition
    }
    
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();//如果当前集合已经满了 那线程就进入大非满的Condition中等待
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();//因为填加了一个元素 所以要唤醒一个等待在非空的Condition线程
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();//这边也是一样的 如果当先集合没有元素了,那就线程进入非空的Condition 等待
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();//因为集合已经取走一个元素了 那一定是非满了,那就唤醒在非满上等待的线程
        return x;
    }

相信大家看了我截取的部分ArrayBlockingQueue的部分源码 也能明白了很多,其实我上面的demo 就是按照这个写的~

Lock和Condition

Lock

那首先 我什么要聊lock 这个接口呢 我们看看代码再说

**
 * @ClassName Lock
 * @Auther burgxun
 * @Date 2020/3/28 19:43
 **/
public interface Lock {
    /**
     * 获取锁
     */
    void lock();

    /**
     * 获取锁 会抛出中断异常
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * 非阻塞式的获取锁  尝试过去一次 成功就返回true 失败就返回false
     */
    boolean tryLock();

    /**
     * 尝试获取锁  带超时时间  可中断的  如果获取不到锁 线程会在阻塞队列中 线程休眠 不参与线程调度
     * 直到下面三种情况 才被唤醒
     * 第一种 情况是 获取到了锁
     * 第二种 情况是 其他线程中断了当前线程  这边抛出异常 或者在调用方法的开始 线程的中断标识已经设定了true  这个在子类的实现中 我们会看到
     * 第三种 情况是  设定的获取锁时间到了 自动退出争抢  线程唤醒
     *
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 释放锁
     */
    void unlock();

    /**
     * 返回一个Condition对象
     */
    Condition newCondition();
}
header 1 header 2
lock 阻塞
lockInterruptibly 阻塞 可中断
tryLock 非阻塞
tryLock(long time, TimeUnit unit) 超时机制 可中断

上面是我对Lock 接口各个功能的描述,我们看最后一个方法的时候,发现Lock有一个newCondition的方法 而且返回了一个Condition对象 说明我们的Condition是和lock关联的,而且是lock对象创建了Condition,从我们之前看的ArrayBlockingQueue中也能看到这个关系

那我去源码中去找下是怎么实现lock的newCondition方法的
下面是我提取了ReentrantLock的几个方法 如下:

/**
 * @ClassName ReentrantLock
 * @Auther burgxun
 * @Description:
 * @Date 2020/3/28 19:44
 **/
public class ReentrantLock implements Lock {

    private final Sync sync;


    abstract static class Sync extends AbstractQueuedSynchronizer {
        abstract void lock();

        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    }

    //非公平锁
    static final class NonfairSync extends Sync {

        @Override
        void lock() {

        }
    }

    //公平锁
    static final class FairSync extends Sync {

        @Override
        void lock() {
            //acquire(1);
        }
    }

    //默认实现非公平锁
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

从代码中 我们可以看到 最终创建的对象还是AQS中的ConditionObject对象

Condition

Condition接口:

/**
 * @ClassName Condition
 * @Auther burgxun
 * @Date 2020/3/28 18:10
 **/
public interface Condition {
    /**
     * 线程等待 进入条件队列
     */
    void await() throws InterruptedException;

    /**
     * 线程等待 不会抛出中断异常 只会记录中断标记 做出线程中断
     */
    void awaitUninterruptibly();

    /**
     * 线程等待  设置等待的时间 单位是毫秒  这个方法和await(long time, TimeUnit unit)方法比起来  此方法看上去 比较鸡肋
     * 但是需要注意的是 2个方法放回值 是不同的 这个方法返回的是剩余等待的时间
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    /**
     * 线程进入阻塞等待  3中情况 会唤醒 一是获取到 signalled的信号,二是线程发生了中断,三是等待的唤醒的时间已经到了
     * 如果返回是true 则说明是超时了返回   如果是false 说明是因为超时返回的
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 线程等待  指定一个固定的等待截止时间deadline  
     */
    boolean awaitUntil(Date deadline) throws InterruptedException;

    /**
     * 唤醒一个等待的线程
     */
    void signal();

    /**
     * 唤醒所有等待的线程
     */
    void signalAll();

}

接口定义如上 看到Condition的接口定义 你是否想到了Object中的wait/notify,
那看下object

public class Object {
    public final native void notify();
    
    public final native void notifyAll();
    
    public final native void wait(long timeout) throws InterruptedException;
    
    public final void wait(long timeout, int nanos) throws InterruptedException {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (nanos < 0 || nanos > 999999) {
            throw new IllegalArgumentException(
                    "nanosecond timeout value out of range");
        }

        if (nanos > 0) {
            timeout++;
        }

        wait(timeout);
    }
    
    public final void wait() throws InterruptedException {
        wait(0);
    }
}

是不是很像
其实出现了await/signal就是为了补充wait/notify 监视器锁在同步代码块中的不足的功能,
大家可想一下 所有调用wait的线程,都会在同一个对象监视器锁中等待,notify唤醒的时候不能根据条件唤醒,可能有时候唤醒的根本不满足自己的条件 还是要使用wait继续阻塞,如果能在某一个条件下唤醒 那就OK了 于是Condition产生了!
Conditon 可以在同一个锁上 设置不同的条件 当满足一定的条件后 就唤醒等待这个条而阻塞的线程

objec方法 Condition方法 区别
void wait void await()
void wait(long timeout) long awaitNanos(long nanosTimeout) 返回值 参数单位
wait(long timeout, int nanos) boolean await(long time, TimeUnit unit) 返回值 参数类型
void notify() void signal()
void notifyAll() void signalAll()
X boolean awaitUntil(Date deadline) Condition 有
X void awaitUninterruptibly() Condition 有

Sync-Queue和Conditian-Queue

下一篇见

AQS Condition的实现

条件队列-await源码分析

下一篇见

条件队列-signal源码分析

下一篇见

总结

下一篇见

相关文章

网友评论

      本文标题:带你看看Java-AQS同步器 源码解读<四>条件队列Condi

      本文链接:https://www.haomeiwen.com/subject/oryouhtx.html