相信大家看过AQS的代码的话大家对它应该会有基本的认识。它保存了两个Node,head跟tail。先看Node,Node是个双向链表,存储了pre跟next,两个有参构造方法存储Thread。
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
//前任
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
如果硬看AQS的话就很晦涩了,我们可以先写一个最简单的AQS的例子,允许线程获取锁,释放锁。
public class Mutex implements Lock {
private Sync sync = new Sync();
class Sync extends AbstractQueuedSynchronizer {
public boolean tryAcquire(int acquire) {
assert acquire == 1;
if (compareAndSetState(0, 1)) {//如果是0 -> 变成1
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int release) {
assert release == 1;
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
Condition getNewCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
public int getQueueLength() {
return sync.getQueueLength();
}
@NotNull
@Override
public Condition newCondition() {
return sync.getNewCondition();
}
}
我们用这个自定义的锁来对AQS初窥门径,Mutex就相当于synchronized包裹下的notify跟wait,AQS其实看来就相当于object 的锁池。可以看出来Mutex这个类的所有操作其实都是用Sync来做的操作。相当于是AQS的代理类。
直接看lock
sync.acquire(1);
acquire Sync并没有重写acquire,我们看AQS的acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们看Sync实现了tryAcquire,看上面Mutex的代码 CAS 尝试获取锁,如果能获取到就把当前线程设置为当前线程,否则返回false。
我们继续回去看acquire,如果没有获取到线程的话会继续走acquireQueued(addWaiter(Node.EXCLUSIVE), arg),先看addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure 快速设置尾节点
Node pred = tail;
if (pred != null) {
node.prev = pred;//新节点的pre设置为当前的tail节点
if (compareAndSetTail(pred, node)) {//尝试设置到tail的尾部
pred.next = node;//成功的话把当前tail的next设置尾新节点,完成尾节点的add
return node;
}
}
enq(node);
return node;
}
看相关代码:首先是
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
如果tailOffset位置的Node等于expect,就把update改为tail,这个操作是原子的,如果两个线程进来只有一个会成功,第二个比较的时候tailOffest就不是expect了。然后就是enq
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
循环把node放到尾部,返回尾部之前的node,也就是predecessor。
网友评论