美文网首页
ReentrantReadWriteLock

ReentrantReadWriteLock

作者: 得力小泡泡 | 来源:发表于2021-01-22 16:34 被阅读0次

1、ReentrantReadWriteLock的介绍

现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。

针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁

由AQS那一章可知:
ReentrantReadWriteLock:读写锁,而不管读锁还是写锁本质上都是依赖于一个AQS对象,那么如何通过一个int的state变量来表示两种场景呢?此时用的是state的高16位表示的是读线程的数量,而低16位表示的是写线程的数量了。

下面根据此代码对ReentrantReadWriteLock进行探讨
package com.concurrency2;

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.IntStream;

public class MyTest10 {

    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    public void method() {
        try {
            //readWriteLock.readLock().lock();
            readWriteLock.writeLock().lock();

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("method");

        } finally {
            //readWriteLock.readLock().unlock();
            readWriteLock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        MyTest10 myTest10 = new MyTest10();
        IntStream.range(0, 10).forEach(i -> {
            new Thread(() -> {
                myTest10.method();
            }).start();
        });
    }
}

输出

method
method
method
method
method
method
method
method
method
method

可以发现当是读锁操作时,特别快,总时间在1秒左右;当是写锁操作时,总时间在10秒左右,有读多写少的情况下,利用读锁和写锁可以大大提高性能

2、ReentrantReadWriteLock使用公平锁和非公平锁的创建方式

公平锁的创建方式

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);

非公平锁的创建方式

    public ReentrantReadWriteLock() {
        this(false);
    }
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

3、关于ReentrantReadWriteLock的操作逻辑

写锁:

在获取写锁时,会尝试判断当前对象是否拥有了锁(读锁或写锁)
1、当该对象拥有写锁,并且该写锁的拥有者是当前线程时,则该对象的写锁数量加1
2、当该对象拥有读锁或者写锁且写锁拥有者并非是当前线程,则进入FIFO阻塞队列
3、当该对象不拥有锁时(即当前是读锁与写锁都没有被获取的情况),会有公平锁与非公平锁两种情况,最后将当前对象的排他锁持有者设为自己

  • 公平锁:当FIFO阻塞队列中存在元素 或者 CAS操作抢不到写锁,则进入FIFO阻塞队列中;否则该线程就会为当前对象上写锁,并且将写锁的个数加1
  • 非公平锁:直接尝试CAS操作抢写锁,若失败抢不到写锁,则进入FIFO阻塞队列;否则成功则该线程就会为当前对象上写锁,并且将写锁的个数加1

在释放写锁时,底层会对state成员变量的低16位(写锁的数量)进行减 1 操作,如果减 1 后,
1、写锁的数量不为0,表示当前线程获取了多次写锁,则执行完毕;
2、写锁的数量为0,则调用LockSupport的unpark方法唤醒该线程后的等待队列中的第一个后继线程(pthread_mutex_unlock),将其唤醒,使之能够获取到对象的锁(release时,对于公平锁和非公平锁的处理逻辑是一致的),这个时候如果是非公平锁的话,也有可能被刚来的线程抢到了锁。之所以调用release方法后写锁的数量不为0,原因在于ReentrantReadWriterLock是可重入锁,多次调用写锁的lock方法写锁的数量加1

读锁:

在获取读锁时,会尝试判断当前对象是否拥有了写锁
1、当该对象拥有写锁,并且该写锁的拥有者是当前线程时,则该对象的读锁数量加1(可能会发生锁降级的过程)
2、当该对象拥有写锁,并且该写锁的拥有者不是当前线程时,则进入到FIFO阻塞队列中
3、当该对象不拥有写锁,会有公平锁与非公平锁两种情况

  • 公平锁:当FIFO阻塞队列中存在元素 或者 尝试CAS操作抢不到读锁,则进入FIFO阻塞队列中;否则该线程就会为当前对象上读锁,并且将读锁的个数加1
  • 非公平锁:当FIFO第一个结点是排他式结点 或者 尝试CAS操作抢不到读锁,则进入FIFO阻塞队列中;否则该线程就会为当前对象上读锁,并且将读锁的个数加1(毕竟读多写少,如果还没有这点判断机制,写锁可能会发生【饥饿】)

当锁被释放时(调用了unlock方法),那么底层会调用releaseShared方法进行如下两个操作
1、将当前线程副本中的读锁数量进行减 1 操作
2、将state成员变量的高16位(总读锁的数量)进行减 1 操作,并且一直通过CAS操作对state的值进行更新,直到更新成功为止,更新成功后判断当前对象是否有锁(读锁或者写锁)

  • 若当前对象有其他锁,则执行完毕
  • 若当前对象没有任何锁,则调用LockSupport的unpark方法唤醒该线程后的等待队列中的第一个后继线程(pthread_mutex_unlock),将其唤醒,使之能够尝试获取到对象的锁(releaseShare时,对于公平锁和非公平锁的处理逻辑是一致的),这个时候如果是非公平锁的话,也有可能被刚来的线程抢到了锁。之所以调用releaseShare方法后读锁的数量不为0,原因在于ReentrantReadWriterLock是可重入锁,多次调用读锁的lock方法读锁的数量加1

4、读写锁有以下三个重要的特性:

(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。

(2)重进入:读锁和写锁都支持线程重进入。

(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。

5、源码分析

ReentrantReadWriteLock 类的整体结构
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {

    /** 读锁 */
    private final ReentrantReadWriteLock.ReadLock readerLock;

    /** 写锁 */
    private final ReentrantReadWriteLock.WriteLock writerLock;

    final Sync sync;
    
    /** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    /** 返回用于写入操作的锁 */
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    
    /** 返回用于读取操作的锁 */
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }


    abstract static class Sync extends AbstractQueuedSynchronizer {}

    static final class NonfairSync extends Sync {}

    static final class FairSync extends Sync {}

    public static class ReadLock implements Lock, java.io.Serializable {}

    public static class WriteLock implements Lock, java.io.Serializable {}
}
1、类的继承关系

说明:可以看到,ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable接口,表示可以进行序列化,在源代码中可以看到ReentrantReadWriteLock实现了自己的序列化逻辑。

2、类的内部类
image.png

说明:如上图所示,Sync继承自AQS、NonfairSync继承自Sync类、FairSync继承自Sync类(通过构造函数传入的布尔值决定要构造哪一种Sync实例);ReadLock实现了Lock接口、WriteLock也实现了Lock接口。

3、Sync类
abstract static class Sync extends AbstractQueuedSynchronizer {
    // 版本序列号
    private static final long serialVersionUID = 6317671515068378041L;
    // 高16位为读锁,低16位为写锁
    static final int SHARED_SHIFT   = 16;
    // 读锁单位
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 读锁最大数量
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 写锁最大数量
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    // 本地线程计数器
    private transient ThreadLocalHoldCounter readHolds;
    // 缓存的计数器
    private transient HoldCounter cachedHoldCounter;
    // 第一个读线程
    private transient Thread firstReader = null;
    // 第一个读线程的计数
    private transient int firstReaderHoldCount;

    // 构造函数
    Sync() {
        // 本地线程计数器
        readHolds = new ThreadLocalHoldCounter();
        // 设置AQS的状态
        setState(getState()); // ensures visibility of readHolds
    }

    /*说明:HoldCounter主要有两个属性,count 和 tid,其中count表示某个读线程重入的次数,
      tid表示该线程的tid字段的值,该字段可以用来唯一标识一个线程。*/
    // 计数器
    static final class HoldCounter {
        // 计数
        int count = 0;
        // Use id, not reference, to avoid garbage retention
        // 获取当前线程的TID属性的值
        final long tid = getThreadId(Thread.currentThread());
    }
    
    /*说明:ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法,ThreadLocal类可以将线程与对象相关联。
      在没有进行set的情况下,get到的均是initialValue方法里面生成的那个HolderCounter对象。*/
    // 本地线程计数器
    static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
        // 重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

    protected final boolean tryRelease(int releases) {}
    protected final boolean tryAcquire(int acquires) {}
    protected final boolean tryReleaseShared(int unused) {}
    protected final int tryAcquireShared(int unused) {}

4、Sync类的两个内部类以及作用

从上面的抽取出来分析

(1)HoldCouter类

    // 某线程读锁的数量的计数器
    static final class HoldCounter {
        // 计数
        int count = 0;
        // Use id, not reference, to avoid garbage retention
        // 获取当前线程的TID属性的值
        final long tid = getThreadId(Thread.currentThread());
    }

(2)ThreadLocalHoldCounter类

    // 本地线程计数器
    static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
        // 重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

总结:HoldCouter类的作用是记录某一个线程读锁数量,以及该线程的id,ThreadLocalHoldCounter继承了ThreadLocal<HoldCounter>,可以用来记录每个线程独有的副本,即ThreadLocalHoldCounter充当ThreadLocalMap中的key,HoldCouter充当ThreadLocalMap的value,如图所示

image.png
5、读写状态的设计

读写锁对于同步状态的实现是在一个整形变量state上通过“按位切割使用”:将变量切割成两部分,高16位表示读,低16位表示写

image.png
假设当前同步状态值为S,get和set的操作如下:
(1)获取写状态:S&0x0000FFFF:将高16位全部抹去
(2)获取读状态:S>>>16:无符号补0,右移16位
(3)写状态加1:S+1
(4)读状态加1:S+(1<<16)即S + 0x00010000
6、ReentrantReadWriteLock公平锁与非公平锁的源码分析

下面的源码分析会用得上

公平锁

    static final class FairSync extends Sync {
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

当前的读写锁是一把公平的读写锁时,本质上是在这里实现公平锁的逻辑,是判断读线程和写线程是否需要进行阻塞的依据。直接返回FIFO阻塞队列的个数情况,如果FIFO阻塞队列有元素,则放入到FIFO阻塞队列中;否则会尝试获取该锁。


非公平锁

    static final class NonfairSync extends Sync {
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }

当前的读写锁是一把非公平的读写锁时,本质上是在这里实现非公平锁的逻辑,是判断读线程和写线程是否需要进行阻塞的依据。

  • 如果是写锁,无视FIFO阻塞队列的状态直接返回false,尝试插队;
  • 如果是读锁,判断FIFO阻塞队列的第一个结点是不是排他式结点,如果是,说明有一个线程在等待获取写锁,那么返回true,表示要排队;如果不是,则可以插队(毕竟读多写少,如果还没有这点判断机制,写锁可能会发生【饥饿】)
7、写锁的获取与释放

看下WriteLock类中的lock和unlock方法:

public void lock() {
    sync.acquire(1);
}

public void unlock() {
    sync.release(1);
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

(1)写锁的获取,tryAcquire方法

protected final boolean tryAcquire(int acquires) {
    //当前线程
    Thread current = Thread.currentThread();
    //获取状态
    int c = getState();
    //写线程数量(即获取独占锁的重入数)
    int w = exclusiveCount(c);

    //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
    if (c != 0) {
        // 当前state不为0,此时:如果写锁状态为0说明读锁此时被占用返回false;
        // 如果写锁状态不为0且写锁没有被当前线程持有返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;

        //判断同一线程获取写锁是否超过最大次数(65535),支持可重入
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //更新状态
        //此时当前线程已持有写锁,现在是重入,所以只需要修改锁的数量即可。
        setState(c + acquires);
        return true;
    }

    //到这里说明此时c=0,读锁和写锁都没有被获取
    //writerShouldBlock表示是否阻塞
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;

    //设置锁为当前线程所有
    setExclusiveOwnerThread(current);
    return true;
}

从这里可以看出公平锁与非公平锁在写锁中的情况

    //到这里说明此时c=0,读锁和写锁都没有被获取
    //writerShouldBlock表示是否阻塞
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;

公平锁:writerShouldBlock()方法返回hasQueuedPredecessors(),即当前是读锁与写锁都没有被获取的情况,当FIFO阻塞队列中存在元素 或者 CAS操作抢不到写锁,则进入FIFO阻塞队列中;否则该线程就会为当前对象上写锁,并且将写锁的个数加1

非公平锁:writerShouldBlock()方法返回 false,即当前是读锁与写锁都没有被获取的情况,直接尝试CAS操作抢写锁,若失败抢不到写锁,则进入FIFO阻塞队列;否则成功则该线程就会为当前对象上写锁,并且将写锁的个数加1

(2)写锁的释放,tryRelease方法:

protected final boolean tryRelease(int releases) {
    //若锁的持有者不是当前线程,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //让state状态减1,即本质上是让写锁数量减1
    int nextc = getState() - releases;
    //如果独占模式重入数为0了,说明独占模式被释放
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        //若写锁的新线程数为0,则将锁的持有者设置为null
        setExclusiveOwnerThread(null);
    //设置写锁的新线程数
    //不管独占模式是否被释放,更新独占重入数
    setState(nextc);
    return free;
}

当锁被释放时(调用了unlock方法),那么底层会调用release方法对state成员变量的低16位(写锁的数量)进行减 1 操作,如果减 1 后,

  • 写锁的数量不为0,表示当前线程获取了多次写锁,那么release操作就执行完毕;
  • 写锁的数量为0,则调用LockSupport的unpark方法唤醒该线程后的等待队列中的第一个后继线程(pthread_mutex_unlock),将其唤醒,使之能够尝试获取到对象的锁(release时,对于公平锁和非公平锁的处理逻辑是一致的),这个时候如果是非公平锁的话,也有可能被刚来的线程抢到了锁。之所以调用release方法后写锁的数量不为0,原因在于ReentrantReadWriterLock是可重入锁,多次调用写锁的lock方法写锁的数量加1

8、读锁的获取与释放

看下ReadLock类中的lock和unlock方法:

public void lock() {
    sync.acquireShared(1);
}
public void unlock() {
    sync.releaseShared(1);
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

(3)读锁的获取,tryAcquireShared

protected final int tryAcquireShared(int unused) {
    // 获取当前线程
    Thread current = Thread.currentThread();
    // 获取状态
    int c = getState();

    //如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败,因为存在锁降级
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 读锁数量
    int r = sharedCount(c);
    /*
     * readerShouldBlock():读锁是否需要等待(公平锁原则)
     * r < MAX_COUNT:持有线程小于最大数(65535)
     * compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态
     */
     // 读线程是否应该被阻塞、并且小于最大值、并且比较设置成功
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中
        if (r == 0) { // 读锁数量为0
            // 设置第一个读线程
            firstReader = current;
            // 读线程占用的资源数为1
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { // 当前线程为第一个读线程,表示第一个读锁线程重入
            // 占用资源数加1
            firstReaderHoldCount++;
        } else { // 读锁数量不为0并且不为当前线程
            // 获取计数器
            HoldCounter rh = cachedHoldCounter;
            // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
            if (rh == null || rh.tid != getThreadId(current))
                // 获取当前线程对应的计数器
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0) // 计数为0
                //加入到readHolds中
                readHolds.set(rh);
            //计数+1
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) 
{
      ....
      return 1
}

公平锁:readerShouldBlock()方法返回hasQueuedPredecessors(),即该对象没有拥有写锁的情况,当FIFO阻塞队列中存在元素 或者 尝试CAS操作抢不到读锁,则进入FIFO阻塞队列中;否则该线程就会为当前对象上读锁,并且将读锁的个数加1

非公平锁:readerShouldBlock()方法返回apparentlyFirstQueuedIsExclusive(),其中apparentlyFirstQueuedIsExclusive()指的是第一个结点是不是排他式结点,因此该对象没有拥有写锁的情况,当FIFO第一个结点是排他式结点 或者 尝试CAS操作抢不到读锁,则进入FIFO阻塞队列中;否则该线程就会为当前对象上读锁,并且将读锁的个数加1

(4)读锁的释放,tryReleaseShared

protected final boolean tryReleaseShared(int unused) {
    // 获取当前线程
    Thread current = Thread.currentThread();
    if (firstReader == current) { // 当前线程为第一个读线程
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1) // 读线程占用的资源数为1
            firstReader = null;
        else // 减少占用的资源
            firstReaderHoldCount--;
    } else { // 当前线程不为第一个读线程
        // 获取缓存的计数器
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current)) // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
            // 获取当前线程对应的计数器
            rh = readHolds.get();
        // 获取计数
        int count = rh.count;
        if (count <= 1) { // 计数小于等于1
            // 移除
            readHolds.remove();
            if (count <= 0) // 计数小于等于0,抛出异常
                throw unmatchedUnlockException();
        }
        // 减少计数
        --rh.count;
    }
    for (;;) { // 无限循环
        // 获取状态
        int c = getState();
        // 获取读线程减1后的状态
        int nextc = c - SHARED_UNIT; //SHARED_UNIT = 1 << 16
        if (compareAndSetState(c, nextc)) // 比较并进行设置
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

当锁被释放时(调用了unlock方法),那么底层会调用releaseShared方法进行如下两个操作
1、将当前线程副本中的读锁数量进行减 1 操作
2、将state成员变量的高16位(总读锁的数量)进行减 1 操作,并且一直通过CAS操作对state的值进行更新,直到更新成功为止,更新成功后判断当前对象是否有锁(读锁或者写锁)

  • 若当前对象有其他锁,则执行完毕
  • 若当前对象没有任何锁,则调用LockSupport的unpark方法唤醒该线程后的等待队列中的第一个后继线程(pthread_mutex_unlock),将其唤醒,使之能够尝试获取到对象的锁,(releaseShare时,对于公平锁和非公平锁的处理逻辑是一致的),这个时候如果是非公平锁的话,也有可能被刚来的线程抢到了锁。之所以调用releaseShare方法后读锁的数量不为0,原因在于ReentrantReadWriterLock是可重入锁,多次调用读锁的lock方法读锁的数量加1

相关文章

网友评论

      本文标题:ReentrantReadWriteLock

      本文链接:https://www.haomeiwen.com/subject/sxnrzktx.html