本文为他人博客,转自:详解 AbstractQueuedSynchronizer 由于某种神秘力量,很多人不能访问,故直接搬运
前言
队列同步器 AbstractQueuedSynchronizer(以下简称 AQS),是用来构建锁或者其他同步组件的基础框架。它使用一个 int 成员变量来表示同步状态,通过 CAS 操作对同步状态进行修改,确保状态的改变是安全的。通过内置的 FIFO (First In First Out)队列来完成资源获取线程的排队工作。更多关于 Java 多线程的文章可以转到 这里
AQS 和 synchronized
在介绍 AQS 的使用之前,需要首先说明一点,AQS 同步和 synchronized 关键字同步(以下简称 synchronized 同步)是采用的两种不同的机制。首先看下 synchronized 同步,synchronized 关键字经过编译之后,会在同步块的前后分别形成 monitorenter 和 monitorexit 这两个字节码指令,这两个字节码需要关联到一个监视对象,当线程执行 monitorenter 指令时,需要首先获得获得监视对象的锁,这里监视对象锁就是进入同步块的凭证,只有获得了凭证才可以进入同步块,当线程离开同步块时,会执行 monitorexit 指令,释放对象锁。
在 AQS 同步中,使用一个 int 类型的变量 state 来表示当前同步块的状态。以独占式同步(一次只能有一个线程进入同步块)为例,state 的有效值有两个 0 和 1,其中 0 表示当前同步块中没有线程,1 表示同步块中已经有线程在执行。当线程要进入同步块时,需要首先判断 state 的值是否为 0,假设为 0,会尝试将 state 修改为 1,只有修改成功了之后,线程才可以进入同步块。注意上面提到的两个条件:
- state 为 0,证明当前同步块中没有线程在执行,所以当前线程可以尝试获得进入同步块的凭证,而这里的凭证就是是否成功将 state 修改为 1(在 synchronized 同步中,我们说的凭证是对象锁,但是对象锁的最终实现是否和这种方式类似,没有找到相关的资料)
- 成功将 state 修改为 1,通过使用 CAS 操作,我们可以确保即便有多个线程同时修改 state,也只有一个线程会修改成功。关于 CAS 的具体解释会在后面提到。
当线程离开同步块时,会修改 state 的值,将其设为 0,并唤醒等待的线程。所以在 AQS 同步中,我们说线程获得了锁,实际上是指线程成功修改了状态变量 state,而线程释放了锁,是指线程将状态变量置为了可修改的状态(在独占式同步中就是置为了 0),让其他线程可以再次尝试修改状态变量。在下面的表述中,我们说线程获得和释放了锁,就是上述含义, 这与 synchronized 同步中说的获得和释放锁的含义不同,需要区别理解。
基本使用
本节摘自 Java 并发编程的艺术
AQS 的设计是基于模板方法的,使用者需要继承 AQS 并重写指定的方法。在后续的流程中,AQS 提供的模板方法会调用重写的方法。一般来说,我们需要重写的方法主要有下面 5 个:
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int) | 独占式获取锁,实现该方法需要查询当前状态并判断同步状态是否和预期值相同,然后使用 CAS 操作设置同步状态 |
protected boolean tryRelease(int) | 独占式释放锁,实际也是修改同步变量 |
protected int tryAcquireShared(int) | 共享式获取锁,返回大于等于 0 的值,表示获取锁成功,反之获取失败 |
protected boolean tryReleaseShared(int) | 共享式释放锁 |
protected boolean isHeldExclusively() | 判断调用该方法的线程是否持有互斥锁 |
在自定义的同步组件中,我们一般会调用 AQS 提供的模板方法。AQS 提供的模板方法基本上分为 3 类: 独占式获取与释放锁、共享式获取与释放锁以及查询同步队列中的等待线程情况。下面是相关的模板方法:
方法名称 | 描述 |
---|---|
void acquire(int) | 独占式获取锁,如果当前线程成功获取锁,那么方法就返回,否则会将当前线程放入同步队列等待。该方法会调用重写的 tryAcquire(int arg) 方法判断是否可以获得锁 |
void acquireInterruptibly(int) | 和 acquire(int) 相同,但是该方法响应中断,当线程在同步队列中等待时,如果线程被中断,会抛出 InterruptedException 异常并返回。 |
boolean tryAcquireNanos(int, long) | 在 acquireInterruptibly(int) 基础上添加了超时控制,同时支持中断和超时,当在指定时间内没有获得锁时,会返回 false,获取到了返回 true |
void acquireShared(int) | 共享式获得锁,如果成功获得锁就返回,否则将当前线程放入同步队列等待,与独占式获取锁的不同是,同一时刻可以有多个线程获得共享锁,该方法调用 tryAcquireShared(int) |
acquireSharedInterruptibly(int) | 与 acquireShared(int) 相同,该方法响应中断 |
tryAcquireSharedNanos(int, long) | 在 acquireSharedInterruptibly(int) 基础上添加了超时控制 |
boolean release(int) | 独占式释放锁,该方法会在释放锁后,将同步队列中第一个等待节点唤醒 |
boolean releaseShared(int) | 共享式释放锁 |
Collection<thread> getQueuedThreads()</thread> | 获得同步队列中等待的线程集合 |
自定义组件通过使用同步器提供的模板方法来实现自己的同步语义。下面我们通过两个示例,看下如何借助于 AQS 来实现锁的同步语义。我们首先实现一个独占锁(排它锁),独占锁就是说在某个时刻内,只能有一个线程持有独占锁,只有持有锁的线程释放了独占锁,其他线程才可以获取独占锁。下面是具体实现:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* Created by Jikai Zhang on 2017/4/9.
* <p>
* 自定义共享锁
*/
public class TwinsLock implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
public Sync(int resourceCount) {
if (resourceCount <= 0) {
throw new IllegalArgumentException("resourceCount must be larger than zero.");
}
// 设置可以共享的资源总数
setState(resourceCount);
}
@Override
protected int tryAcquireShared(int reduceCount) {
// 使用尝试获得资源,如果成功修改了状态变量(获得了资源)
// 或者资源的总量小于 0(没有资源了),则返回。
for (; ; ) {
int lastCount = getState();
int newCount = lastCount - reduceCount;
if (newCount < 0 || compareAndSetState(lastCount, newCount)) {
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int returnCount) {
// 释放共享资源,因为可能有多个线程同时执行,所以需要使用 CAS 操作来修改资源总数。
for (; ; ) {
int lastCount = getState();
int newCount = lastCount + returnCount;
if (compareAndSetState(lastCount, newCount)) {
return true;
}
}
}
}
// 定义两个共享资源,说明同一时间内可以有两个线程同时运行
private final Sync sync = new Sync(2);
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public static void main(String[] args) {
final Lock lock = new TwinsLock();
int threadCounts = 10;
Thread threads[] = new Thread[threadCounts];
for (int i = 0; i < threadCounts; i++) {
final int index = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
lock.lock();
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
for (int i = 0; i < threadCounts; i++) {
threads[i].start();
}
}
}
程序的运行结果如下面所示。我们看到使用了 Mutex 之后,线程 0 和线程 1 不会再交替执行,而是当一个线程执行完,另外一个线程再执行。
Without mutex:
Thread-0: j =0
Thread-1: j =0
Thread-0: j =20000
Thread-1: j =20000
Thread-0: j =40000
Thread-1: j =40000
Thread-0: j =60000
Thread-1: j =60000
Thread-1: j =80000
Thread-0: j =80000
With mutex:
Thread-0: j =0
Thread-0: j =20000
Thread-0: j =40000
Thread-0: j =60000
Thread-0: j =80000
Thread-1: j =0
Thread-1: j =20000
Thread-1: j =40000
Thread-1: j =60000
Thread-1: j =80000
下面在看一个共享锁的示例。在该示例中,我们定义两个共享资源,即同一时间内允许两个线程同时执行。我们将同步变量的初始状态 state 设为 2,当一个线程获取了共享锁之后,将 state 减 1,线程释放了共享锁后,将 state 加 1。状态的合法范围是 0、1 和 2,其中 0 表示已经资源已经用光了,此时线程再要获得共享锁就需要进入同步序列等待。下面是具体实现:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* Created by Jikai Zhang on 2017/4/9.
* <p>
* 自定义共享锁
*/
public class TwinsLock implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
public Sync(int resourceCount) {
if (resourceCount <= 0) {
throw new IllegalArgumentException("resourceCount must be larger than zero.");
}
// 设置可以共享的资源总数
setState(resourceCount);
}
@Override
protected int tryAcquireShared(int reduceCount) {
// 使用尝试获得资源,如果成功修改了状态变量(获得了资源)
// 或者资源的总量小于 0(没有资源了),则返回。
for (; ; ) {
int lastCount = getState();
int newCount = lastCount - reduceCount;
if (newCount < 0 || compareAndSetState(lastCount, newCount)) {
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int returnCount) {
// 释放共享资源,因为可能有多个线程同时执行,所以需要使用 CAS 操作来修改资源总数。
for (; ; ) {
int lastCount = getState();
int newCount = lastCount + returnCount;
if (compareAndSetState(lastCount, newCount)) {
return true;
}
}
}
}
// 定义两个共享资源,说明同一时间内可以有两个线程同时运行
private final Sync sync = new Sync(2);
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public static void main(String[] args) {
final Lock lock = new TwinsLock();
int threadCounts = 10;
Thread threads[] = new Thread[threadCounts];
for (int i = 0; i < threadCounts; i++) {
final int index = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
lock.lock();
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
for (int i = 0; i < threadCounts; i++) {
threads[i].start();
}
}
}
运行程序,我们会发现程序每次都会同时打印两条语句,如下面的形式,证明同时有两个线程在执行。
Thread-0
Thread-1
Thread-3
Thread-2
Thread-8
Thread-4
Thread-3
Thread-6
网友评论