美文网首页
7 ReentrantReadWriteLock

7 ReentrantReadWriteLock

作者: lijiaccy | 来源:发表于2018-06-26 22:48 被阅读0次

在并发场景中用于解决线程安全的问题,我们几乎会高频率的使用到独占式锁,通常使用java提供的关键字synchronized或者concurrents包中实现了Lock接口的ReentrantLock。它们都是独占式获取锁,也就是在同一时刻只有一个线程能够获取锁。而在一些业务场景中,大部分只是读数据,写数据很少,如果仅仅是读数据的话并不会影响数据正确性(出现脏读),而如果在这种业务场景下,依然使用独占锁的话,很显然这将是出现性能瓶颈的地方。
针对这种读多写少的情况,java还提供了另外一个实现Lock接口的ReentrantReadWriteLock(读写锁)。读写所允许同一时刻被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞。
读写锁的主要特性:

  1. 公平性选择:支持非公平性(默认)和公平的锁获取方式,吞吐量还是非公平优于公平;
  2. 重入性:支持重入,读锁获取后能再次获取,写锁获取之后能够再次获取写锁,同时也能够获取读锁;
  3. 锁降级:遵循获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁

读写锁ReentrantReadWriteLock实现接口ReadWriteLock,该接口维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。写入锁是独占的。

ReadWriteLock定义了两个方法。readLock()返回用于读操作的锁,writeLock()返回用于写操作的锁。

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

再来看看ReadWriteLock的实现类ReentrantReadWriteLock

/** 内部类  读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 内部类  写锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;

final Sync sync;

/** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock() {
    this(false);
}

/** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

/** 返回用于写入操作的锁 */
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
/** 返回用于读取操作的锁 */
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

abstract static class Sync extends AbstractQueuedSynchronizer {
    /**
     * 省略其余源代码
     */
}
public static class WriteLock implements Lock, java.io.Serializable{
    /**
     * 省略其余源代码
     */
}

public static class ReadLock implements Lock, java.io.Serializable {
    /**
     * 省略其余源代码
     */
}

ReentrantReadWriteLock与ReentrantLock一样,其锁主体依然是Sync,它的读锁、写锁都是依靠Sync来实现的。所以ReentrantReadWriteLock实际上只有一个锁,只是在获取读取锁和写入锁的方式上不一样而已,它的读写锁其实就是两个类:ReadLock、writeLock,这两个类都是lock实现。

在ReentrantLock中使用一个int类型的state来表示同步状态,该值表示锁被一个线程重复获取的次数。但是读写锁ReentrantReadWriteLock内部维护着两个一对锁,需要用一个变量维护多种状态。所以读写锁采用“按位切割使用”的方式来维护这个变量,将其切分为两部分,高16为表示读,低16为表示写。分割之后,读写锁是如何迅速确定读锁和写锁的状态呢?通过为运算。假如当前同步状态为S,那么写状态等于 S & 0x0000FFFF(将高16位全部抹去),读状态等于S >>> 16(无符号补0右移16位)。代码如下:

    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    //同步状态的低16位用来表示写锁的获取次数
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    //同步状态的高16位用来表示读锁被获取的次数
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

当读锁已经被读线程获取或者写锁已经被其他写线程获取,则写锁获取失败;否则,获取成功并支持重入,增加写状态。

写锁

写锁就是一个支持可重入的排他锁。

写锁的获取

ReentrantReadWriteLockWriteLock里面

    public void lock() {
        this.sync.acquire(1);
    }

最终也走到tryAcquire(int arg),该方法在内部类Sync中实现:

    protected final boolean tryAcquire(int var1) {
        Thread var2 = Thread.currentThread();
        int var3 = this.getState();
        int var4 = exclusiveCount(var3);
        if (var3 != 0) {
            //c != 0 && var4  == 0 表示存在读锁
            //当前线程不是已经获取写锁的线程
            if (var4 != 0 && var2 == this.getExclusiveOwnerThread()) {
                if (var4 + exclusiveCount(var1) > 65535) {
                    throw new Error("Maximum lock count exceeded");
                } else {
                    this.setState(var3 + var1);
                    return true;
                }
            } else {
                return false;
            }
        } else if (!this.writerShouldBlock() && this.compareAndSetState(var3, var3 + var1)) {
            this.setExclusiveOwnerThread(var2);
            return true;
        } else {
            return false;
        }
    }

该方法和ReentrantLock的tryAcquire(int arg)大致一样,在判断重入时增加了一项条件:读锁是否存在。因为要确保写锁的操作对读锁是可见的,如果在存在读锁的情况下允许获取写锁,那么那些已经获取读锁的其他线程可能就无法感知当前写线程的操作。因此只有等读锁完全释放后,写锁才能够被当前线程所获取,一旦写锁获取了,所有其他读、写线程均会被阻塞。

写锁的释放

获取了写锁用完了则需要释放,WriteLock提供了unlock()方法释放写锁:

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

写锁的释放最终还是会调用AQS的模板方法release(int arg)方法,该方法首先调用tryRelease(int arg)方法尝试释放锁,tryRelease(int arg)方法为读写锁内部类Sync中定义了,如下:

protected final boolean tryRelease(int releases) {
    //释放的线程不为锁的持有者
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    //若写锁的新线程数为0,则将锁的持有者设置为null
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

写锁释放锁的整个过程和独占锁ReentrantLock相似,每次释放均是减少写状态,当写状态为0时表示 写锁已经完全释放了,从而等待的其他线程可以继续访问读写锁,获取同步状态,同时此次写线程的修改对后续的线程可见。

读锁

读锁为一个可重入的共享锁,它能够被多个线程同时持有,在没有其他写线程访问时,读锁总是或获取成功。

读锁的获取

读锁的获取可以通过ReadLock的lock()方法:

    public void lock() {
        sync.acquireShared(1);
    }

Sync的acquireShared(int arg)定义在AQS中:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

tryAcqurireShared(int arg)尝试获取读同步状态,该方法主要用于获取共享式同步状态,获取成功返回 >= 0的返回结果,否则返回 < 0 的返回结果。

protected final int tryAcquireShared(int unused) {
    //当前线程
    Thread current = Thread.currentThread();
    int c = getState();
    //exclusiveCount(c)计算写锁
    //如果存在写锁,且锁的持有者不是当前线程,直接返回-1
    //存在锁降级问题,后续阐述
    if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
        return -1;
    //读锁
    int r = sharedCount(c);

    /*
     * readerShouldBlock():读锁是否需要等待(公平锁原则)
     * r < MAX_COUNT:持有线程小于最大数(65535)
     * compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态
     */
    if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
        /*
         * holdCount部分后面讲解
         */
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

读锁获取过程:

  1. 因为存在锁降级情况,如果存在写锁且锁的持有者不是当前线程则直接返回失败,否则继续

  2. 依据公平性原则,判断读锁是否需要阻塞,读锁持有线程数小于最大值(65535),且设置锁状态成功,执行以下代码(对于HoldCounter下面再阐述),并返回1。如果不满足改条件,执行fullTryAcquireShared()。

    final int fullTryAcquireShared(Thread current) {
     HoldCounter rh = null;
     for (;;) {
         int c = getState();
         //锁降级
         if (exclusiveCount(c) != 0) {
             if (getExclusiveOwnerThread() != current)
                 return -1;
         }
         //读锁需要阻塞
         else if (readerShouldBlock()) {
             //列头为当前线程
             if (firstReader == current) {
             }
             //HoldCounter后面讲解
             else {
                 if (rh == null) {
                     rh = cachedHoldCounter;
                     if (rh == null || rh.tid != getThreadId(current)) {
                         rh = readHolds.get();
                         if (rh.count == 0)
                             readHolds.remove();
                     }
                 }
                 if (rh.count == 0)
                     return -1;
             }
         }
         //读锁超出最大范围
         if (sharedCount(c) == MAX_COUNT)
             throw new Error("Maximum lock count exceeded");
         //CAS设置读锁成功
         if (compareAndSetState(c, c + SHARED_UNIT)) {
             //如果是第1次获取“读取锁”,则更新firstReader和firstReaderHoldCount
             if (sharedCount(c) == 0) {
                 firstReader = current;
                 firstReaderHoldCount = 1;
             }
             //如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程,则将firstReaderHoldCount+1
             else if (firstReader == current) {
                 firstReaderHoldCount++;
             } else {
                 if (rh == null)
                     rh = cachedHoldCounter;
                 if (rh == null || rh.tid != getThreadId(current))
                     rh = readHolds.get();
                 else if (rh.count == 0)
                     readHolds.set(rh);
                 //更新线程的获取“读取锁”的共享计数
                 rh.count++;
                 cachedHoldCounter = rh; // cache for release
             }
             return 1;
         }
     }
    }
    

fullTryAcquireShared(Thread current)会根据“是否需要阻塞等待”,“读取锁的共享计数是否超过限制”等等进行处理。如果不需要阻塞等待,并且锁的共享计数没有超过限制,则通过CAS尝试获取锁,并返回1。

读锁的释放

与写锁相同,读锁也提供了unlock()释放读锁:

    public void unlock() {
        sync.releaseShared(1);
    }

unlcok()方法内部使用Sync的releaseShared(int arg)方法,该方法定义在AQS中:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

调用tryReleaseShared(int arg)尝试释放读锁,该方法定义在读写锁的Sync内部类中:

   protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //如果想要释放锁的线程为第一个获取锁的线程
    if (firstReader == current) {
        //仅获取了一次,则需要将firstReader 设置null,否则 firstReaderHoldCount - 1
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    }
    //获取rh对象,并更新“当前线程获取锁的信息”
    else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    //CAS更新同步状态
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

HoldCounter

在读锁获取锁和释放锁的过程中,我们一直都可以看到一个变量rh (HoldCounter ),该变量在读锁中扮演着非常重要的作用。

我们了解读锁的内在机制其实就是一个共享锁,为了更好理解HoldCounter ,我们暂且认为它不是一个锁的概率,而相当于一个计数器。一次共享锁的操作就相当于在该计数器的操作。获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。所以HoldCounter的作用就是当前线程持有共享锁的数量,这个数量必须要与线程绑定在一起,否则操作其他线程锁就会抛出异常。我们先看HoldCounter的定义:

    static final class HoldCounter {
        int count = 0;
        final long tid = getThreadId(Thread.currentThread());
    }

HoldCounter 定义非常简单,就是一个计数器count 和线程 id tid 两个变量。按照这个意思我们看到HoldCounter 是需要和某给线程进行绑定了,我们知道如果要将一个对象和线程绑定仅仅有tid是不够的,而且从上面的代码我们可以看到HoldCounter 仅仅只是记录了tid,根本起不到绑定线程的作用。那么怎么实现呢?答案是ThreadLocal,定义如下:

    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

通过上面代码HoldCounter就可以与线程进行绑定了。故而,HoldCounter应该就是绑定线程上的一个计数器,而ThradLocalHoldCounter则是线程绑定的ThreadLocal。从上面我们可以看到ThreadLocal将HoldCounter绑定到当前线程上,同时HoldCounter也持有线程Id,这样在释放锁的时候才能知道ReadWriteLock里面缓存的上一个读取线程(cachedHoldCounter)是否是当前线程。这样做的好处是可以减少ThreadLocal.get()的次数,因为这也是一个耗时操作。需要说明的是这样HoldCounter绑定线程id而不绑定线程对象的原因是避免HoldCounter和ThreadLocal互相绑定而GC难以释放它们(尽管GC能够智能的发现这种引用而回收它们,但是这需要一定的代价),所以其实这样做只是为了帮助GC快速回收对象而已。

看到这里我们明白了HoldCounter作用了,我们在看一个获取读锁的代码段:

            else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }

这段代码涉及了几个变量:firstReader 、firstReaderHoldCount、cachedHoldCounter 。我们先理清楚这几个变量:

 private transient Thread firstReader = null;
 private transient int firstReaderHoldCount;
 private transient HoldCounter cachedHoldCounter;

firstReader 看名字就明白了为第一个获取读锁的线程,firstReaderHoldCount为第一个获取读锁的重入数,cachedHoldCounter为HoldCounter的缓存。

理清楚上面所有的变量了,HoldCounter也明白了,我们就来给上面那段代码标明注释,如下:

//如果获取读锁的线程为第一次获取读锁的线程,则firstReaderHoldCount重入数 + 1
else if (firstReader == current) {
    firstReaderHoldCount++;
} else {
    //非firstReader计数
    if (rh == null)
        rh = cachedHoldCounter;
    //rh == null 或者 rh.tid != current.getId(),需要获取rh
    if (rh == null || rh.tid != getThreadId(current))
        rh = readHolds.get();
        //加入到readHolds中
    else if (rh.count == 0)
        readHolds.set(rh);
    //计数+1
    rh.count++;
    cachedHoldCounter = rh; // cache for release
}

这里解释下为何要引入firstRead、firstReaderHoldCount。这是为了一个效率问题,firstReader是不会放入到readHolds中的,如果读锁仅有一个的情况下就会避免查找readHolds。

锁降级

读写锁支持锁降级,遵循按照获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁,不支持锁升级,关于锁降级下面的示例代码摘自ReentrantWriteReadLock源码中:

void processCachedData() {
    rwl.readLock().lock();
    if (!cacheValid) {
        // Must release read lock before acquiring write lock
        rwl.readLock().unlock();
        rwl.writeLock().lock();
        try {
            // Recheck state because another thread might have
            // acquired write lock and changed state before we did.
            if (!cacheValid) {
                data = ...
        cacheValid = true;
      }
      // Downgrade by acquiring read lock before releasing write lock
      rwl.readLock().lock();
    } finally {
      rwl.writeLock().unlock(); // Unlock write, still hold read
    }
  }

  try {
    use(data);
  } finally {
    rwl.readLock().unlock();
  }
}
}

锁降级中读锁的获取释放为必要?肯定是必要的。试想,假如当前线程A不获取读锁而是直接释放了写锁,这个时候另外一个线程B获取了写锁,那么这个线程B对数据的修改是不会对当前线程A可见的。如果获取了读锁,则线程B在获取写锁过程中判断如果有读锁还没有释放则会被阻塞,只有当前线程A释放读锁后,线程B才会获取写锁成功。

读写锁的应用场景

在多线程的环境下,对同一份数据进行读写,会涉及到线程安全的问题。比如在一个线程读取数据的时候,另外一个线程在写数据,而导致前后数据的不一致性;一个线程在写数据的时候,另一个线程也在写,同样也会导致线程前后看到的数据的不一致性。
这时候可以在读写方法中加入互斥锁,任何时候只能允许一个线程的一个读或写操作,而不允许其他线程的读或写操作,这样是可以解决这样以上的问题,但是效率却大打折扣了。因为在真实的业务场景中,一份数据,读取数据的操作次数通常高于写入数据的操作,而线程与线程间的读读操作是不涉及到线程安全的问题,没有必要加入互斥锁,只要在读-写,写-写期间上锁就行了。

相关文章

网友评论

      本文标题:7 ReentrantReadWriteLock

      本文链接:https://www.haomeiwen.com/subject/pacvyftx.html