美文网首页
Lock接口与实现类

Lock接口与实现类

作者: 菠萝丶丶 | 来源:发表于2020-07-14 22:26 被阅读0次

    锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能的,而Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。虽然它缺少了(通过synchronized块或者方法所提供的)隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。

    例如在之前的TwinsLock类中,我们使用了AQS实现共享式资源获取,也是锁的实现方式之一,并且这种方式可以允许同时有多个线程访问同步代码块,这是synchronized所不具备的特性。以及我们在上一篇文章末尾所提到的doAcquireNanos方法,可以以超时状态来获取锁,如果在规定时间内获取不到资源,将返回false,表示获取锁失败。

    Lock的简单使用:

    Lock lock = new ReentrantLock(); 
    lock.lock(); 
    try { 
    } finally { 
    lock.unlock(); 
    }
    

    在finally块中释放锁,目的是保证在获取到锁之后,最终能够被释放。
    不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常,异常抛出的同时,也会导致锁无故释放。

    Lock接口提供的synchronized关键字不具备的主要特性

    特性 描述
    尝试非阻塞的获取锁 当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,
    则成功获取并持有锁
    能被中断的获取锁 与synchronized不同,获取到锁的线程能够响应中断,当获取到锁的
    线程被中断时,中断异常将被抛出,同时锁会被释放
    超时获取锁 在指定截止时间之前获得锁,如果截止时间到了仍无法获取锁,则返回

    Lock接口的API

    方法名称 描述
    void lock() 获取锁,调用该方法当前线程将会获取锁,当锁获得后,
    从该方法返回
    void lockInterruptibly()
    throws InterruptedException
    可以中断的获取锁,和lock方法的不同在于该方法会响应
    中断,即在锁的获取中可以中断当前线程
    boolean tryLock() 尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够
    获取则返回true,否则返回false
    boolean tryLock
    (long time, TimeUnit unit)
    throws InterruptedException
    超时获取锁,当前线程在以下三种情况下会返回:
    1、当前线程在超时时间内获得了锁
    2、当前线程在超时时间内被中断
    3、超时时间结束,返回false
    void unlock() 释放锁
    Condition newCondition() 获取等待通知组件,该组件和当前的锁绑定,当前线程
    只有获得了锁,才能调用该组件的wait方法,
    而调用后,当前线程将释放锁

    这里先简单介绍一下Lock接口的API,随后介绍一下Lock接口的几个实现类。

    重入锁

    重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。

    以之前的Mutex类为例,如果我们在lock方法获取锁之后再在后面调用lock方法会怎样呢?该线程会被自己阻塞住,原因是Mutex类在实现tryAcquire(int acquires)方法时没有考虑到站有锁的线程再次获取锁的情况,而再次调用时返回了false,导致被阻塞,所以可以认为Mutex类实现的锁是不支持重入的锁。而synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获得该锁。

    ReentrantLock虽然没能像synchronized关键字一样支持隐式的重进入,但是在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。

    这里提到一个锁获取的公平性问题,如果在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是不公平的。公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。ReentrantLock提供了一个构造函数,能够控制锁是否是公平的。
    事实上,公平的锁机制往往没有非公平的效率高,但是,并不是任何场景都是以TPS作为唯一的指标,公平锁能够减少“饥饿”发生的概率,等待越久的请求越是能够得到优先满足。

    实现重进入

    重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题。
    1、线程再次获取锁:锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
    2、锁的最终释放:线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。

    下面来看一下ReentrantLock默认实现(非公平)获取同步状态的代码:

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。

    成功获取锁的线程再次获取锁,只是增加了同步状态值,这也就要求ReentrantLock在释放同步状态时减少同步状态值,释放的代码如下:

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    

    如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。

    公平与非公平获取锁的区别

    公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。

    对于非公平锁来说,只要调用nonfairTryAcquire(int acquires)方法设置同步状态成功,则表示当前线程获取到了锁,而公平锁的获取方式略微有所不同:

    // java.util.concurrent.locks.ReentrantLock.FairSync
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    该方法与nonfairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

    在之前上一节中的twinsLockTest方法中,我们有讲到执行结果基本上获取到资源的线程都是固定的那两个,这时候由于当一个线程请求锁时,只要获取了同步状态即成功获取锁,而在这个前提下,刚释放锁的线程再次获得锁的概率就非常大,使得其他线程只能继续在同步队列中等待。而公平锁由于为了保证每个线程都能获取到锁,就会进行额外的线程上下文切换,这样就增加了额外的开销,所以公平锁虽然公平,但是性能却比非公平锁要低。

    在《Java并发编程的艺术》中有一则测试,测试的部分代码如下:

    public class FairAndUnfairTest {
        private static Lock fairLock = new ReentrantLock2(true);
        private static Lock unfairLock = new ReentrantLock2(false);
    
        @Test
        public void fair() {
            testLock(fairLock);
        }
    
        @Test
        public void unfair() {
            testLock(unfairLock);
        }
    
        private void testLock(Lock lock) {
            // 启动5个Job(略)
        }
    
        private static class Job extends Thread {
            private Lock lock;
    
            public Job(Lock lock) {
                this.lock = lock;
            }
    
            public void run() {
                // 连续2次打印当前的Thread和等待队列中的Thread(略)
            }
        }
    
        private static class ReentrantLock2 extends ReentrantLock {
            public ReentrantLock2(boolean fair) {
                super(fair);
            }
    
            public Collection<Thread> getQueuedThreads() {
                List<Thread> arrayList = new ArrayList<Thread>(super.getQueuedThreads());
                Collections.reverse(arrayList);
                return arrayList;
            }
        }
    }
    

    测试结果如下图所示:


    公平性和非公平性在系统线程上下文切换方面的对比

    在测试中公平性锁与非公平性锁相比,总耗时是其94.3倍,总切换次数是其133倍。可以看出,公平性锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换。非公平性锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。

    读写锁

    之前提到锁(如Mutex和ReentrantLock)基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
    一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。Java并发包提供读写锁的实现是 ReentrantReadWriteLock,它提供的特性如下表所示:


    ReentrantReadWriteLock的特性

    ReadWriteLock仅定义了获取读锁和写锁的两个方法,即readLock()方法和writeLock()方法,而其实现——ReentrantReadWriteLock,除了接口方法之外,还提供了一些便于外界监控其内部工作状态的方法,这些方法以及描述如下表所示:


    ReentrantReadWriteLock展示内部工作状态的方法

    接下来,通过一个缓存示例说明读写锁的使用方式,示例代码如下:

    public class Cache {
        static Map<String, Object> map = new HashMap<String, Object>();
        static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        static Lock r = rwl.readLock();
        static Lock w = rwl.writeLock();
    
        // 获取一个key对应的value 
        public static final Object get(String key) {
            r.lock();
            try {
                return map.get(key);
            } finally {
                r.unlock();
            }
        }
    
        // 设置key对应的value,并返回旧的value 
        public static final Object put(String key, Object value) {
            w.lock();
            try {
                return map.put(key, value);
            } finally {
                w.unlock();
            }
        }
    
        // 清空所有的内容 
        public static final void clear() {
            w.lock();
            try {
                map.clear();
            } finally {
                w.unlock();
            }
        }
    }
    

    上述代码通过一个非线程安全的HashMap作为缓存的实现,同时使用读写锁来保证Cache是线程安全的,在获取数据是需要获取读锁,使得并发访问时不会被阻塞,写操作和clear操作需要获取写锁,获取写锁后,其余读写操作均会被阻塞,只有等待当前写锁释放之后才能其他操作。Cache使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性,同时简化了编程方式。

    读写锁的实现分析

    接下来分析ReentrantReadWriteLock的实现,主要包括:读写状态的设计、写锁的获取与释放、读锁的获取与释放以及锁降级。

    读写状态的设计
    读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态 回想ReentrantLock中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。

    正常思维我们可以将一个数的前几位用来表示读,后几位用来表示写,这样前后分开互不干扰,而在读写锁中也使用了类似的设计,读写锁资源控制的一个整型变量使用了位运算进行“按位切割”,高16位表示读,低16位表示写,如下图所示:


    读写锁状态的划分方式

    当前同步状态表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。读写锁是如何迅速确定读和写各自的状态呢?答案是通过位运算。假设当前同步状态值为S,写状态等于S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0右移16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是S+0x00010000。

    根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取。

    写锁的获取与释放
    写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态,获取写锁的代码如下所示:

    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            // 存在读锁或者当前获取线程不是已经获取写锁的线程
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires);
            return true;
        }
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }
    

    其逻辑大概分为以下几点:

    • 锁状态不为0并且写状态不为0(存在写锁),且获取锁的线程并非当前线程,则失败
    • 写锁计数器超过最大值,则失败
    • 根据公平锁/非公平锁的实现是否需要阻塞当前写锁(公平锁需判断是否有节点等待时间比自己长,非公平锁默认返回false),如果返回false,则直接尝试获CAS取同步状态,失败则返回false,成功则设置所有线程并从tryAcquire方法返回

    写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。

    读锁的获取与释放
    读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。获取读锁的实现从Java 5到Java 6变得复杂许多,主要原因是新增了一些功能,例如getReadHoldCount()方法,作用是返回当前线程获取读锁的次数。读状态是所有线程获取读锁次数的总和,而每个线程各自获取读锁的次数只能选择保存在ThreadLocal中,由线程自身维护,这使获取读锁的实现变得复杂。因此,这里将获取读锁的代码做了删减,保留必要的部分:

    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        //如果存在写锁,且锁的持有者不是当前线程,直接返回-1
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        /*
         * readerShouldBlock():读锁是否需要等待(公平锁原则)
         * r < MAX_COUNT:持有线程小于最大数(65535)
         * compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态
         */
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            // holdCount部分省略
            return 1;
        }
        return fullTryAcquireShared(current);
    }
    
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
            } else if (readerShouldBlock()) {
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                return 1;
            }
        }
    }
    

    读锁获取的过程相对于独占锁而言会稍微复杂下,整个过程如下:

    1. 因为存在锁降级情况,占有写锁的线程可以获取读锁,如果存在写锁且锁的持有者不是当前线程则直接返回失败,否则继续。锁降级就意味着写锁是可以降级为读锁的,但是需要遵循先获取写锁、获取读锁在释放写锁的次序。
    2. 依据公平性原则,判断读锁是否需要阻塞,读锁持有线程数小于最大值(65535),且设置锁状态成功,执行以下代码(对于HoldCounter下面再阐述),并返回1。如果不满足改条件,执行fullTryAcquireShared()。
    3. fullTryAcquireShared(Thread current)会根据“是否需要阻塞等待”,“读取锁的共享计数是否超过限制”等等进行处理。如果不需要阻塞等待,并且锁的共享计数没有超过限制,则通过CAS尝试获取锁,并返回1

    锁降级
    锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。

    接下来看一个锁降级的示例。因为数据不常变化,所以多个线程可以并发地进行数据处理,当数据变更后,如果当前线程感知到数据变化,则进行数据的准备工作,同时其他处理线程被阻塞,直到当前线程完成数据的准备工作,如下代码所示:

    public void processData() {
        readLock.lock();
        if (!update) {
            // 必须先释放读锁 
            readLock.unlock();
            // 锁降级从写锁获取到开始 
            writeLock.lock();
            try {
                if (!update) {
                    // 准备数据的流程(略) 
                    update = true;
                }
                readLock.lock();
            } finally {
                writeLock.unlock();
            }
            // 锁降级完成,写锁降级为读锁 
        }
        try {
            // 使用数据的流程(略) 
        } finally {
            readLock.unlock();
        }
    }
    

    上述示例中,当数据发生变更后,update变量(布尔类型且volatile修饰)被设置为false,此时所有访问processData()方法的线程都能够感知到变化,但只有一个线程能够获取到写锁,其他线程会被阻塞在读锁和写锁的lock()方法上。当前线程获取写锁完成数据准备之后,再获取读锁,随后释放写锁,完成锁降级。

    锁降级中读锁的获取是否必要呢?答案是必要的。主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。

    RentrantReadWriteLock不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。目的也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。

    相关文章

      网友评论

          本文标题:Lock接口与实现类

          本文链接:https://www.haomeiwen.com/subject/xwjqhktx.html