前言
对源码
AQS(AbstractQueuedSynchronizer)
(在java.util.concurrent.locks
)的分析会是一个系列文章.这篇文章主要是以例子的形式并且一步步把源码中的代码加入我们自己的代码中,从而一步步完成对AQS
的源码分析.
完整代码:代码
添加必要的接口和抽象类
Lock
public interface Lock {
// 获取锁
void lock();
// 获取锁 响应中断
void lockInterruptibly() throws InterruptedException;
// 获取锁 如果锁是available立即返回true, 如果锁不存在就立即返回false
boolean tryLock();
// 在上面的基础上加一个时间限制
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 后续博客会讨论
Condition newCondition();
}
定义了锁的一些常规操作
AbstractOwnableSynchronizer
定义了一个独占锁当前所对应的线程并且提供了
set
和get
方法
public class AbstractOwnableSynchronizer implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer(){}
//独占锁对应的线程
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
this.exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return this.exclusiveOwnerThread;
}
}
实现AbstractOwnableSynchronizer(以下以AQS简称)
在介绍主体方法之前先介绍一个内部类
Node
,AQS中等待队列的元素就是Node
类的一个个对象.
Node
static final class Node {
// 共享
static final Node SHARED = new Node();
// 表示独占
static final Node EXCLUSIVE = null;
// 表明这个节点已经被取消
static final int CANCELLED = 1;
// 表明需要唤醒下一个等待的线程
static final int SIGNAL = -1;
// 后续会讨论
static final int CONDITION = -2;
// 后续会讨论
static final int PROPAGATE = -3;
// 当前节点的等待状态
volatile int waitStatus;
// 当前节点的前一个节点
volatile Node prev;
// 当前节点的下一个节点
volatile Node next;
// 当前节点所表示的线程
volatile Thread thread;
// 用于判断是否是独占还是共享
Node nextWaiter;
//判断是独占还是共享
final boolean isShared() {
return nextWaiter == SHARED;
}
// 取前一个节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 无参构造函数
Node() {}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
public String toString() {
return "[" + (thread == null ? "NULL" : thread.getName()) + "," + waitStatus + "]";
}
}
Node
类是一个封装了thread
并且带有一些状态的节点
1. 所以它有个thread
属性表示该节点所对应的线程.
2. 还有属性nextWaiter
来表示该节点是共享锁SHARED
的还是独占锁EXCLUSIVE
的
3.toString()
函数是为了方便打印我自己加上的
4. 有一个waitStatus
来表示当前节点的等待状态,有(CANCELLED
,SIGNAL
,CONDITION
,PROPAGATE
)
CANCELLED
: 表示该节点已经被取消了
SIGNAL
: 表示该节点在release
或者取消时需要去唤醒下一个线程(这里留个疑问?还有在什么情况下会去唤醒下一个节点的线程)
后面的两个属性会在后面的系列博客中有分析到
其实还有一个属性是0
在生成节点的时候waitStatus
默认是0
AQS的属性
public class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
static final class Node {}
// 同步器等待队列的头节点
private transient volatile Node head;
// 同步器等待队列的尾节点
private transient volatile Node tail;
// 同步器的状态值
private volatile int state;
// 获取同步器状态值
protected final int getState() {
return state;
}
// 设置同步器的状态值
protected final void setState(int newState) {
state = newState;
}
public final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
private static final Unsafe unsafe;
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
unsafe = (Unsafe)f.get(null);
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
先说几个属性
1.head
和tail
表示等待队列的头尾节点
2.state
表示AQS的状态值,在后面的例子中可以更清晰的理解它的作用. 并且提供了set
和get
方法.
然后说一下跟源码的区别
源码:private static final Unsafe unsafe = Unsafe.getUnsafe();
由于在我们的代码使用这个语句会报错,因为在类加载的时候必须要使用对应的类加载器,然而我们可以使用反射的方式拿到该对象.
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
unsafe = (Unsafe)f.get(null);
Unsafe
的相关说明会写一个博客来另外分析,现在只需要知道它在多线程情况下是可以安全实现那个对应的操作就可以了
接下来我们使用一个例子来看看如何一步步实现一个互斥锁的.
例子Mutex互斥锁
既然我们是要实现一个锁,那我们可以继承
Lock
接口,然后我们使用AQS的一个子类的一个对象来真正实现Lock
的功能.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
public class Mutex implements Lock {
private final Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer {
protected boolean isHeldExclusively() {
return getState() == 1;
}
public boolean tryAcquire(int acquires) {
if (super.compareAndSetState(0, 1)) {
super.setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
public void printWaitingNode() {
sync.printQueue();
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean tryLock() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public void unlock() {
// TODO Auto-generated method stub
}
@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
}1
从上面代码中可以看到,AQS中的状态是0时表明锁是空闲的,1表明锁已经被某个线程独占了,表明这个线程已经拥有了这个锁,那根据我们的猜想,其余没有获得锁的线程放到哪里呢?没错,就是放到等待队列中,那接下来我们先看看用一个测试类看看效果.
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) {
Mutex m = new Mutex();
for (int i = 0; i < 5; i++) {
new Thread(new Runner(m), "thread-" + i).start();;
}
for (int i = 0; i < 5; i++) {
m.printWaitingNode();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Runner implements Runnable {
Mutex m;
public Runner(Mutex m) {
this.m = m;
}
@Override
public void run() {
m.lock();
System.out.println(Thread.currentThread().getName() + " runs");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
m.unlock();
}
}
}
运行结果如下:
表明thread-0
获得了锁,其余的线程在等待队列中.
thread-0 runs
[NULL,-1]->[thread-1,-1]->[thread-2,-1]->[thread-3,-1]->[thread-4,0]->
那接下来看看如何一步步实现
AQS
中的acquire
方法的.建议看下面分析是可以带着Mutex
的代码来看下面的分析
acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
由于调用了
tryAcquire
和acquireQueued
方法,我们先看看这两个方法再回头来分析这个方法
tryAcquire方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
作用: 获得了锁返回true,没有获得返回false
这个方法是为子类准备的,所以在Mutex
也实现了tryAcquire
方法,所以第一个线程thread-0
获得锁的调用过程中,tryAcquire
已经返回true
,所以根本不会去执行acquireQueued
方法和selfInterrupt
所以当该线程没有获得锁的时候就要执行
acquireQueued
方法,可以大胆预测该方法是想让这个线程加入到等待队列中并且让它阻塞(其实仔细一想,获得了锁的线程其实是程序没有去阻塞它而已嘛,让它继续执行罢了,那没有获得锁的线程就得阻塞它,让它没有办法继续执行)
在看
acquireQueued
方法前,先看看addWaiter
方法
addWaiter(Node.EXCLUSIVE)
注意添加的参数是独占型的节点
//添加一个节点到当前队列 在尾部添加
private Node addWaiter(Node mode) {
// 生成一个mode类型的节点 节点的thread是当前线程
Node node = new Node(Thread.currentThread(), mode);
/**
* 如果等待队列尾部不为空,则直接通过unsafe操作放到队列尾部
* 如果不是则调用 enq方法把节点插入到等待队列中.
*/
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
作用:将当前thread包装成节点,独占型,waitStatus=0
流程分四步:
1. 生成node
节点[独占型,waitStatus=0,thread=Thread.currentThread()]
2. 如果等待队列尾部存在,表明链表已经初始化过了,可以直接放到链表尾部
3. 如果链表没有初始化,那就是让enq(node)
去初始化链表并且把node
加入到链表尾部
4. 返回这个node
接下来看看
enq
方法
enq(final Node node)方法
参数node是即将要放入到队列尾部的节点
// 将node节点放到等待队列的尾部
private Node enq(final Node node) {
/**
* 无限循环到条件满足后退出
*/
for (;;) {
Node t = tail;
/**
* 如果队列尾部是空 表明队列还没有进行过初始化
* 如果不为空 则把节点链接到队列的尾部
*/
if (t == null) {
//生成一个thread为null的节点并且设置为头节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//利用unsafe的操作把当前节点设置到尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
作用:将node节点放到等待队列的尾部
流程:通过for循环自旋的方式一直检查尾部,如果尾部为空,则创建一个thread为null
的头节点,然后将其设置为尾节点,如果不为空,则把该节点加入到队列尾部.
问题:至于为什么需要用for循环自旋的方式呢?1. 当链表是空的时候,只有一个线程进来的时候会先进入
if(t==null)
块中,完成后会再次进入for循环中的else
块中并返回.
2. 当有两个线程同时进入else
块中的时候,只能有一个线程会设置成功,所以失败的那个线程会进入for循环后再次进入else
块中.
到这里我们先简单总结一下:
1. 该线程由于在
tryAcquire
方法中返回false
表明没有获得锁,因此需要被阻塞
2. 我们已经通过addWaiter
方法把该线程包装成一个独占型的节点放入到等待队列的尾部并且返回了该节点.
那按照思路接下来的操作就应该是如何来阻塞这个线程了,那接下来我们来看看这里最核心的
acquireQueued
方法
acquireQueued(final Node node, int arg)方法
/**
*
* @param node 当前节点
* @param arg acquire请求的状态参数
* @return 返回是否需要对当前运行线程进行中断
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里有几个新的方法,我先简单介绍一下每个方法的作用,具体的细节分析会放到后面的分析中.
1.predecessor()
取得当前节点node的前驱节点 属于Node
中的成员方法
2.setHead(node)
把当前节点node设置为头节点
3.shouldParkAfterFailedAcquire(p, node)
返回当前节点node是否可以进行休眠
4.parkAndCheckInterrupt()
让当前线程进行休眠状态并且在唤醒或者中断后返回中断状态
5.cancelAcquire(node)
取消当前节点node
关于两个属性也简单介绍一下
1.failed
表示获得锁是否成功
2.interrupted
表示当前节点对应的线程是否有被中断过
作用:利用for循环自旋的方式让等待获取锁的线程都进行休眠,因为休眠可以节约cpu资源,让拥有了锁的线程可以充分利用cpu运行
流程如下:
所以我们接下来就看看上面那三个方法的具体实现
setHead(node)方法
/**
* 作用: 把node节点设置为等待队列的头节点
*/
private void setHead(Node node) {
/**
* 1. 把当前节点设置为头节点
* 2. 把当前节点的thread设置为null,因为这个node.thread已经获得了锁
* 3. 把当前节点的前链接设置为null
*/
head = node;
node.thread = null;
node.prev = null;
}
作用: 把节点设置为等待队列的头节点
注意:waitStatus
的值没有修改
shouldParkAfterFailedAcquire(Node pred, Node node)方法
/**
* 这个方法必须要保证pred == node.prev
* 作用: 此方法是在判断node节点是否可以进行休眠状态, 因为进行休眠状态可以节约资源利用(cpu)
* 所以需要让node休眠前要确保有节点可以唤醒它,因此下面所做的事情就是检查等待队列中
* 前面的节点是否满足条件.
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/**
* 如果节点pred的状态是Node.SIGNAL的时候,表明pred在release的时候
* 会唤醒下一个节点,也就是node节点
* 返回true 表明node节点可以休眠
*/
return true;
if (ws > 0) {
/**
* 表明节点pred所对应的线程已经被取消了,因此需要一直往前找
* 直到前一个节点的waitStatus小于等于0,中间的那些取消的节点
* 会被删除 因为最后pred.next = node.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/**
* 所以这个分支表示当前节点的等待状态要么是0或者PROPAGATE,
* 此时直接把它设置为SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
作用:是判断
node
节点是否可以进行休眠状态,但是休眠前需要保证有节点可以唤醒它,这个时候Node.SINGAL
就是这个作用.
流程:
juc_3.png
ws>0块所做的内容如下:
juc_2(1).png
[疑问]:多个线程进入到这里的时候会不会有问题?欢迎大家一起讨论
看完上面两个图然后再结合着
acquireQueued方法
看一下可以知道线程在休眠前必须要保证有前驱节点可以唤醒它,而这个标志就是使得前驱节点的waitStatus
要为Node.SIGNAL
请注意:比如pred的waitStatus不是1或者-1的时候,当第一次调用该方法时会返回false,第二次调用会直接返回true,因为返回到
acquireQueued
中的for
循环中还是有可能会继续调用该方法.(waitStatus是1的时候大家可以自行判断一下)
那到这里我们已经知道当线程没有取得锁的时候,已经有个方法会告诉当前线程是否可以进行休眠,并且在没有获得锁的情况下尽量会让该线程休眠,所以接下来我们看看如何让当前线程休眠的
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
作用: 让当前线程进行休眠状态并且在唤醒或者中断后返回中断状态
注意: 第一句就会让当前线程进行休眠状态,只有等到当前线程被唤醒后才可以进入第二句话
线程被唤醒有两种情况:
1. 被前面的(SIGNAL)节点唤醒 此时第二句话返回false
2. 线程被中断 此时第二句话返回true (关于中断的话题会有另外专门博客介绍)
其实到这里我们就可以看明白我们前面那个例子最后打印出来的等待队列是如何产生的了.
阶段性总结
分析到这里我们就可以回头看看
juc_5(1).pngacquire(int arg)
的流程图了
另外对上面的例子所表现出来的结果如下:
juc_4.png
解释一下:
1. thread-0没有出现: 因为thread-0
已经获得了锁在tryAcquire
中就已经返回了,可以看流程图
2. 头节点的等待状态是-1: 这是因为在链表加入thread-1
那个节点的时候会通过shouldParkAfterFailedAcquire
将刚刚初始化的0
状态改变成Node.SIGNAL
状态.
3. 尾节点thread-4的等待状态是0: 因为它就是初始化的状态,还没有发生过改变.
接下来就是剩下的
cancelAcquire
cancelAcquire(Node node)
/**
* 添加cancelAcquire方法
* 作用: 取消该节点
*/
private void cancelAcquire(Node node) {
// 如果节点为null 直接返回
if (node == null)
return;
// 设置节点所对应的thread为null
node.thread = null;
// 找到node的前驱节点(必须是没有被取消的节点)
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 前驱节点的next节点
Node predNext = pred.next;
// 设置当前节点为取消状态
node.waitStatus = Node.CANCELLED;
/**
* 如果当前节点是尾节点: (表明当前节点可以直接取消,不用管后面的节点,因为后面没有节点了)
* 1. 设置前驱节点pred成为新的尾节点
* 2. 在1成功的条件下设置尾节点的next为null
*/
if (node == tail && compareAndSetTail(node, pred)) {
//情况3
compareAndSetNext(pred, predNext, null);
} else {
/**
* 表明node后面还有节点,因此后面的节点会需要唤醒
* 有两种情况:
* 1. 前驱节点不是头节点,并且前驱节点的等待状态是SINGAL或者可以成为SINGAL状态,
* 并且前驱节点的thread不能为null. 这种情况下不需要去唤醒node后面的线程,因为pred节点可以去唤醒的
* 2. 如果不符合1,就需要主动去唤醒node后面的节点线程了,因为此时不唤醒,node后面节点的线程就没办法再唤醒了
*/
int ws;
//情况1
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//情况2
unparkSuccessor(node);
}
//设置node节点自旋
node.next = node; // help GC
}
}
当如果有异常的时候会进入该方法中取消该节点
以下列出了三种情况分别出现的情景:
juc_4(3).png
注意:最后将该节点设置为自旋方便GC操作
直接看```unparkSuccessor(node)方法
unparkSuccessor(Node node)
/**
* 唤醒node的后驱节点(node后面第一个没有被取消的节点)
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 取得node的后继节点
*
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
作用:唤醒node的后驱节点(node后面第一个没有被取消的节点)
注意: 在寻找node的后继节点的时候,为什么需要从后往前寻找,情况上图中的情况2
这里有两点疑问我还没有弄清楚,欢迎大家一起讨论
1. 开始的部分为什么需要把ws设置为0,所以我把英文直接贴上来了
接下来再看一个
selfInterrupt()
方法
selfInterrupt()方法
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
让当前线程中断,线程中断的话题会写一个专门的博客来分析的
打印等待队列的代码
public void printQueue() {
printQueueNode(tail);
System.out.println();
}
public void printQueueNode(Node node) {
if (node == null) return;
printQueueNode(node.prev);
System.out.print(node + "->");
}
这两个方法是我自己为了方便打印加的,因为需要从链表的尾部遍历,因此我就用了个递归的写法.
至此关于获取锁这部分的代码就已经分析完了,你可以把代码下载下来自己运行着看一下.
最后加一个锁的释放
锁的释放 release(int arg)方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
代码和逻辑都比较简单,就是当head不为null并且状态值不为0的时候,会去唤醒下一个头结点后的后驱节点.
请注意唤醒的线程会从parkAndCheckInterrupt中返回(此时返回值是false)到acquireQueued方法中去尝试获取锁
所以接下来我们完善一下Mutex类和测试用例
完善Mutex类和测试用例
在Mutex类中unlock修改成
@Override
public void unlock() {
// TODO Auto-generated method stub
sync.release(1);
}
在Sync类中添加tryRelease方法
public boolean tryRelease(int releases) {
if (super.getState() == 0)
throw new IllegalMonitorStateException();
super.setExclusiveOwnerThread(null);
super.setState(0);
return true;
}
再次运行例子,在我的电脑上结果如下:
thread-0 runs
[NULL,-1]->[thread-1,-1]->[thread-2,-1]->[thread-3,-1]->
[NULL,-1]->[thread-1,-1]->[thread-2,-1]->[thread-3,-1]->[thread-4,0]->
thread-1 runs
[NULL,-1]->[thread-2,-1]->[thread-3,-1]->[thread-4,0]->
thread-2 runs
[NULL,-1]->[thread-3,-1]->[thread-4,0]->
thread-3 runs
[NULL,-1]->[thread-4,0]->
thread-4 runs
[疑问]: 为什么会出现
[NULL,-1]->[thread-1,-1]->[thread-2,-1]->[thread-3,-1]->
结尾
至此对独占锁的分析差不多就到这里了,欢迎大家对上面存在的疑问一起讨论哈,或者哪里写得有问题也欢迎提出一起讨论哈.
后面的文章将会继续接着本文继续讨论,对独占锁的中断获取等等,就是一步步把Mutex没有实现的方法实现完.
完整代码:代码
另外如果有帮助到您一点点,请帮忙给个赞吧().谢谢观看.
参考
1. Java并发编程的艺术
2. Java1.8 java.util.concurrent.locks包的源代码
网友评论