美文网首页Java开发那些事工作生活
Java并发之ReadWriteLock详解

Java并发之ReadWriteLock详解

作者: 刘一一同学 | 来源:发表于2019-07-30 09:32 被阅读13次

1. 简介

ReentrantLock 都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。

在没有读写锁之前,只能通过 Java 的等待通知机制,就是当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知之后,所有等待的读操作才能继续执行(写操作之间依靠 synchronized 关键进行同步),这样做的目的是使读操作能读取到正确的数据,不会出现脏读。改用读写锁实现上述功能,只需要在读操作时获取读锁,写操作时获取写锁。当写锁被获取到时,后续(非当前写操作线程)的读写操作都会被阻塞,写锁释放之后,所有操作继续执行。Java 并发包提供读写锁接口 ReadWriteLock 是基于 ReentrantReadWriteLock 实现的,其特性包括:

  • 公平性选择:支持非公平和公平的获取锁方式,但非公平的吞吐量大于公平的。
  • 重入锁:支持重入锁,即获取读锁之后还能再次获取读锁,获取写锁后还能再次获取读锁或写锁。
  • 锁降级:允许获取写锁,获取读锁,释放写锁的次序,即写锁可以降级为读锁。
  • 锁获取中断:读取锁和写入锁都支持获取锁期间被中断。
  • 支持条件变量:写入锁提供了条件变量 Condition 的支持,这个和独占锁一致,但是读取锁却不允许获取条件变量,将得到一个 UnsupportedOperationException 异常。

2. 实现原理

ReadWriteLock 基于 ReentrantReadWriteLock 实现的,而 ReentrantReadWriteLock 是基于 AQS(AbstractQueuedSynchronizer)实现的。读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。以 ReentrantLock 中的自定义同步器的实现为例,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使得该状态得涉及成为读写锁实现得关键。

在一个整型变量中维护多个状态,需要使用 “按位切割方法”,在读写锁中,我们需要将一个整型变量表示为读和写两种状态,所以会将整型变量分割为两部分,高16位标识读状态,低16位为写状态

通过位运算便可以确定读写状态。假设当前同步状态值为S,写状态等于 S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0由移16位)。当写状态增加1时,等于S+(1<<16),也就是S+0x00010000。

2.1 读锁的获取

读锁是一个支持重入的共享锁,它能够被多个线程同时持有,在没有其他写线程访问时,读锁总是会被成功获取。ReadLock 提供了 lock() 方法来获取读锁,最终会调用 Sync 类的 tryAcquireShared(int) 方法来获取读锁。

protected final int tryAcquireShared(int unused) {
    // 获取当前线程对象
    Thread current = Thread.currentThread();
    // 获取同步状态
    int c = getState();
    // 如果存在写锁,且持有写锁的线程不是当前对象,返回-1,表示获取读锁失败
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 获取读锁状态
    int r = sharedCount(c);
    // 如果读锁不需要等待且读锁获取次数没超过限制,同时同步状态修改成功
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            // 读锁第一次被线程获取
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 读锁被第一次获取读锁的线程重复获取
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // 获取读锁失败,调用fullTryAcquireShared(Thread)方法,放到循环里重试
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            // 这里对应锁降级,若当前线程已持有写锁,则允许当前线程继续获取读锁,否则直接返回
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            // 如果读线程需要阻塞
            // 当前线程是第一次获取读锁的线程
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                 // 当前线程不是第一次获取读锁的线程
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        // 如果读锁达到了最大值,抛出异常
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 同步状态修改成功
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 下面的处理与tryAcquireShared(int)类似
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

fullTryAcquireShared(Thread) 会判断是否要锁降级,然后根据 “是否需要阻塞等待”,“读锁状态是否超过限制” 等进行处理。如果不需要阻塞等待,并且读锁没有超过限制,则通过 CAS 尝试获取锁,并返回1。

2.2 读锁的释放

与写锁一样,读锁的释放也通过 AQS 的模板方法完成,最终会调用在Sync重写的 tryReleaseShared(int)方法。

rotected final boolean tryReleaseShared(int unused) {
    // 获取当前线程对象
    Thread current = Thread.currentThread();
    // 如果想要释放锁的线程为第一个获取锁的线程
    if (firstReader == current) {
        // 当前线程仅获取了一次读锁,则需要将firstReader设置null,否则firstReaderHoldCount - 1
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    // 通过CAS操作更新同步状态
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

2.3 HoldCounter

读锁的获取与释放过程中都用到了 HoldCounter,它保存了线程持有共享锁(读锁)的数量、重入的数量,主要起着计数器的作用,对读锁的获取与释放操作会更新对应的计数值。若线程获取读锁,则该计数器+1,释放读锁,该计数器-1。只有当线程获取读锁后才能对读锁进行释放、重入操作。

2.4 写锁的获取

写锁是一个支持重进入的排他锁。如果当前线程已经获取了写锁,再次获取锁则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。源码如下:

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    // 取同步状态state的低16位,写同步状态
    int w = exclusiveCount(c);    
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // 存在读锁或当前线程不是已获取写锁的线程,返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 判断同一线程获取写锁是否超过最大次数,支持可重入
        if (w + exclusiveCount(acquires) > MAX_COUNT)    //
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    // 此时c=0,读锁和写锁都没有被获取
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 设置占用排它锁的线程是当前线程
    setExclusiveOwnerThread(current);
    return true;
}

从源代码可以看出,获取写锁的步骤如下:

  1. 判断同步状态 state 是否为0。如果 state!=0,则表示当前已经有其他线程获取了读锁或写锁,执行步骤2;否则执行步骤5。
  2. 判断同步状态 state 的低16位(w)是否为0。如果 w==0,说明其他线程获取了读锁,返回 false;如果 w!=0,说明其他线程获取了写锁,执行步骤3。
  3. 判断获取了写锁的线程是否是当前线程,若不是返回 false,否则执行步骤4。
  4. 判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程已获取写锁,更新为独占锁),返回 true。
  5. 此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),如果不需要阻塞,则 CAS 更新同步状态,若 CAS 成功则返回 true,否则返回 false。如果需要阻塞则返回 false。

2.5 写锁的释放

WriteLock 类提供了 unlock() 方法释放写锁,获取写锁的调用过程就不再具体介绍了,写锁的释放最终会调用Sync类的 tryRelease(int) 方法:

protected final boolean tryRelease(int releases) {
    // 当前线程不是写锁的持有线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 计算新的同步状态
    int nextc = getState() - releases;
    // 判断写锁是否完全释放,若是,则将写锁持有线程设置为null
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    // 更新状态
    setState(nextc);
    return free;
}

写锁的释放与ReentrantLock的释放过程基本类似,每次释放均是减少写状态,当写状态为0时表示写锁已经完全释放了,从而等待的读写线程能够继续访问读写锁,获取同步状态,同时此次写线程的修改对后续的写线程可见。

2.6 重入锁

可重入锁,就是说一个线程在获取某个锁后,还可以继续获取该锁,即允许一个线程多次获取同一个锁。一个线程获取多少次锁,就必须释放多少次锁。

2.7 锁降级

ReentrantReadWriteLock 支持锁降级,当一个线程获取了写锁之后,也可以获取读锁。但不支持锁升级,线程从读锁升级成写锁是不允许的。

3. 使用示例

public class ReadWriteLockDemo {

    private static Map<String, Object> map = new HashMap<>();
    private static ExecutorService executorService = Executors.newFixedThreadPool(20);
    private static ReadWriteLock lock = new ReentrantReadWriteLock();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 1; i <= 20; i++) {
            executorService.execute(new MyThread());
            Thread.sleep(1000);
        }
    }

    static class MyThread implements Runnable {
        @Override
        public void run() {
           int random =  new Random().nextInt();
            try {
                lock.writeLock().lock();
                map.put("name" + random, "boo" + random);
                lock.writeLock().lock();
                System.out.println(Thread.currentThread().getName() + ":writeLock");
                lock.readLock().lock();
                lock.readLock().lock();
                map.get("name" + random);
            } finally {
                System.out.println(Thread.currentThread().getName() + ":readLock");
                lock.writeLock().unlock();
                lock.writeLock().unlock();
                System.out.println(Thread.currentThread().getName() + ":writeUnlock");
                lock.readLock().unlock();
                lock.readLock().unlock();
                System.out.println(Thread.currentThread().getName() + ":readUnlock");
            }
        }
    }
}

相关文章

网友评论

    本文标题:Java并发之ReadWriteLock详解

    本文链接:https://www.haomeiwen.com/subject/gvmthctx.html