1.Lock接口
一般来说,一个锁能够防止多个线程同时访问共享资源(但有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。
Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显示地获取和释放锁。
Lock lock = new ReentrantLock();
lock.lock();
try{
} finally {
lock.unlock();
}
不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常,异常抛出的同时,也会导致锁无故释放。
2.队列同步器
队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架。它使用了一个int成员变量标识同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。同步器提供3个方法操作同步状态,getState()、setState(int newState)、compareAndSetState(int expect, int update)。
同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。
可以这样理解锁和同步器的关系:
- 锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节。
- 同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。
锁和同步器很好地隔离了使用者和实现者所需关注的领域。
①队列同步器的接口与示例
同步器提供的模板方法基本上分为3类:
- 独占式获取与释放同步状态
- 共享式获取与释放同步状态
- 查询同步队列中的等待线程情况
下面通过一个独占锁的示例来深入了解一下同步器的工作原理。
独占锁就是在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能够获取锁。
public class Mutex implements Lock {
//静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
//是否处于占用状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
//当状态为0的时候获取锁
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0,1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//释放锁,将状态设置为0
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() {
return new ConditionObject();
}
}
//仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
}
Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。
②队列同步器的实现分析
接下来将从实现角度分析同步器是如何完成线程同步的。
1)同步队列
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点Node并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成节点并加入到队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect, Node update)。
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,会唤醒后继接口,而后继节点将会在获取同步状态成功时将自己设置为首节点。
2)独占式同步状态获取与释放
通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述代码主要逻辑:
- 首先调用自定义同步器实现的tryAcquire(int acquires)方法,该方法保证线程安全的获取同步状态。
- 如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node mode)将节点加入到同步队列的尾部。
- 最后调用acquireQueued(final Node node, int arg)使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速尝试在尾部添加
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
为什么只有头节点才能尝试获取同步状态:
- 头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。
- 维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态的行为如下图。
acquire(int arg)方法调用流程:
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor(Node node)方法使用LockSupport来唤醒处于等待状态的线程。
总结:在获取同步状态时,同步器维护一个同步队列,获取失败的线程都会被加入到队列中并在队列中进行自旋;移除队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。
3)共享式同步状态获取与释放
共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。
通过调用同步器的acquireShared(int arg)方法可以共享地获取同步状态。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
同步器调用tryAcquireShared(int arg)尝试获取同步状态,tryAcquireShared(int arg)返回大于等于0时,表示能够获取到同步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared(int arg)返回值大于等于0。可以看到,在doAcquireShared(int arg)的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。
与独占式一样,共享式获取也需要释放同步状态,通过调用releaseShared(int arg)方法可以释放同步状态。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件(比如Semaphore),它和独占式主要区别在于tryReleaseShared(int arg)必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和CAS来保证的,以为释放同步状态的操作会同时来自多个线程。
4)超时获取同步状态
acquireInterruptibly(int arg)获取同步状态时,如果当前线程被中断,会立刻返回,并抛出InterruptedException
超时获取同步状态,调用同步器的doAcquireNanos(int arg, long nanosTimeout),它是上述方法的增强版。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//当已到设置的超时时间,该线程会从这里返回
LockSupport.parkNanos(this, nanosTimeout);
//当nanosTimeout <= spinForTimeoutThreshold时,不会使该线程超时等待,而是进入快速的自旋过程。原因在于,非常短的超时等待无法做到十分精确。
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
5)自定义同步组件——TwinsLock
public class TwinsLock implements Lock {
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
public int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
public boolean tryReleaseShared(int returnCount) {
for (;;) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
private final Sync sync = new Sync(2);
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
//其他接口方法略
}
public class TwinsLockTest {
Lock lock = new TwinsLock();
class Worker extends Thread {
public void run() {
while (true) {
lock.lock();
try {
SleepUtils.second(1);
System.out.println(Thread.currentThread().getName());
SleepUtils.second(1);
} finally {
lock.unlock();
}
}
}
}
public void test() {
//启动10个线程
for (int i = 0; i < 10; i++) {
Worker w = new Worker();
w.setDaemon(true);
w.start();
}
//每隔1秒换行
for (int i = 0; i < 10; i++) {
SleepUtils.second(1);
System.out.println();
}
}
public static void main(String[] args) {
new TwinsLockTest().test();
}
}
3.重入锁
重入锁ReentrantLock,顾名思义,就是支持冲重进入的锁。自定义的Mutex,占有锁的线程再次调用tryAcquire方法时返回false,导致该线程被阻塞。所以Mutex是一个不支持重新进入的锁。
synchronized关键字隐式支持重进入,比如一个synchronized修饰的递归方法。
公平锁:等待时间最长的线程最优先获取锁。反之则是不公平锁。
①实现重进入
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞。
实现该特性需要解决一下两个问题:
1)线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
2)锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。
ReentrantLock通过组合自定义同步器Sync(继承了AbstractQueuedSynchronizer)来实现锁的获取与释放,以非公平性(默认的)实现为例,获取同步状态的代码如下:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
成功获取锁的线程再次获取锁,只是增加了同步状态值,这也就要求ReentrantLock在释放同步状态时减少同步状态值,该方法的代码如下:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
②公平与非公平获取锁的区别
公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。
公平锁获取同步状态的方法:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
该方法与nonfairTryAcquire比较,唯一不同的位置为判断条件多了hasQueuedPredecessors方法,即加入了同步队列中当前节点是否有前驱节点的判断。
下面编写一个测试来观察公平和非公平锁在获取锁时的区别:
public class FairAndUnfairTest {
private static Lock fairLock = new ReentrantLock2(true);
private static Lock unFairLock = new ReentrantLock2(false);
@Test
public void fair() {
testLock(fairLock);
}
@Test
public void unfair() {
testLock(unFairLock);
}
private void testLock(Lock lock) {
//启动5个Job
for (int i = 0; i < 5; i++) {
Job job = new Job(lock);
job.setName(String.valueOf(i));
job.start();
}
}
private static class Job extends Thread {
private Lock lock;
Job(Lock lock) {
this.lock = lock;
}
public void run() {
//连续2次打印当前的Thread和等待队列中的Thread
for (int i = 0; i < 2; i++) {
lock.lock();
try {
ReentrantLock2 l = (ReentrantLock2) this.lock;
Collection<Thread> queuedThreads = l.getQueuedThreads();
String collect = queuedThreads.stream().map(Thread::getName).collect(Collectors.joining(","));
System.out.println("Lock by[" + Thread.currentThread().getName() + "].Waiting by [" + collect + "]");
} finally {
lock.unlock();
}
}
}
}
private static class ReentrantLock2 extends ReentrantLock {
ReentrantLock2(boolean fair) {
super(fair);
}
public Collection<Thread> getQueuedThreads() {
ArrayList<Thread> threads = new ArrayList<>(super.getQueuedThreads());
Collections.reverse(threads);
return threads;
}
}
}
公平性锁每次都是从同步队列中的第一个节点获取到锁,非公平性锁出现了一个线程连续获取锁的情况。
为什么会出现线程连续获取锁的情况呢?nonfairTryAcquire方法,当一个线程请求锁时,只要获取了同步状态即成功获取锁。在这个前提下,刚释放的线程再次获取同步状态的几率会非常大,使得其他线程只能在同步队列中等待。
为什么非公平性锁被设定成默认实现?上表结果,公平性锁在测试中进行了10次切换,而非公平性锁只有5次切换,这说明非公平性锁的开销更小。
测试:10个线程,每个线程获取100000次锁,通过vmstat统计测试运行时系统线程上下文切换的次数,结果如下:
公平性锁保证了锁的获取按照FIFO原则,代价是进行大量的线程切换。非公平性锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。
4.读写锁
读写锁维护了一对锁,一个读锁和一个写锁。
当写锁被获取到时,后续(非当前写操作线程)的读写操作都会被阻塞,写锁释放之后,所有操作继续执行。
一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。Java并发包提供读写锁的实现是ReentrantReadWriteLock。
①读写锁的接口与示例
示例:
public class Cache {
static Map<String, Object> map = new HashMap<>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
//获取一个key对应的value
public static final Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
//设置key对应的value,并返回旧的value
public static final Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
//清空所有的内存
public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}
上述示例中,Cache组合一个非线程安全的HashMap作为缓存的实现,同事使用读写锁的读锁和写锁来保证Cache是线程安全的。
②读写锁的实现分析
1)读写状态的设计
读写锁的自定义同步器(继承AQS)需要在同步状态(一个整型变量)上维护多个线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。
上图同步状态表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。
读写锁通过位运算迅速确定读和写各自的状态。假设当前同步状态为S,写状态等于S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0右移16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是S+0x00010000。
2)写锁的获取与释放
写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。
如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//存在读锁或者当前获取线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
写锁的释放与ReentrantLock的释放过程基本类似,每次释放减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁。
3)读锁的获取与释放
读锁是一个支持重进入的共享锁,它能够被多个线程同时获取。
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//写锁已经被获取,且获取的线程不是该线程
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
//不需要被放到阻塞队列、已经存在的读锁小于最大值、增加读状态成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//记录线程获取读锁的次数
if (r == 0) {//之前没有线程获取过读锁
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {//第一个获取读锁的是该线程
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
//获取读锁的完整版本,用于处理tryAcquireShared中CAS失败的、重入读锁在tryAcquireShared中未处理的
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 否则,我们持有独占锁,在这里阻塞会导致死锁
} else if (readerShouldBlock()) {
// 确保我们不会重新获得读锁
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少状态,减少的值是(1<<16)。
4)锁降级
锁降级指的是写锁降级为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
示例:
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
//在获取写锁之前必须释放读锁
rwl.readLock().unlock();
//锁降级从写锁获取到开始
rwl.writeLock().lock();
try {
//重新检查状态,因为另一个线程可能在我们之前已经获取了写锁和更改了状态
if (!cacheValid) {
//准备数据的流程(略)
data = ...
cacheValid = true;
}
// 在释放写锁之前通过获取读锁来降级
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); //释放写锁,仍然保持读锁
}
}
try {
//使用数据的流程(略)
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
}
5.LockSupport工具
队列同步器里,当需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作。
LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及unpark(Thread thread)方法来唤醒一个被阻塞的线程。
Java 6中,增加了park(Object blocker)、parkNanos(Object blocker, long nanos)、parkUntil(Object blocker, long deadline),用于实现阻塞当前线程的功能,其中参数blocker是用来标识当前线程在等待的对象(以下称为阻塞对象),该对象主要用于问题排查和系统监控。
6.Condition接口
任意一个Java对象,都拥有一组监视器方法(定义在Object上),主要包括wait()、wait(long timeout)、notify()、notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。
Condition接口提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式。
①Condition接口与示例
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition是依赖Lock对象的,调用Lock对象的newCondition方法创建。
public class ConditionUseCase {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSignal() {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
}
示例:
public class BoundedQueue<T> {
private Object[] items;
//添加的下标,删除的下标和数组单签数量
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
//添加一个元素,如果数组满,则添加线程进入等待状态,直到有“空位”
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) //数组已满
notFull.await(); //释放锁并进入等待状态。 收到通知之后获取锁并返回
items[addIndex] = t;//添加元素到数组中
if (++addIndex == items.length)
addIndex = 0;
++count;
notEmpty.signal();//通知等待在notEmpty上的线程,数组中已经有新元素可以获取。
} finally {
lock.unlock();
}
}
//由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
@SuppressWarnings("unchecked")
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0) //使用while而不用if,目的是防止过早或意外的通知,只有条件符合才能退出循环。
notEmpty.await();
Object x = items[removeIndex];
if (++removeIndex == items.length)
removeIndex = 0;
--count;
notFull.signal();
return (T) x;
} finally {
lock.unlock();
}
}
}
②Condition的实现分析
ConditionObject是同步器AbstractQueuedSynchronizer的内部类。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。下面提到的Condition不加说明都指的是ConditionObject。
1)等待队列
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程。节点的定义复用了同步器中节点的定义(AbstractQueuedSynchronizer.Node)。
调用Condition.await()方法,那么该线程将会释放锁、构造成节点胶乳等待队列并进入等待状态。
节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切的说是同步器)拥有一个同步队列和多个等待队列,其对应关系如下:
2)等待
调用Condition.await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
如果从队列(同步队列和等待队列)的角度看await()方法,当调用await方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//当前线程加入等待队列
Node node = addConditionWaiter();
//释放同步状态,也就是释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//isOnSyncQueue:节点已经在同步队列为true,否则为false
LockSupport.park(this);//阻塞线程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //acquireQueued:加入到获取同步状态的竞争
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
该方法将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步状态队列中的后继节点,然后当前线程会进入等待状态。
如上图所示,同步队列的首节点并不会直接加入等待队列,而是通过addConditionWaiter方法把当前线程构造成一个新的节点并将其加入等待队列中。
3)通知
调用Condition的signal方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移动到同步队列中。
public final void signal() {
if (!isHeldExclusively()) //isHeldExclusively:当前线程获取了锁,返回true
throw new IllegalMonitorStateException();
Node first = firstWaiter;//获取等待队列的首节点
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); //将首节点移动到同步队列
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);//唤醒节点中的线程
return true;
}
Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。
网友评论