美文网首页并发和多线程
2020-02-02 2.2.2 条件锁 Condition

2020-02-02 2.2.2 条件锁 Condition

作者: FredWorks | 来源:发表于2020-02-05 17:49 被阅读0次

本文是Java线程安全和并发编程知识总结的一部分。

2.2.2 条件锁Conditioin

从实际上来看,条件锁本身其实并不是锁,而是从锁上获取到的一个工具包,这个工具包在底层通过条件线程队列,将一组线程和某个条件绑定起来,并提供了适当的时候挂起当前线程、唤醒(条件线程队列内匹配的)其他线程、或被其(条件线程队列内)他线程唤醒的方法。
Condition本身表现为一个接口,以及JDK的内置实现ConditionObject,并提供如下方法:

  • await(): 该方法将导致当前线程挂起,释放锁,直到被其他线程唤醒。
  • signal(): 该方法将唤醒等候当前条件的线程队列中的某个线程。
  • signalAll(): 该方法将唤醒等候当前条件的线程队列中的所有线程。
  • await的各种重载方法或类似方法:awaitUninterruptibly、awaitNanos、awaitUntil

条件对象和条件线程队列的关联是隐式的,当你使用Condition上的上述方法时,就启用了对应的条件线程队列,我们无需直接去创建、维护和使用条件线程队列。同时每个条件对象都对应了一个条件线程队列。实际上,JDK提供的两种条件对象的实现ConditionObject,本身就是两种条件队列实现类 AbstractQueuedSynchronizerAbstractQueuedLongSynchronizer的内部类。

实际上,正如每个对象背后都有一个内置锁一样,每个对象背后,都有一个内置条件线程队列。当你使用Object提供的如下方法时,你就激活了这个队列:

  • wait(): 挂起当前线程直到被唤醒。
  • notify(): 唤醒等候当前对象内置锁的内置条件队列中的某个线程。
  • notifyAll():唤醒等候当前对象内置锁的内置条件队列中的所有线程。

看看是不是和Condition的对应方法非常像?差别只在于内置条件线程队列是和对象的内置锁绑定的,也就是内置锁只有一个内置条件线程队列;而通过Condition访问的条件线程队列是和创建Condition的锁绑定给的,因此一个锁可以存着多个条件线程队列,只要通过。

内置锁、内置条件线程队列,他们已经被默认提供并在你使用synchronized关键词时被使用,非常的方便。但也失去了一些灵活性。比如,你不能将内置锁和任意条件关联。而通过Condition以及其背后的条件线程队列,则允许你自己控制锁到底和那个(或那几个)条件关联。

如何使用条件锁

条件对象和那个状态相关联,是由调用者决定的。下面通过一个例子来说明如何使用。假设我们要实现一个连接池的,获取连接的方法满足如下行为:

  1. 当池中有空闲连接时,获取线程方法立即返回并成功;
  2. 当池中无空闲连接且连接池未满,则获取连接的线程阻塞直到有空闲连接;
  3. 当池已满,且无空闲连接,则获取连接的线程阻塞直到超时或有空闲连接;
  4. 为了简化问题,其他方法就不提供实现了,示意即可;
/**
 * @author xx
 * 2020年2月5日 下午5:28:18 xx
 */
public class ConnectionPool<T> {

    private final Lock lock = new ReentrantLock();
    
    /**
     * 连接池的容量
     */
    private int capacity;
    
    /**
     * 获取连接的超时时长
     */
    private int fetchTimeOut;
    
    /**
     * 当前连接池的总大小
     */
    private int totalSize;
    
    /**
     * 代表连接池尚未满的条件(连接数未超过池大小,和状态属性 totalSize 和 capacity 有关)
     */
    private final Condition isFull = this.lock.newCondition();
    
    /**
     * 代表连接池未满但无空闲连接的条件。
     */
    private final Condition needIncr = this.lock.newCondition();
    
    /**
     * 空闲连接的容器
     */
    private LinkedList<T> freeConnections;
    
    /**
     * 正在使用的连接的容器
     */
    private LinkedList<T> busyConnections;
    
    /**
     * 
     * 构造函数
     * @param capacity 连接池大小
     * @param fetchTimeOut 连接池获取连接超时时长(s)
     */
    public ConnectionPool(int capacity, int fetchTimeOut) {
        this.capacity = capacity;
        this.fetchTimeOut = fetchTimeOut;
        this.totalSize = 0;
        this.freeConnections = new LinkedList<T>();
        this.busyConnections = new LinkedList<T>();
    }
    
    /**
     * 缓存的连接低于1个时,自动增加空闲连接。
     * 2020年2月5日 下午6:40:36 xx添加此方法
     */
    public void incNewConnections() {
        this.lock.lock();
        try {
            // 如果空闲连接数多余1个,则当前线程休眠,并放入 needIncr 条件对应的条件线程队列。
            while (this.freeConnections.size()  > 1) {
                // 该语句将导致当前线程休眠,并被放入 needIncr 条件对应的条件线程队列
                this.needIncr.await();
            }

            // 新缓存一批空闲连接
            this.createConnections();
            
            // 该语句会唤醒 needIncr 条件对应的条件线程队列中所有线程(获取连接的线程),方便通知获取连接的线程可以获取连接了
            this.needIncr.notifyAll();
        } catch (InterruptedException e) {
            throw new RuntimeException("创建连接时线程中断异常", e);
        } finally {
            this.lock.unlock();
        }
    }
    
    /**
     * 从连接池获取一个连接
     * 2020年2月5日 下午5:28:18 xx添加此方法
     */
    public T fetchConnectioin(int index) {
        this.lock.lock();
        try {
            // 如果连接池满了,则当前线程休眠,等到有人释放连接或超时
            while (this.totalSize == this.capacity) {
                // 该语句将导致当前线程休眠并被放入 isFull 条件对应的条件线程队列
                this.isFull.await(this.fetchTimeOut, TimeUnit.SECONDS);
            }
            
            // 如果没有空闲连接,则当前线程挂起等待连接初始化完成
            if (this.freeConnections.isEmpty()) {
                // 该语句将导致当前线程休眠并被放入 needIncr 条件对应的条件线程队列,等待连接创建线程完成工作后唤醒。
                this.needIncr.await();
            }
            
            T element = this.freeConnections.remove(this.freeConnections.size() - 1);
            this.busyConnections.add(element);
            
            return element;
        } catch (InterruptedException e) {
            throw new RuntimeException("获取连接时发生线程中断", e);
        } finally {
            this.lock.unlock();
        }
    }
    
    /**
     * 创建一个物理连接。
     * 本方法无需线程安全保护,因为它会被其他线程安全的方法调用。
     * 2020年2月5日 下午5:28:18 xx添加此方法
     * @param batchSize 一批次大小
     * @return
     */
    private void createConnections() {
        // 如果空闲连接数少于等于1个,则新创建最多10个连接并缓存起来
        int batchSize = this.capacity - this.totalSize;
        if (batchSize > 10) {
            batchSize =  10;
        }

        // 这里模拟创建连接的逻辑
        List<T> ts = new ArrayList<>(10);
        
        this.freeConnections.addAll(ts);
        this.totalSize += batchSize;
    }

    /**
     * 将连接释放会连接池
     * 2020年2月5日 下午5:28:18 xx添加此方法
     */
    public void releaseConnection(T connection) {
        this.lock.lock();
        try {
            if (this.busyConnections.remove(connection)) {
                this.freeConnections.add(connection);
                
                // 该语句将唤醒 isFull 条件对应条件线程队列中的所有线程(都是在等待空闲连接的线程)
                this.isFull.signalAll();
            }
        } finally {
            this.lock.unlock();
        }
    }
}

温馨提示:
本连接池例子只是为了说明如何使用条件锁,真实的连接池绝非如此简单,请勿模仿。
这个例子想要说明如下几点:

  1. 条件是通过锁创建的。
  • 锁创建条件,那个锁创建的条件,就和那个锁绑定。
  • 一个锁可以创建多个条件。
  • 内置锁有且只有一个对应的内置条件线程队列。
  1. 条件锁中的条件(本例中的 isFull 属性),实际上是代表一种状态的抽象概念;至于这种状态具体是什么,有什么意义,都由调用者的逻辑决定;如果调用者的逻辑使用不当,JDK的条件锁本身,并不能分辨,也不能控制。
  • 本例中的 isFull 条件,实际上是和 this.totalSize == this.capacity 这个场景关联的。
  • 本例中的 needIncr 条件,实际上是和 this.freeConnections.size() <= 1 这个场景对应的。
  • 如果两个条件使用的场景发生错乱或遗留,JDK无法发现,将可能导致线程泄露、死锁或其他锁使用不当问题。
  1. 条件本身,实际上只是用来和背后的条件线程队列沟通的工具包;每个条件都有对应的一个条件线程队列。
  • 每个条件都有自己的条件线程队列;
  • 通过await,将当前线程休眠,放到和条件对应的条件线程队列中,并释放锁;但它并不能影响到其他条件对应的条件线程队列。
  • 通过 signalAll/signal,唤醒条件对应的条件线程队列中的所有线程,让它们去竞争锁;但它并不能影响到其他条件对应的条件线程队列。

相关文章

网友评论

    本文标题:2020-02-02 2.2.2 条件锁 Condition

    本文链接:https://www.haomeiwen.com/subject/rsnfxhtx.html