美文网首页一些收藏JUC 并发专题面试精选
Java并发编程——ReentrantLock—Conditio

Java并发编程——ReentrantLock—Conditio

作者: 小波同学 | 来源:发表于2021-12-07 15:07 被阅读0次

    一、前言

    Jdk中独占锁的实现除了使用关键字synchronized外,还可以使用ReentrantLock。虽然在性能上ReentrantLock和synchronized没有什么区别,但ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。

    使用synchronized结合Object上的wait和notify方法可以实现线程间的等待通知机制。Condition同样可以实现这个功能,而且相比前者使用起来更清晰也更简单。前者是java底层级别的,后者是语言级别的,后者可控制性和扩展性更好。

    与wait/notify区别

    • 1.Condition能够支持不响应中断,而通过使用Object方式不支持。

    • 2.Condition能够支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个。

    • 3.Condition能够支持超时时间的设置,而Object不支持。

    二、Condition实现生产者和消费者模式

    为了方便理解,我们先写一个用condition实现的生产者消费者的例子。

    /**
     * @Description: 演示Condition实现生产者和消费者模式
     */
    public class ConditionDemo2 {
    
        private int queueSize = 10;
        private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
    
        private ReentrantLock lock = new ReentrantLock();
        private Condition notFull = lock.newCondition();
        private Condition notEmpty = lock.newCondition();
    
        public static void main(String[] args) {
            ConditionDemo2 conditionDemo2 = new ConditionDemo2();
            Producer producer = conditionDemo2.new Producer();
            Consumer consumer = conditionDemo2.new Consumer();
    
            new Thread(producer).start();
            new Thread(consumer).start();
        }
    
        class Consumer implements Runnable{
            @Override
            public void run() {
                consume();
            }
    
            public void consume(){
                while(true){
                    lock.lock();
                    try{
                        while (queue.size() == 0){
                            System.out.println("队列空,等待数据");
                            notEmpty.await();
                        }
                        Integer poll = queue.poll();//走过await()证明队列不为空,取出数据
                        System.out.println("消费者消费数据:"+poll+",队列剩余数据数量:"+queue.size());
    
                        notFull.signalAll();//获取数据之后,队列肯定有空闲,那么唤醒生产者进行生产
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }
    
        class Producer implements Runnable{
            @Override
            public void run() {
                produce();
            }
    
            public void produce(){
                while(true){
                    lock.lock();
                    try{
                        while (queue.size() == queueSize){
                            System.out.println("队列已满,等待空余");
                            notFull.await();
                        }
                        queue.offer(1);//走过await()证明队列有空闲,开始往队列里生产数据
                        System.out.println("生产者向队列生产一个数据,队列剩余空间:"+(queueSize-queue.size()));
    
                        notEmpty.signalAll();//向队列生产数据之后,队列不为空,那么唤醒消费者进行消费
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }
    }
    

    Condition注意点

    • 实际上,如果说Lock用来替代synchronized,那么Condition就是用来代替相对应的Object.wait/notify的,所以在用法和性质上,几乎一样。
    • await方法会自动释放持有的Lock锁,和Object.wait一样,不需要自己手动先释放锁。
    • 调用await的时候,必须持有锁,否则会抛异常,和Object.wait一样。

    三、原理分析

    在AQS中存在两个FIFO队列:同步队列(等待队列)和条件队列

    同步队列(等待队列):ReentrantLock实现原理

    本文主要是讲condition实现原理(即条件队列),条件队列是由Condition内部实现的,是一个虚拟的FIFO单向队列,在AQS中同步队列、等待队列组成关系:

    • 1、AQS中tail 和 head主要构成了一个FIFO双向的同步队列。

    • 2、AQS中condition构成了一个FIFO单向条件队列。condition是AQS内部类,每个Condition对象中保存了firstWaiter和lastWaiter作为队列首节点和尾节点,每个节点使用Node.nextWaiter保存下一个节点的引用,因此等待队列是一个单向队列。

    3.1 队列关系

    在Object的监视器(monitor)模型上,一个对象拥有一个同步队列和一个等待队列;而并发包中的AQS上拥有一个同步队列和多个等待队列。两者的具体实现原理的有所不同,但在多线程下等待/唤醒 操作的思路有相同之处,Object的监视器模型 和 AQS对同步队列、等待队列对应关系如下图

    3.1.1、Object的监视器模型同步、等待队列对应关系图

    3.1.2、AQS中同步队列、条件队列对应关系图

    当多线程并发访问AQS的lock()、await()、single()方法时,同步队列和等待队列变化处理过程包括:

    • 1、多个线程执行lock()方法时,线程会竞争获取同步锁state,获取成功的线程占有锁state、获取失败的线程会封装成node加入到AQS的同步队列中,等待锁state的释放。
    • 2、等获取了state锁的线程(同步队列中head节点)执行await()方法时,condition会将当前线程封装成一个新的node添加到condition等待队列的尾部,同时阻塞(waiting),直到被唤醒。
    • 3、等获取了state锁的线程(同步队列中head节点)single()方法时,condition会将等待队列首节点移动到同步队列的尾部,直到获取同步锁state才被唤醒。

    3.2 Condition的实现

    3.2.1 等待的实现

    当线程调用Condition.await()方法时,将会把前线程封装成node节点,并将节点加入等待队列的尾部,然后释放同步state状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。当前线程加入Condition的等待队列逻辑如下图:


    • 1、能够调用Condition.await()方法的节点是获取了同步state锁的node,即同步队列中的head节点;调用Condition的await()方法(或者以await开头的方法)会使当前线程进入等待队列并释放锁、唤醒同步队列中的后继节点,最后线程状态变为等待状态。

    • 2、Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。

    • 3、调用Condition.await()节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了state锁的线程,也就是说该过程是由锁来保证线程安全的。

    3.2.2 通知的实现

    整个signal()的过程可以总结如下:

    • 1、执行signal()唤醒线程时,先判断当前线程是否是同步锁state持有线程,所以能够调用signal()方法的线程一定持有了同步锁state。

    • 2、自旋唤醒等待队列的firstWaiter(首节点),在唤醒firstWaiter节点之前,会将等待队列首节点移到同步队列中。

    四、源码分析

    可以看到,想要获得一个condition对象,需要首先通过一个ReentrantLock锁来创建,而最终调用其实为AQS中的内部类ConditionObject。

    Condition condition = lock.newCondition();
    
    public class ReentrantLock implements Lock, java.io.Serializable {
    
        private final Sync sync;
        
        abstract static class Sync extends AbstractQueuedSynchronizer {
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
        }
    }
    
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        public class ConditionObject implements Condition, java.io.Serializable {
    
            /** First node of condition queue. */
            private transient Node firstWaiter;
            
            /** Last node of condition queue. */
            private transient Node lastWaiter;
    
            /**
             * Creates a new {@code ConditionObject} instance.
             */
            public ConditionObject() { }
        }
    }       
    

    condition是要和lock配合使用的,而lock的实现原理又依赖于AQS,所以AQS内部实现了ConditionObject。我们知道在锁机制的实现上,AQS内部维护了一个双向的同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列。condition内部也是使用相似的方式,内部维护了一个单向的条件队列,所有调用condition.await方法的线程会加入到条件队列中,并且线程状态转换为等待状态。

    ConditionObject中有两个成员变量:头节点firstWaiter 和 尾节点lastWaiter ,条件队列的成员Node 复用了实现同步队列的内部类Node。用nextWaiter保存了下一个等待节点。

    用Object的方式Object对象监视器上只能拥有一个同步队列和一个等待队列,而使用Lock可以有有一个同步队列和多个等待队列。可以多次调用lock.newCondition()创建多个Condition,所以一个Lock可以持有多个等待队列。

    4.1 await等待

    只有线程获取到lock之后,才可以使用condition的await方法。假设此时线程1获取到了ReentrantLock锁,在执行代码逻辑的时候,发现某些条件不符合,于是调用了condition.await();代码:

    此时AQS主要执行以下动作:

    • 线程1把自己包装成节点,waitStatus设为CONDITION(-2),追加到ConditionObject中的条件队列(每个ConditionObject有一个自己的条件队列);
    • 线程1释放锁,把state设置为0;
    • 然后唤醒等待队列中head节点的下一个节点;
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        public class ConditionObject implements Condition, java.io.Serializable {
    
            public final void await() throws InterruptedException {
                //如果当前线程中断,抛出异常
                if (Thread.interrupted())
                    throw new InterruptedException();
                // 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾   
                Node node = addConditionWaiter();
                // 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    // 3. 当前线程进入到等待状态
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                // 4. 自旋等待获取到同步状态(即获取到lock)
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    //删除无效的等待节点
                    unlinkCancelledWaiters();
                // 5. 处理被中断的情况  
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
        }
    }
    

    保存新节点addConditionWaiter()方法如下。

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        public class ConditionObject implements Condition, java.io.Serializable {
    
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // 清除被取消的尾节点
                if (t != null && t.waitStatus != Node.CONDITION) {
                    //解除关联
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                //将当前线程保存在Node中
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    //队尾插入
                    t.nextWaiter = node;
                //更新lastWaiter  
                lastWaiter = node;
                return node;
            }
        }
    }
    

    将当前节点保存到新建立的Node,如果等待队列的firstWaiter为null的话(等待队列为空队列),则将firstWaiter指向当前的Node,否则,更新lastWaiter(尾节点)即可。可以看出等待队列是一个不带头结点的链式队列,而AQS中的同步队列是一个带头结点的链式队列。

    将当前节点插入到等待对列之后,会调用fullyRelease,使当前线程释放lock。

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                if (release(savedState)) {
                    //成功释放同步状态
                    failed = false;
                    return savedState;
                } else {
                    //不成功释放同步状态抛出异常
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    }
    

    方法内部调用AQS的模板方法release方法释放AQS的同步状态,并且唤醒在同步队列中头结点的后继节点引用的线程,如果释放成功则正常返回,若失败的话就抛出异常。

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        public class ConditionObject implements Condition, java.io.Serializable {
    
            public final void await() throws InterruptedException {
                //......
                
                while (!isOnSyncQueue(node)) {
                    // 3. 当前线程进入到等待状态
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                
                //......
            }
        }
    }
    

    当线程第一次调用condition.await()方法时,会进入到这个while()循环中,然后通过LockSupport.park(this)方法使得当前线程进入等待状态,那么要想退出这个await方法第一个前提条件自然而然的是要先退出这个while循环,有两种可能:

      1. 逻辑走到break退出while循环(当前等待的线程被中断)
      1. while循环中的逻辑判断为false(当前节点被移动到了同步队列中,即另外线程调用的condition的signal或者signalAll方法)。

    总的说就是当前线程被中断或者调用condition.signal/condition.signalAll方法当前节点移动到了同步队列后 ,这是当前线程退出await方法的前提条件。当退出while循环后就会调用acquireQueued(node, savedState)(之前Reentlock中讲过),自旋过程中线程不断尝试获取同步状态,直至获取lock成功。这也说明了退出await方法必须是已经获得了condition关联的lock。

    4.2 signal唤醒

    当另一个线程执行了 condition.signal之后,主要是做了以下事情:

    • 1、把条件队列中的第一个节点追加到等待队列中;
    • 2、把等待队列原来尾节点的waitStatus设置为SIGNAL。

    然后继续处理自己的事情,自己的事情处理完成之后,会释放锁,唤醒等待队列中head节点的下一个节点线程进行工作。

    调用condition的signal唤醒一个等待在condition上的线程(头节点),将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回,源码如下。

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        public class ConditionObject implements Condition, java.io.Serializable {
    
            public final void signal() {
                //1. 先检测当前线程是否已经获取lock
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
    
                //2. 获取条件队列中第一个节点,之后的操作都是针对这个节点 
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
        }
    }
    

    signal方法首先会检测当前线程是否已经获取lock,没有获取lock会直接抛出异常,再调用doSignal传入头节点。doSignal方法源码为:

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        public class ConditionObject implements Condition, java.io.Serializable {
    
            private void doSignal(Node first) {
                do {
                    // 已经是尾节点了
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    // 将头结点从条件队列中移除 
                    first.nextWaiter = null;
                // while中transferForSignal方法对头结点做真正的处理
                // 将等待队列中的 Node 转移至 AQS 同步队列, 不成功且还有节点则继续循环
                } while (!transferForSignal(first) &&
                        // 队列还有节点
                         (first = firstWaiter) != null);
            }
        }
    }
    

    具体逻辑请看注释,真正对头节点做处理的逻辑在transferForSignal,该方法源码为:

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        // 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功
        final boolean transferForSignal(Node node) {
            // 更新状态为0
            // 如果状态已经不是 Node.CONDITION, 说明被取消了
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            //将该节点移入到AQS同步队列尾部
            Node p = enq(node);
            int ws = p.waitStatus;
            // 上一个节点被取消
            // 上一个节点不能设置状态为 Node.SIGNAL
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                // unpark 取消阻塞, 让线程重新同步状态
                LockSupport.unpark(node.thread);
            return true;
        }
    }
    

    4.3 signalAll()源码

    signalAll()会从首节点循环遍历条件队列,将条件队列中的所有节点移到同步队列中去。

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        public class ConditionObject implements Condition, java.io.Serializable {
    
            public final void signalAll() {
                //1. 先检测当前线程是否已经获取lock
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }   
    
            //遍历条件队列,将条件队列中的node移动到同步队列中
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    //移动节点到同步队列中
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }       
        }
    }
    

    4.4 不可打断等待 - 直到被唤醒

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        public class ConditionObject implements Condition, java.io.Serializable {
    
            public final void awaitUninterruptibly() {
                // 添加一个 Node 至等待队列
                Node node = addConditionWaiter();
                // 释放节点持有的锁
                int savedState = fullyRelease(node);
                boolean interrupted = false;
                // 如果该节点还没有转移至 AQS 队列, 阻塞
                while (!isOnSyncQueue(node)) {
                    // park 阻塞
                    LockSupport.park(this);
                    // 如果被打断, 仅设置打断状态
                    if (Thread.interrupted())
                        interrupted = true;
                }
                // 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
                if (acquireQueued(node, savedState) || interrupted)
                    selfInterrupt();
            }   
        }
    }
    

    4.5 等待 - 直到被唤醒或打断或超时

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        public class ConditionObject implements Condition, java.io.Serializable {
    
            public final long awaitNanos(long nanosTimeout)
                    throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                // 添加一个 Node 至等待队列, 
                Node node = addConditionWaiter();
                // 释放节点持有的锁
                int savedState = fullyRelease(node);
                // 获得最后期限
                final long deadline = System.nanoTime() + nanosTimeout;
                int interruptMode = 0;
                // 如果该节点还没有转移至 AQS 队列, 阻塞
                while (!isOnSyncQueue(node)) {
                    // 已超时, 退出等待队列
                    if (nanosTimeout <= 0L) {
                        transferAfterCancelledWait(node);
                        break;
                    }
                    // park 阻塞一定时间, spinForTimeoutThreshold 为 1000 ns
                    if (nanosTimeout >= spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                        
                    // 如果被打断, 退出等待队列    
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    nanosTimeout = deadline - System.nanoTime();
                }
                // 退出等待队列后, 还需要获得 AQS 队列的锁
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                // 所有已取消的 Node 从队列链表删除  
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                // 应用打断模式   
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return deadline - System.nanoTime();
            }
        }
    }
    

    4.6 await恢复后继续执行

    被唤醒的如果是之前执行了await方法的线程,那么该线程会接着就像往await方法里面阻塞处的下面继续执行,下面是源码:

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        public class ConditionObject implements Condition, java.io.Serializable {
    
            public final void await() throws InterruptedException {
                //如果当前线程中断,抛出异常
                if (Thread.interrupted())
                    throw new InterruptedException();
                // 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾   
                Node node = addConditionWaiter();
                // 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    // 3. 当前线程进入到等待状态
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                // 4. 自旋等待获取到同步状态(即获取到lock)
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    //删除无效的等待节点
                    unlinkCancelledWaiters();
                // 5. 处理被中断的情况  
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
        }
    }
    

    可以发现,这里主要是判断到当前线程节点已经放入等待队列了,那么会尝试获取锁,获取成功则继续往下执行代码。

    只有线程获取到ReentrantLock的锁之后才可以继续往下执行,中间可能会因为执行await而进入条件队列并释放锁,最后又会被唤醒重新获取锁,继续往下执行。最后按照书写规范,我们一定会在代码中执行ReentrantLock.unlock()释放锁,然后继续唤醒等待队列后续线程继续执行。

    总结

    • 1、Condition等待通知的本质就是条件队列 和 同步队列的交互的过程,跟object的wait()/notify()机制一样;Condition是基于同步锁state实现的,而objec是基于monitor模式实现的。

    • 2、一个lock(AQS)可以有多个Condition,即多个条件队列,只有一个同步队列。

    • 3、Condition.await()方法执行时,会将同步队列里的head锁释放掉,把线程封装成新node添加到条件队列中;Condition.signal()方法执行时,会把条件队列中的首节点移到同步队列中去,直到锁state被获取才被唤醒。

    参考:
    https://www.itzhai.com/articles/analysis-of-reentrantlocks-condition-principle.html

    https://blog.csdn.net/e891377/article/details/104715461

    https://blog.csdn.net/weixin_42103620/article/details/117331593

    https://blog.csdn.net/sinat_32873711/article/details/106619981

    https://blog.csdn.net/qq_33996921/article/details/106629546

    相关文章

      网友评论

        本文标题:Java并发编程——ReentrantLock—Conditio

        本文链接:https://www.haomeiwen.com/subject/mfboxrtx.html