美文网首页并发和多线程
2020-02-02 2.2.2 条件锁 Condition

2020-02-02 2.2.2 条件锁 Condition

作者: FredWorks | 来源:发表于2020-02-05 17:49 被阅读0次

    本文是Java线程安全和并发编程知识总结的一部分。

    2.2.2 条件锁Conditioin

    从实际上来看,条件锁本身其实并不是锁,而是从锁上获取到的一个工具包,这个工具包在底层通过条件线程队列,将一组线程和某个条件绑定起来,并提供了适当的时候挂起当前线程、唤醒(条件线程队列内匹配的)其他线程、或被其(条件线程队列内)他线程唤醒的方法。
    Condition本身表现为一个接口,以及JDK的内置实现ConditionObject,并提供如下方法:

    • await(): 该方法将导致当前线程挂起,释放锁,直到被其他线程唤醒。
    • signal(): 该方法将唤醒等候当前条件的线程队列中的某个线程。
    • signalAll(): 该方法将唤醒等候当前条件的线程队列中的所有线程。
    • await的各种重载方法或类似方法:awaitUninterruptibly、awaitNanos、awaitUntil

    条件对象和条件线程队列的关联是隐式的,当你使用Condition上的上述方法时,就启用了对应的条件线程队列,我们无需直接去创建、维护和使用条件线程队列。同时每个条件对象都对应了一个条件线程队列。实际上,JDK提供的两种条件对象的实现ConditionObject,本身就是两种条件队列实现类 AbstractQueuedSynchronizerAbstractQueuedLongSynchronizer的内部类。

    实际上,正如每个对象背后都有一个内置锁一样,每个对象背后,都有一个内置条件线程队列。当你使用Object提供的如下方法时,你就激活了这个队列:

    • wait(): 挂起当前线程直到被唤醒。
    • notify(): 唤醒等候当前对象内置锁的内置条件队列中的某个线程。
    • notifyAll():唤醒等候当前对象内置锁的内置条件队列中的所有线程。

    看看是不是和Condition的对应方法非常像?差别只在于内置条件线程队列是和对象的内置锁绑定的,也就是内置锁只有一个内置条件线程队列;而通过Condition访问的条件线程队列是和创建Condition的锁绑定给的,因此一个锁可以存着多个条件线程队列,只要通过。

    内置锁、内置条件线程队列,他们已经被默认提供并在你使用synchronized关键词时被使用,非常的方便。但也失去了一些灵活性。比如,你不能将内置锁和任意条件关联。而通过Condition以及其背后的条件线程队列,则允许你自己控制锁到底和那个(或那几个)条件关联。

    如何使用条件锁

    条件对象和那个状态相关联,是由调用者决定的。下面通过一个例子来说明如何使用。假设我们要实现一个连接池的,获取连接的方法满足如下行为:

    1. 当池中有空闲连接时,获取线程方法立即返回并成功;
    2. 当池中无空闲连接且连接池未满,则获取连接的线程阻塞直到有空闲连接;
    3. 当池已满,且无空闲连接,则获取连接的线程阻塞直到超时或有空闲连接;
    4. 为了简化问题,其他方法就不提供实现了,示意即可;
    /**
     * @author xx
     * 2020年2月5日 下午5:28:18 xx
     */
    public class ConnectionPool<T> {
    
        private final Lock lock = new ReentrantLock();
        
        /**
         * 连接池的容量
         */
        private int capacity;
        
        /**
         * 获取连接的超时时长
         */
        private int fetchTimeOut;
        
        /**
         * 当前连接池的总大小
         */
        private int totalSize;
        
        /**
         * 代表连接池尚未满的条件(连接数未超过池大小,和状态属性 totalSize 和 capacity 有关)
         */
        private final Condition isFull = this.lock.newCondition();
        
        /**
         * 代表连接池未满但无空闲连接的条件。
         */
        private final Condition needIncr = this.lock.newCondition();
        
        /**
         * 空闲连接的容器
         */
        private LinkedList<T> freeConnections;
        
        /**
         * 正在使用的连接的容器
         */
        private LinkedList<T> busyConnections;
        
        /**
         * 
         * 构造函数
         * @param capacity 连接池大小
         * @param fetchTimeOut 连接池获取连接超时时长(s)
         */
        public ConnectionPool(int capacity, int fetchTimeOut) {
            this.capacity = capacity;
            this.fetchTimeOut = fetchTimeOut;
            this.totalSize = 0;
            this.freeConnections = new LinkedList<T>();
            this.busyConnections = new LinkedList<T>();
        }
        
        /**
         * 缓存的连接低于1个时,自动增加空闲连接。
         * 2020年2月5日 下午6:40:36 xx添加此方法
         */
        public void incNewConnections() {
            this.lock.lock();
            try {
                // 如果空闲连接数多余1个,则当前线程休眠,并放入 needIncr 条件对应的条件线程队列。
                while (this.freeConnections.size()  > 1) {
                    // 该语句将导致当前线程休眠,并被放入 needIncr 条件对应的条件线程队列
                    this.needIncr.await();
                }
    
                // 新缓存一批空闲连接
                this.createConnections();
                
                // 该语句会唤醒 needIncr 条件对应的条件线程队列中所有线程(获取连接的线程),方便通知获取连接的线程可以获取连接了
                this.needIncr.notifyAll();
            } catch (InterruptedException e) {
                throw new RuntimeException("创建连接时线程中断异常", e);
            } finally {
                this.lock.unlock();
            }
        }
        
        /**
         * 从连接池获取一个连接
         * 2020年2月5日 下午5:28:18 xx添加此方法
         */
        public T fetchConnectioin(int index) {
            this.lock.lock();
            try {
                // 如果连接池满了,则当前线程休眠,等到有人释放连接或超时
                while (this.totalSize == this.capacity) {
                    // 该语句将导致当前线程休眠并被放入 isFull 条件对应的条件线程队列
                    this.isFull.await(this.fetchTimeOut, TimeUnit.SECONDS);
                }
                
                // 如果没有空闲连接,则当前线程挂起等待连接初始化完成
                if (this.freeConnections.isEmpty()) {
                    // 该语句将导致当前线程休眠并被放入 needIncr 条件对应的条件线程队列,等待连接创建线程完成工作后唤醒。
                    this.needIncr.await();
                }
                
                T element = this.freeConnections.remove(this.freeConnections.size() - 1);
                this.busyConnections.add(element);
                
                return element;
            } catch (InterruptedException e) {
                throw new RuntimeException("获取连接时发生线程中断", e);
            } finally {
                this.lock.unlock();
            }
        }
        
        /**
         * 创建一个物理连接。
         * 本方法无需线程安全保护,因为它会被其他线程安全的方法调用。
         * 2020年2月5日 下午5:28:18 xx添加此方法
         * @param batchSize 一批次大小
         * @return
         */
        private void createConnections() {
            // 如果空闲连接数少于等于1个,则新创建最多10个连接并缓存起来
            int batchSize = this.capacity - this.totalSize;
            if (batchSize > 10) {
                batchSize =  10;
            }
    
            // 这里模拟创建连接的逻辑
            List<T> ts = new ArrayList<>(10);
            
            this.freeConnections.addAll(ts);
            this.totalSize += batchSize;
        }
    
        /**
         * 将连接释放会连接池
         * 2020年2月5日 下午5:28:18 xx添加此方法
         */
        public void releaseConnection(T connection) {
            this.lock.lock();
            try {
                if (this.busyConnections.remove(connection)) {
                    this.freeConnections.add(connection);
                    
                    // 该语句将唤醒 isFull 条件对应条件线程队列中的所有线程(都是在等待空闲连接的线程)
                    this.isFull.signalAll();
                }
            } finally {
                this.lock.unlock();
            }
        }
    }
    

    温馨提示:
    本连接池例子只是为了说明如何使用条件锁,真实的连接池绝非如此简单,请勿模仿。
    这个例子想要说明如下几点:

    1. 条件是通过锁创建的。
    • 锁创建条件,那个锁创建的条件,就和那个锁绑定。
    • 一个锁可以创建多个条件。
    • 内置锁有且只有一个对应的内置条件线程队列。
    1. 条件锁中的条件(本例中的 isFull 属性),实际上是代表一种状态的抽象概念;至于这种状态具体是什么,有什么意义,都由调用者的逻辑决定;如果调用者的逻辑使用不当,JDK的条件锁本身,并不能分辨,也不能控制。
    • 本例中的 isFull 条件,实际上是和 this.totalSize == this.capacity 这个场景关联的。
    • 本例中的 needIncr 条件,实际上是和 this.freeConnections.size() <= 1 这个场景对应的。
    • 如果两个条件使用的场景发生错乱或遗留,JDK无法发现,将可能导致线程泄露、死锁或其他锁使用不当问题。
    1. 条件本身,实际上只是用来和背后的条件线程队列沟通的工具包;每个条件都有对应的一个条件线程队列。
    • 每个条件都有自己的条件线程队列;
    • 通过await,将当前线程休眠,放到和条件对应的条件线程队列中,并释放锁;但它并不能影响到其他条件对应的条件线程队列。
    • 通过 signalAll/signal,唤醒条件对应的条件线程队列中的所有线程,让它们去竞争锁;但它并不能影响到其他条件对应的条件线程队列。

    相关文章

      网友评论

        本文标题:2020-02-02 2.2.2 条件锁 Condition

        本文链接:https://www.haomeiwen.com/subject/rsnfxhtx.html