美文网首页
4. ReentrantReadWriteLock

4. ReentrantReadWriteLock

作者: shallowinggg | 来源:发表于2019-03-15 23:10 被阅读0次

关于读写锁的一些理论在 十一 .Java并发工具 中已经介绍过,读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发行相比一般的排他锁有了很大提升。

除了保证写操作对读操作的可见性以及并发行的提升外,读写锁能够简化读写交互场景的编程方式。假设在程序中定义一个共享的用作缓存的数据结构,它大部分时间提供读服务(例如查询和搜索),而写操作占有的时间很少,但是写操作完成后的更新需要对后续的读服务可见。

在没有读写锁支持的时候,如果需要完成上述工作就要使用Java的等待通知机制,就是当写操作开始时,所有晚于写操作的读线程均会进入等待状态,只有写操作完成并进行通知后,所有等待的读线程才能继续执行,这样做的目的是使读操作能够读取到正确的数据,不会出现脏读。改用读写锁实现上述功能,只需要在读操作时获取写锁,写操作时获取写锁即可。当写锁被获取到时,后续的读写操作都会被阻塞,写锁释放后所有操作继续执行,相比使用等待通知机制,编程方法更加简单明了。

读写锁的接口与示例

ReadWriteLock接口仅定义了读锁和写锁的两个方法,readLock()writeLock()方法。

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

而其实现ReentrantReadWriteLock除了接口方法外,还提供了一些便于外界监控其内部工作的方法:

方法名称 描述
int getReadLockCount() 返回当前读锁被获取的次数,该次数不等于获取读锁的线程数,例如,一个线程连续获取了n次读锁,那么占据读锁的线程数是1,该方法返回n
int getReadHoldCount() 返回当前线程获取读锁的次数,在Java6后新增,它使用ThreadLocal保存当前线程获取读锁的次数
boolean isWriteLocked() 判断写锁是否被获取
int getWriteHoldCount() 返回当前写锁被获取的次数

接下来,通过一个示例说明读写锁的使用方式:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Cache {
    private static Map<String, Object> map = new HashMap<>();
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock = readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();
    
    public static Object get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }
    
    public static Object put(String key, Object value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }
    
    public static void clear() {
        writeLock.lock();
        try {
            map.clear();
        } finally {
            writeLock.unlock();
        }
    }
}

上述示例中,Cache组合一个非现场安全的HashMap作为缓存的实现,同时使用读写锁的读锁和写锁来保证Cache是线程安全的。在读操作get(String key)方法中,需要获取读锁,这使得并发访问该方法时不会被阻塞。写操作put(String key, Object value)方法和clear()方法,在更新HashMap时必须提前获取写锁。

实现分析

接下来分析ReentrantReadWriteLock的实现,主要包括:读写状态的设计、写锁的获取与释放、读锁的获取与释放以及锁降级。

读写状态的设计

读写锁同样依赖于自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。回想下ReentrantLock中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态,这使得该状态的设计成为读写锁实现的关键。

如果在一个整形变量上维护多种状态,就一定要“按位切割使用”这个变量,读写锁将变量切为两个部分,高16位表示读,低16位表示写。读写锁是如何迅速确定读和写各自的状态的呢?通过位运算。假设当前同步状态值为S,写状态等于S & 0x0000FFFF(将高16位全部抹去),读状态等于S >>> 16(无符号右移16位)。当写状态加1时,等于S+1,当读状态加1时,等于S + (1<<16),也就是S + 0x00010000

根据读写状态能得出一个推论:S不等于0时,当写状态S & 0x0000FFFF等于0时,读状态S >>> 16大于0,即读锁已被获取。

> line: 253
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;

    /*
     * Read vs write count extraction constants and functions.
     * Lock state is logically divided into two unsigned shorts:
     * The lower one representing the exclusive (writer) lock hold count,
     * and the upper the shared (reader) hold count.
     */

    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);      //0x00010000
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;  //0x0000FFFF

    /** 返回当前被持有的共享状态的大小(读) */
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    /** 返回当前被独占的共享状态的大小(写) */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

写锁的获取与释放

写锁是一个支持可重入的排他锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读取已经被获取或者该线程不是已经获取了写锁的线程,则当前线程进入等待状态。

> line: 382
protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. 如果读状态不为0或者写状态不为0并且持有锁的线程不是当前线程,那么失败
     * 2. 如果状态已经到了上限,失败(这只会在同步状态不为0时发生)
     * 3. 否则,这个线程有资格获得锁,或者是一个重入,或许是队列规则允许。
     *    这样的话,更新同步状态并且设置持有锁的线程
     */
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // 注意:如果 c!=0 并且 w == 0 ,那么读状态一定不为0,即存在读锁
        // 否则,判断持有锁的线程是否是当前线程
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;

        // 写锁重入
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

> line: 682
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;

    // 非公平锁情况下允许线程不排队竞争
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }

> line: 700
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;

    // 公平锁与ReentrantLock一样,需要判断前面是否有线程先请求锁
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }

该方法除了重入条件之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下获取写锁,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,其他读写线程都被阻塞。

> line: 370
/*
 * Note that tryRelease and tryAcquire can be called by
 * Conditions. So it is possible that their arguments contain
 * both read and write holds that are all released during a
 * condition wait and re-established in tryAcquire.
 */
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

写锁的释放与ReentrantLock的释放基本一致,每次释放都减少写状态,当写状态为0时表示写锁已经被释放。

读锁的获取与释放

读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问时,读锁总会被成功获取,而所做的也只是增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。获取读锁的实现从Java 5到Java 6变得复杂很多,主要原因是新增了一些功能。读状态是所有线程获取读锁次数的总和,而每个线程各自获取读锁的次数只能选择保存在ThreadLocal中,由线程自身维护,这使得获取读锁的实现变得负责。因此,这里先将获取读锁的代码做了删除,保留了必要的部分:

> line: 453
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. 如果其他线程持有了写锁,失败
     * 2. 否则,这个线程有资格修改同步状态,先判断是否应该阻塞
     *    由于队列的FIFO规则(即公平锁情况)。如果不需要阻塞,
     *    尝试通过CAS更新同步状态获取锁。注意,这一步不检查
     *    重入获取,它被推迟了以避免在更加一般的非重入情况下
     *    去检查读线程自己的读状态。
     * 3. 如果步骤2失败了,因为没有资格或者CAS失败或者同步状态
     *     已经到达上限,重试
     */
    Thread current = Thread.currentThread();
    int c = getState();
    // 如果写状态不为0 并且当前线程不是持有锁的线程,失败
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);

    // 如果不需要被阻塞并且同步状态未达上限,尝试CAS更新
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        return 1;
    }

    // 如果第二步中不符合条件,循环重试
    return fullTryAcquireShared(current);
}

> line: 501
final int fullTryAcquireShared(Thread current) {
    /*
     * 获取读状态的完整版本。
     * 这里的代码和tryAcquireShared几乎相同,
     * 但是增加了CAS失败重试和可重入的相关处理部分。
     */
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // 否则我们已经持有了写锁;如果线程在这里被阻塞了,可能引发死锁
        } else if (readerShouldBlock()) {
            // 可能返回 -1
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            return 1;
        }
    }
}

> line: 700
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    // 公平锁的实现依然相同,判断前面是否有先于自己的请求
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

> line: 680
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    // 判断同步队列的第一个节点是否是独占模式
    final boolean readerShouldBlock() {
        /* 避免写线程陷入饥饿,如果同步队列中有一个等待的写
         * 线程处于头部,那么就阻塞自己。
         */
        return apparentlyFirstQueuedIsExclusive();
    }
}

相信通过这个简洁的版本我们对读锁的获取逻辑有了较为清晰的了解,现在我们将新增的ThreadLocal加入进去,分析完整的版本。我们先看一下Sync类中的剩余部分:

> line: 277
/**
 * 保存每个线程的读状态的计数器
 * 使用ThreadLocal储存,同时缓存在cachedHoldCounter中
 */
static final class HoldCounter {
    int count;          // 初始化为 0
    // 使用线程的id而不是线程引用本身,避免无用后被保留,无法被 gc
    final long tid = LockSupport.getThreadId(Thread.currentThread());
}

/**
 * ThreadLocal 子类。为反序列化机制定义
 */
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

/**
 * 当前线程持有的重入读锁数目(hold count)。只在构造函数和readObject中
 * 被初始化。当线程的hold count 变为0时被移除。
 */
private transient ThreadLocalHoldCounter readHolds;

/**
 * 最后一个成功获取读锁的线程的hold count。对于下一个释放读锁的
 * 线程是最后一个获取读锁的情况使用。不使用volatile因为它只是被
 * 用作是一个优化,并且对于缓存线程来说是不错的。
 *
 * 它可以比线程的生命周期更长, 但是通过不持有线程的引用
 * 这种方式来避免死去的线程无法被垃圾回收
 */
private transient HoldCounter cachedHoldCounter;

/**
 * firstReader 是第一个获取了读锁的线程。
 * firstReaderHoldCount 是 firstReader 的 hold count.
 *
 * 更确切的说,firstReader是最后一个将共享状态从0改为1的线程,
 * 并且在那以后还没有释放读锁;如果没有这样的线程则为null
 *
 * 不会引用垃圾存留除非线程终止时没有释放读锁,因为tryReleaseShared
 * 会将它设为null
 *
 * 对于跟踪非竞争的读锁来说,这很廉价,所以无需担心性能问题
 */
private transient Thread firstReader;
private transient int firstReaderHoldCount;

Sync() {
    readHolds = new ThreadLocalHoldCounter();
    setState(getState()); // ensures visibility of readHolds
}

解释了Sync中的一些字段的作用后,我们可以分析完整的共享状态获取与释放代码了。

> line: 453
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 如果读状态为0,即还没有线程获取过读锁,设置firstReader
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        // 或者如果当前线程是firstReader,只需要增加firstReaderHoldCount即可
        // 从此处我们可以看到使用这种方式缓存的用处以及廉价性
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        // 如果不是firstReader,尝试使用缓存。
        // 如果当前线程是第二个获取读锁的线程(rh==null)或者缓存已经被设置了
        // (rh.tid != LockSupport.getThreadId(current)),即当前线程不是最后一个
        // 获取读锁的线程,更新缓存为当前线程。
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null ||
                rh.tid != LockSupport.getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            // 当前线程为最后一个获取读锁的线程,同时在它释放了读锁后没有其他
            // 线程获取读锁,此时它再次获取读锁,此时缓存中保存的还是它,
            // 所以无需ThreadLocal初始化,直接使用缓存设置ThreadLocal即可
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

> line: 415
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 如果当前线程是firstReader
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        // 如果只获取了一次读锁,将其置为null,否则减1
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // 依然优先查询缓存,如果缓存中存储的不是当前线程,
        // 调用readHolds.get()从自己维护的ThreadLocal中获取数据。
        // 当这是最后一次释放读状态,即此次能够真正释放读锁时,
        // 将readHolds移除。否则,将读状态减1
        HoldCounter rh = cachedHoldCounter;
        if (rh == null ||
            rh.tid != LockSupport.getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

关于读锁的获取与释放现在已经完全分析完。

锁降级

锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是把持住写锁,再获取读锁,随后释放写锁的过程。

接下来看一个锁降级的过程。因为数据不常变化,所以多个线程可以并发地进行数据处理,当数据变更后,如果当前线程感知到数据变化,则进行数据的准备工作,同时其他处理线程被阻塞,直到当前线程完成数据准备工作。

一般形式为

r.lock();
if(!unpdate) {
    r.unlock();
    try{
        w.lock();
        //修改
        r.lock();       //核心,在不释放写锁时获取读锁,根据前面的分析可以做到此步
    } finally {
        w.unlock();
    }
}
try {
    //使用数据
} finally {
    r.unlock();
}

当数据发生变化时,update变量(volatile boolean)被设置为false,此时所有访问此方法的线程都能感知到变化,但只有一个线程能获取写锁,其他线程会阻塞在读锁和写锁的lock()方法上。

作用是提高代码运行效率,此时获取读锁其他线程则无法获取写锁,或者避免不必要的读写锁竞争,可以使读线程快速执行。

ReentrantLock不支持锁升级,目的是保证数据可见性。如果多个线程已经获取了读锁,其中有一个线程获取了写锁,那么该线程对数据的更新对其他获取了写锁的线程是不可见的。

读写锁的辅助方法

> line: 635
final int getReadLockCount() {
    return sharedCount(getState());
}

final boolean isWriteLocked() {
    return exclusiveCount(getState()) != 0;
}

final int getWriteHoldCount() {
    return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}

final int getReadHoldCount() {
    if (getReadLockCount() == 0)
        return 0;

    // 尝试从缓存中获取
    Thread current = Thread.currentThread();
    if (firstReader == current)
        return firstReaderHoldCount;

    HoldCounter rh = cachedHoldCounter;
    if (rh != null && rh.tid == LockSupport.getThreadId(current))
        return rh.count;

    // 缓存中不存在则从自己维护的ThreadLocal中获取
    int count = readHolds.get().count;
    if (count == 0) readHolds.remove();
    return count;
}

相关文章

网友评论

      本文标题:4. ReentrantReadWriteLock

      本文链接:https://www.haomeiwen.com/subject/tovtmqtx.html