美文网首页并发编程学习
并发编程学习五、同步锁ReentrantLock

并发编程学习五、同步锁ReentrantLock

作者: valentine_liang | 来源:发表于2018-12-30 14:24 被阅读0次

    Valentine 转载请标明出处。

    锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源,在Lock接口出现之前,Java应用程序只能依靠synchronized关键字来实现同步锁的功能,在java5以后,增加了JUC的并发包而且提供了Lock接口用来实现锁的功能,它提供了与synchronized关键字类似的同步功能,只是它比synchronized更加灵活,能够显式地获取和释放锁。

    Lock的初始使用
    Lock是一个接口,核心的两个方法是lock和unlock,它有很多的实现,比如ReentrantLock、ReentrantReadWriteLock

    ReentrantLock
    重入锁,表示支持重新进入的锁,也就是说如果当前线程t1通过调用lock方法获取了锁之后,再次调用lock,是不会再阻塞去获取锁的,直接增加重试次数就行了。

    public class LockDemo {
    
        // 公平重入锁和非公平重入锁
        private static Lock lock = new ReentrantLock();
    
        private static int count = 0;
    
        public static void incr() {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 获得锁
            lock.lock();
            count++;
            // 释放锁
            lock.unlock();
        }
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 1000; i++) {
                new Thread(() -> {
                    LockDemo.incr();
                }).start();
            }
            Thread.sleep(1000);
            System.out.println("result:" + count);
        }
    
    }
    

    输出结果


    结果图

    ReentrantReadWriteLock
    我们以前理解的锁,基本都是排他锁,也就是这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个线程访问,但是在写线程访问时,所有的读线程都会被阻塞。读写锁维护了一对锁,一个读锁、一个写锁;一般情况下,读写锁的性能比排它锁好,因为大多数场景读是多于写的,在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。

    public class RWLockDemo {
    
        /**
         * 共享锁-在同一时刻可以有多个线程获得锁,读锁和写锁(读多写少)
         */
        private static Map<String, Object> cacheMap = new HashMap<>();
    
        private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    
        private static Lock read = reentrantReadWriteLock.readLock();
        private static Lock write = reentrantReadWriteLock.writeLock();
    
        /**
         * 使用读写锁可以更大化地提升性能(在读多写少的情况下)
         * <p>
         * 读锁,可以允许多个线程进入
         */
        public static Object get(String key) {
            read.lock();
            try {
                return cacheMap.get(key);
            } finally {
                read.unlock();
            }
        }
    
        /**
         * 其他线程阻塞
         */
        public static Object set(String key, Object value) {
            // 写锁
            write.lock();
            try {
                return cacheMap.put(key, value);
            } finally {
                write.unlock();
            }
        }
    
    }
    

    在这个案例中,通过HashMap来模拟了一个内存缓存,然后使用读写锁来保证这个内存缓存的线程安全性。当执行读操作的时候,需要获取读锁,在并发访问的时候,读锁不会被阻塞,因为读操作不会影响执行结果。
    在执行写操作时,线程必须要获取写锁,当已经有线程持有写锁的情况下,当前线程会被阻塞,只有当写锁释放以后,其他读写操作才能继续执行。使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性。
    读锁与读锁可以共享
    读锁与写锁不可以共享(排他)
    写锁与写锁不可以共享(排他)

    Lock和synchronized的简单对比
    通过对Lock的使用以及对synchronized的了解,基本上可以对比出这两种锁的区别了,这个也是在面试过程中比较常见的问题。
    1、从层次上,一个是关键字、一个是类,最直观的差异
    2、从使用上,lock具备更大的灵活性,可以控制锁的释放和获取,而synchronized的锁的释放是被动的,当出现异常或者同步代码块执行完以后,才会释放锁
    3、lock可以判断锁的状态、实现公平锁、非公平锁;而synchronized无法判断锁的状态和synchronized只有非公平锁

    AQS
    Lock之所以能实现线程安全的锁,主要的核心是AQS(AbstractQueuedSynchronizer),AbstractQueuedSynchronizer提供了一个FIFO(first in first out)队列,可以看做是一个用来实现锁以及其他需要同步功能的框架,这简称该类为AQS。AQS的使用依靠继承来完成,子类通过继承自AQS并实现所需的方法来管理同步状态,例如常见的ReentrantLock,CountDownLatch等AQS的两种功能,从使用上来说,AQS的功能可以分为两种:独占和共享。
    独占锁模式下,每次只能有一个线程持有锁,比如上面的ReentrantLock就是以独占方式实现的互斥锁;
    共享锁模式下,允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLock。
    很显然,独占所是一种悲观保守的加锁策略,它限制了读/读冲突,如果某个只读线程获取锁,则其他线程都只能等待,这种情况下就限制了不必要的并发性,因为读操作并不会影响数据的一致性。共享锁则是一种乐观锁,它放宽了加锁策略,允许多个执行读操作的线程同时访问共享资源。

    AQS的内部实现
    同步器依赖内存的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个Node节点并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
    Node的主要属性如下


    Node内部类

    AQS类底层的数据结构使用双向链表,是队列的一种实现,包括一个head节点和一个tail节点,分别表示头结点和尾节点,其中头节点不存储Thread,仅保存next节点的引用,如图所示


    AQS类底层的数据结构

    当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect, Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。


    AQS队列设置尾节点
    同步队列遵循FIFO(first in first out),首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后续节点,而后续节点将会在获取同步状态成功时将自己设置为首节点。
    AQS队列设置首节点

    设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够获取到同步状态,因此设置首节点的方法并不需要CAS来保证,它只需要将首节点设置成原首节点的下一个节点,然后断开原首节点的next引用即可。

    compareAndSet
    AQS中除了本身的链表结构以外,还有一个很关键的功能就是CAS,这个是在多线程并发的情况下保证线程安全的前提下去把线程加入到AQS中的方法,可以简单理解为乐观锁

    /**
         * CAS head field. Used only by enq.
         */
        private final boolean compareAndSetHead(Node update) {
            return unsafe.compareAndSwapObject(this, headOffset, null, update);
        }
    ......
    

    这个方法里面,用到了unsafe类(unsafe类是在sun.misc包下,不属于Java标准,但是很多java的基础类库,包括一些被广泛使用的高性能开发库都是基于unsafe类开发的,比如Netty、Hadoop、Kafka等,unsafe可以认为是java中留下的后门,提供了一些低层级操作,如直接访问内存、线程调度等)
    然后调用了compareAndSwapObject方法

    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
    

    这是一个native方法,第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的headOffset的值),第三个参数为期待的值,第四个为更新后的值
    整个方法的作用是如果当前时刻的值等于期待值(var4),则更新为新的值(var5),如果更新成功,则返回true,否则返回false;
    这里传入了一个headOffset,这个headOffset是什么呢?在下面的代码中,通过unsafe.objectFieldOffset,然后通过反射获取了AQS类中的成员变量,并且这个成员变量被volatile修饰的


    示例图
    示例图

    unsafe.objectFieldOffset
    headOffset这个是指类中相应字段在该类的偏移量,在这里具体指head这个字段在AQS类的内存中相对于该类首地址的偏移量。
    一个Java对象可以看成是一段内存,每个字段都得按照一定的顺序放在这段内存里,通过这个方法可以准确地告诉你某个字段相对于这对象的起始内存地址的字节偏移,用于后面的compareAndSwapObject中,去根据偏移量找到对象在内存中的具体位置,这个方法在unsafe.cpp文件中,代码如下


    示例图

    所以其实compareAndSet方法,最终调用的是unsafe类的compareAndSwap,这个指令会对内存中的共享数据做原子的读写操作。
    1、首先,cpu会把内存中将要被更改的数据与期望值作比较
    2、然后,当两个值相等时,cpu才会将内存中的对象替换为新的值,否则不做变更操作
    3、最后,返回执行结果
    很显然,这是一种乐观锁的实现思路。

    ReentrantLock的实现原理分析
    之所以叫重入锁是因为同一个线程如果已经获得了锁,那后续该线程调用lock方法的时候,不需要再次获得锁,也就是不会阻塞;重入锁提供了两种实现,一种是非公平的重入锁,另一种是公平的重入锁。
    怎么理解公平和非公平?
    如果在绝对时间上,先对锁进行获取的请求一定先被免租获得锁,那么这个锁就是公平锁,反之就是不公平的,简单来说公平锁就是等待时间最长的线程最优先获取锁。

    非公平锁的实现流程时序图


    非公平锁的实现流程时序图

    源码分析
    ReentrantLock.lock

    public void lock() {
            sync.lock();
        }
    

    这个是获取锁的入口,调用了sync.lock,sync是一个实现了AQS的抽象类,这个类的主要作用是用来实现同步控制的,并且sync有两个实现,一个是NonfairSync(非公平锁),一个是FairSync(公平锁),先分析一下非公平锁的实现
    NonfairSync.lock

    final void lock() {
                // 这是跟公平锁的主要区别,一上来就试探是否空闲,
                // 如果可以插队,则设置获得锁的线程为当前线程
                if (compareAndSetState(0, 1))
                    // exclusivOwnerThread属性是AQS从父类
                    //  AbstractOwnableSynchronizer中继承的属性,
                    // 用来保存当前占用同步状态的线程
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    // 尝试去获取锁
                    acquire(1);
            }
    

    compareAndSetState,这个方法是通过cas算法去改变state的值,这个state是AQS中的一个变量,对于ReentrantLock来说,如果state=0表示无锁状态,如果state>0表示有锁状态,在这里表示当前的state如果等于0,则替换为1,如果替换成功表示获取锁成功,由于ReentrantLock是可重入锁,所以持有锁的线程可以多次加锁,经过判断加锁线程就是当前持有锁的线程时(即exclusiveOwnerThread == Thread.currentThread()),即可加锁,每次加锁都会讲state的值+1,state等于几,就代表当前持有锁的线程加了几次锁,解锁时每解锁一次就会将state减一,state减到0后,锁就被释放掉,这时其他线程可以加锁。

    AbstractQueuedSynchronizer.acquire
    如果CAS操作未能成功,说明state已经不为0,此时继续acquire(1)操作,acquire是AQS中的方法,当多个线程同时进入这个方法时,首先通过cas去修改state的状态,如果修改成功表示竞争锁成功,竞争失败的,tryAcquire会返回false

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    这个方法的主要作用是
    尝试获取独占锁,获取成功则返回,否则自旋获取锁,并且判断中断表示,如果中断标识为true,则设置线程中断,addWaiter方法把当前线程封装成Node,并添加到队列的尾部。

    NonfairSync.tryAcquire
    tryAcquire方法尝试获取锁,如果成功就返回,如果不成功,则把当前线程和等待状态信息构造成一个Node节点,并将节点放入同步队列的尾部,然后为同步队列中的当前节点循环等待获取锁,直到成功。

    protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
    }
    

    nofairTryAcquire
    这里可以看出非公平锁的含义,即获取锁并不会严格根据争取锁的先后顺序决定,这里的实现逻辑类似synchronized关键字的偏向锁的做法,即可重入而不用进一步进行锁的竞争,也解释了ReentrantLock中的Reentrant的意义。

    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                // 获取当前的状态,默认情况下是0,表示无锁状态
                int c = getState();
                if (c == 0) {
                    // 通过cas来改变state状态的值,如果更新成功,表示获取锁成功,
                    // 这个操作外部方法lock()就做过一次,这里再做是为了再尝试一次,
                    // 尽量以最简单的方式获取锁
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 如果当前线程等于获取锁的线程,表示重入,直接累加重入次数
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    // 如果这个状态值越界,抛出异常,如果没有越界,则设置后返回true
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                // 如果状态不为0,且当前线程不是owner,则返回false
                return false;
            }
    

    addWaiter

    private Node addWaiter(Node mode) {
            // 创建一个独占的Node节点,mode为排他模式
            Node node = new Node(Thread.currentThread(), mode);
            // 尝试快速入队,如果失败则降级至full enq,tail在AQS中表示同步队列队尾的属性,
            // 刚开始为null,所以进行enq(node)方法
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                //  防止有其他线程修改tail,使用cas进行修改,如果失败则降级至full enq
                if (compareAndSetTail(pred, node)) {
                    // 如果成功之后旧的tail 的next指针再指向新的tail,成为双向链表
                    pred.next = node;
                    return node;
                }
            }
            // 如果队列为null或者cas设置新的tail失败
            enq(node);
            return node;
        }
    

    enq
    enq就是通过自旋操作把当前节点加入到队列中

    private Node enq(final Node node) {
            // 无效的循环,使用for(;;)是因为它执行的指令少,不占用寄存器
            for (;;) {
                // 此时head,tail都为null
                Node t = tail;
                // 如果tail为null则说明队列首次使用,需要进行初始化
                if (t == null) { // Must initialize
                    // 设置头结点,如果失败则存在竞争,留至下一轮循环
                    if (compareAndSetHead(new Node()))
                        // 用cas的方式创建一个空的Node作为头节点,因为此时队列中
                        // 只有一个首节点,所以tail也指向head,第一次循环执行结束
                        tail = head;
                } else {
                    // 进行第二次循环时,tail不为null,进入else区域,将当前线程的
                    // Node节点的prev指向tail,然后使用cas将tail指向Node,
                    // 这部分代码和addWaiter代码一样,将当前节点添加到队列
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        // t此时指向tail,所以cas成功,将tail重新指向Node,
                        // 此时t为更新前的tail的值, 即指向空的头节点,t.next=node,
                        // 就将头节点的后续节点指向node,返回头节点
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    代码运行到这里,AQS队列的结构就是这样的一个表现,如图


    示例图

    acquireQueued
    addWaiter返回了插入的节点,作为acquireQueued方法的入参,这个方法主要用于争抢锁

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    // 获取prev节点,若为null即刻抛出NullPointException
                    final Node p = node.predecessor();
                    // 如果前驱为head才有资格进行锁的争夺
                    if (p == head && tryAcquire(arg)) {
                        // 获取锁成功后就不需要进行同步操作了,
                        // 获取锁成功的线程为新的head节点
                        setHead(node);
                        // 凡是head节点,head.thread与head.prev永远为null,
                        // 但是head.next不为null
                        p.next = null; // help GC
                        // 获取锁成功
                        failed = false;
                        return interrupted;
                    }
                    // 如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        // 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                // 如果抛出异常则取消锁的获取,进行出队(sync queue)操作
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    原来的head节点释放锁以后,会从队列中移除,原来head节点的next节点会成为head节点


    示例图

    shouldParkAfterFailedAcquire
    从上面的分析可以看出,只有队列的第二个节点才有机会争用锁,如果成功获取锁,则此节点晋升为首节点,对于第三个及以后的节点,if(p == head)条件不成立,首先进行shouldParkAfterFailedAcquire(p, node)操作,shouldParkAfterFailedAcquire方法是判断一个争用锁的线程是否应该被阻塞,它首先判断一个节点的前置节点的状态是否为Node.SIGNAL,如果是,说明此节点已经将状态设置,如果锁释放则通知它,它就可以安全的阻塞了,返回true

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 前继节点的状态
            int ws = pred.waitStatus;
            // 如果是SIGNAL状态,意味着当前线程需要被unpark唤醒
            if (ws == Node.SIGNAL)
                return true;
    /**如果前节点的状态大于0,即为CANCELLED状态时,则会从前节点开始逐步循环
    找到一个没有被“CANCELLED”节点设置为当前节点的前节点,返回false。
    在下次循环执行shouldParkAfterFailedAcquire时,返回true。
    这个操作实际是把队列中CANCELLED的节点剔除掉。*/
            // 如果前继节点是“取消”状态,
            // 则设置“当前节点”的“当前前节点”为“原来前节点”的“前继节点”
            if (ws > 0) {
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SINGNAL状态
            } else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    假如有t1、t2两个线程都加入到了链表中
    如果head节点位置的线程一直持有锁,那么t1和t2在多次尝试获取锁失败以后,就会挂起(这个地方就是避免了惊群效应,每个节点只需要关心上一个节点的状态即可)
    SIGNAL:值为-1,表示当前节点的后续节点将要或者已经被阻塞,在当前节点释放的时候需要unpark后续节点
    CONDITION:值为-2,表示当前节点在等待condition,即在condition队列中
    PROPAGATE:值为-3,表示releaseShared需要被传播给后续节点(仅在共享模式下使用)

    parkAndCheckInterrupt
    如果shouldParkAfterFailedAcquire返回了true,则会执行:parkAndCheckInterrupt()方法,它通过LockSupport.park(this)将当前线程挂起到WATING状态,它需要等待一个中断、unpark方法来唤醒它,通过这样一种FIFO的机制的等待,实现了Lock的操作

    private final boolean parkAndCheckInterrupt() {
    // LockSupport提供park()和unpark()方法实现阻塞线程和解除线程阻塞
    LockSupport.park(this);
    return Thread.interrupted();
    }
    

    ReentrantLock.unlock
    加锁的过程分析完以后,再来分析一下释放锁的过程,调用release方法,这个方法里面做了两件事,释放锁和唤醒park的线程

    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    tryRelease
    这个动作可以认为是一个设置锁状态的操作,而且是将状态减掉传入的参数值(值是1),如果结果装填为0,就将排他锁的Owner设置为null,使得其他线程有机会执行。在排他锁中,加锁的时候状态会增加1(当然可以自己修改这个值),在解锁的时候减掉1,同一个锁,在可以重入后,可能会被叠加为2、3、4,只有unlock()的次数与lock()的次数对应才会讲Owner线程设置为空,而且也只有这种情况下才会返回true。

    protected final boolean tryRelease(int releases) {
                // 将获取锁的次数减一
                int c = getState() - releases;
                // 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                // 由于重入的关系,不是每次释放锁c都等于0,直到最后一次释放锁时,
                // 才会把当前线程释放
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    LockSupport
    LockSupport类是Java6引入的一个类,提供了基本的线程同步原语,LockSupport实际上是调用了Unsafe里面的函数,归结到Unsafe里,只有两个函数

    public native void unpark(Thread jthread);
    public native void park(boolean isAbsolute, long time);
    

    unpark函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”,这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。
    permit相当于0/1开关,默认是0,调用一次unpark就加1变成了1,调用一次park会消费permit,又会变成0.如果再调用一次park会阻塞,因为permit已经是0了,知道permit变成1,这时调用unpark会把permit设置为1,每个线程都有一个相关的permit,permit最多只有一个,重复调用unpark不会累计。
    在使用LockSupport之前,我们对线程做同步,只能使用wait和notify,但是wait和notify其实不是很灵活,并且耦合性很高,调用notify必须确保某个线程处于wait状态,而park/unpark模型真正解耦了线程之间的同步,先后顺序没有直接关联,同时线程之间不再需要一个Object或者其他变量来存储状态,不再需要关心对方的状态。

    总结
    分析了独占式同步状态获取和释放过程后,做个简单的总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋,移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后续节点。

    公平锁和非公平锁的区别
    锁的公平性是相对于获取锁的顺序而言的,如果一个公平锁,那么锁的获取顺序应该符合请求的绝对时间顺序,也就是FIFO,在上面分析的列子来说,只要cas设置同步状态成功,则表示当前线程获取了锁,而公平锁则不一样,差异点有两个
    FairSync.tryAcquire

    final void lock() {
                acquire(1);
            }
    

    非公平锁在获取锁的时候,会先通过cas进行抢占,而公平锁不会
    FairSync.tryAcquire

    protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    这个方法与nonfairTryAcquire(int acquires)比较,不同的地方在于判断条件多了hasQueuedPredecessors()方法,也就是加入了【同步队列中当前节点是否有前驱节点】的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

    Condition
    通过前面的学习,我们知道任意一个java对象,都有一组监视器方法(定义在java.lang.Object上),主要包括wait()、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式JUC包提供了Condition来对锁进行精准控制,Condition是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。

    condition使用案例
    ConditionWait

    public class ConditionDemoWait implements Runnable {
    
        private Lock lock;
        private Condition condition;
    
        public ConditionDemoWait(Lock lock, Condition condition) {
            this.lock = lock;
            this.condition = condition;
        }
    
        @Override
        public void run() {
            System.out.println("begin -ConditionDemoWait");
            try {
                lock.lock();
                condition.await();
                System.out.println("end - ConditionDemoWait");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    

    ConditionSignal

    public class ConditionDemoSignal implements Runnable {
    
        private Lock lock;
        private Condition condition;
    
        public ConditionDemoSignal(Lock lock, Condition condition) {
            this.lock = lock;
            this.condition = condition;
        }
    
        @Override
        public void run() {
            System.out.println("begin -ConditionDemoSignal");
            try {
                lock.lock();
                condition.signal();
                System.out.println("end - ConditionDemoSignal");
            } finally {
                lock.unlock();
            }
        }
    }
    

    通过这个案例简单地实现了wait和notify的功能,当调用await方法之后,当前线程会释放锁并等待,而其他线程调用condition对象的signal或者signalall方法通知被阻塞的线程,然后自己执行unlock释放锁,被唤醒的线程获得之前的锁继续执行,最后释放锁,所以condition中两个最仲要的方法,一个是await,一个是signal方法
    await:把当前线程阻塞挂起
    signal:唤醒阻塞的线程

    await方法
    调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待对鞋并释放锁,同时线程状态变为等待状态,当从await()方法返回时,当前线程一定获取了Condition相关联的锁。

    public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                // 创建一个新的节点,节点状态为condition,采用的数据结构仍然是链表
                Node node = addConditionWaiter();
                // 释放当前的锁,得到锁的状态,并唤醒AQS队列中的一个线程
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                // 如果当前节点没有在同步队列上,即还没有被signal,则将当前线程阻塞,
                //  isOnSyncQueue 判断当前node状态,如果是CONDITION状态,或者不在队列上,
                //  就继续阻塞,还在队列上且不是CONDITON状态了,就结束循环和阻塞
                while (!isOnSyncQueue(node)) {// 第一次判断的是false,因为前面已经释放锁了
                //  第一次总是park自己,开始阻塞等待
                    LockSupport.park(this);
                // 线程判断自己在等待过程中是否被中断了,如果没有中断,则再次循环,
                // 会在isOnSyncQueue中判断自己是否在队列上
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                // 当这个线程醒来,会尝试获取锁,当acquireQueued返回false就是拿到锁了,
                // interruptMode != THROW_IE -> 表示这个线程没有成功讲node入队,
                // 但signal执行了enq方法让其入队了,然后将这个变量设置成REINTERRUPT
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                // 如果node的下一个等待着不是null ,则进行清理,清理Condition队列上的节点,
                // 如果是null就没有好清理的了
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                // 如果线程被中断了,需要抛出异常,或者什么都不做
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    

    Signal
    调用Condition的signal()方法,将会唤醒在等待队列等待时间最长的节点(首节点),在唤醒节点之前,会讲节点移到同步队列中

    public final void signal() {
                // 先判断当前线程是否获取了锁
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                // 拿到Condition队列上第一个节点
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    
    private void doSignal(Node first) {
                do {
                    // 如果第一个节点的下一个节点是null,那么最后一个节点也是 null
                    if ( (firstWaiter = first.nextWaiter) == null)
                    // 将next节点设置成 null
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    

    该方法先是cas修改了节点状态,如果成功,就将这个节点放到AQS队列中,然后唤醒这个节点上的线程,此时那个节点就会在await方法中苏醒

    final boolean transferForSignal(Node node) {
     
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            Node p = enq(node);
            int ws = p.waitStatus;
            // 如果上一个节点的状态被取消了,或者尝试设置上一个节点
            // 的状态为SIGNAL失败了(SIGNAL 表示它的next节点需要停止zuse)
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    
    

    学习来源https://www.gupaoedu.com/

    相关文章

      网友评论

        本文标题:并发编程学习五、同步锁ReentrantLock

        本文链接:https://www.haomeiwen.com/subject/hosjlqtx.html