1 初次相识
在ReentrantLock锁代码中,你发现,锁的动作是由Sync的实例来处理,而Sync是继承了AbstractQueuedSynchronizer;而在ReentrantReadWriteLock锁中也会由这样的发现;
AbstractQueuedSynchronizer是锁实现的基石,实现了资源相关线程出队入队、唤醒、暂停等逻辑;通过这些逻辑,你可以实现可重入锁,共享锁,独占锁,条件锁,公平锁,非公平锁(条件锁,不是一种锁,是一种使用方法)。
2、原理
锁其实是对一些资源的保护,拥有这些资源,可以继续执行,不持有资源执行权限则等待资源被释放后,再次竞争资源的执行权限;
把线程可以继续执行,看成是持有资源执行权限,若被挂起,则表明未持有资源,且资源被其它线程持有,若果被唤醒,则需要继续竞争资源的执行权限
2.1 Unsafe类
2.2 静态内部Node类
主要内容有:
- 实现了next、prev、thread、waitStatus四个内部变量的CAS操作
- 双向链表结构的节点
- waitStatus变量标志线程thread的状态
static final class Node {
/** 标记共享模式 */
static final Node SHARED = new Node();
/** 标记独占模式 */
static final Node EXCLUSIVE = null;
/** 标记节点被取消 */
static final int CANCELLED = 1;
/** 标记后继节点需要被唤醒 */
static final int SIGNAL = -1;
/** 标记节点位于条件阻塞队列中 */
static final int CONDITION = -2;
/**
* 标记共享模式下,节点共享状态正在被传播(acquireShared)
* 当前节点获得锁或释放锁时, 共享模式下节点的最终状态是 PROPAGATE
*/
static final int PROPAGATE = -3;
/**
* !!!重中之重!!!
* 标记节点状态,默认为0,负数无须唤醒,使用CAS原子更新
* 独占模式:SIGNAL、CANCEL、0
* 共享模式:SIGNAL、CANCEL、PROPAGATE、0
* 条件变量:CONDITION状态不会存在于CLH锁同步队列中,只用于条件阻塞队列
*/
volatile int waitStatus;
/**
* 在CLH锁同步队列中链接前驱节点,使用CAS原子更新,每次入队和GC出队时会被指派
* 当前驱节点被取消时,一定能找到一个未被取消的节点,因为Head节点永远不会被取消:头节点必须成功aquire
* 被取消的线程不会再次成功aquire,线程只能取消自己不会影响其他
* 主要作用是在循环中跳过CANCELLED状态的节点
*/
volatile Node prev;
/**
* 在CLH锁同步队列中链接后继节点,每次入队、前驱节点被取消以及GC出队时被指派
* 赋值操作非线程安全,next为null时并不意味着节点不存在后继节点
* 当next不为null时,next是可靠的
* 主要作用是在释放锁时对后继节点进行唤醒
*/
volatile Node next;
/** Node关联线程 */
volatile Thread thread;
/**
* 链接位于条件阻塞队列的节点或特定SHARED值
* 实际作用就是标记Node是共享模式还是独占模式
* 独占模式时为null,共享模式时为SHARED
* 在条件阻塞队列中指向下一个节点
*/
Node nextWaiter;
/**
* 判断Node是否为共享模式
* @Return true 是 false 不是
*/
final boolean isShared() {
//当是共享模式时,nextWaiter就是SHARED值,独占模式就是null
return nextWaiter == SHARED;
}
/**
* 返回前驱节点,当前驱节点为空时直接抛空指针异常(实际上Head永远不会为null)
*/
final Node predecessor() throws NullPointerException {
//空指针判断只要是为了help gc
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
//默认共享模式
Node() {}
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
U.putObject(this, THREAD, Thread.currentThread());
}
Node(int waitStatus) {
U.putInt(this, WAITSTATUS, waitStatus);
U.putObject(this, THREAD, Thread.currentThread());
}
final boolean compareAndSetWaitStatus(int expect, int update) {
return U.compareAndSwapInt(this, WAITSTATUS, expect, update);
}
final boolean compareAndSetNext(Node expect, Node update) {
return U.compareAndSwapObject(this, NEXT, expect, update);
}
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long NEXT;
static final long PREV;
private static final long THREAD;
private static final long WAITSTATUS;
static {
try {
NEXT = U.objectFieldOffset
(Node.class.getDeclaredField("next"));
PREV = U.objectFieldOffset
(Node.class.getDeclaredField("prev"));
THREAD = U.objectFieldOffset
(Node.class.getDeclaredField("thread"));
WAITSTATUS = U.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
2.3 条件内部类ConditionObject
条件锁的队头队尾,也是先进先出队列,并且其利用nextWaiter指向下一个条件节点,也就是单向链表
private transient Node firstWaiter;
private transient Node lastWaiter;
2.3.1 条件锁等待流程
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
- addConditionWaiter方法,进行加入条件队列
- fullyRelease方法:由于当前线程持有资源,才可以使用条件等待,所以需要先释放资源,让其它线程唤醒执行,才可以使用资源
- isOnSyncQueue方法,查看是否在锁的等待队列(外部类的队列,包含独占锁、共享锁的对象的那个),不在会被挂起;
- checkInterruptWhileWaiting方法,如果处于打断状态,则有可能把节点加入锁等待队列
- acquireQueued:尝试获取资源,若未获取,则挂起线程,否则,继续执行
unlinkCancelledWaiters方法,条件队列重新检查,去除无效节点,被取消的线程节点 - reportInterruptAfterWait 方法,设置线程打断
2.3.2 条件锁唤醒流程
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
- isHeldExclusively方法直接抛出异常,需要自定义实现
- doSignalAll方法释放每一个节点
doSignalAll方法
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
其具体释放动作由transferForSignal实现
transferForSignal方法
final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
- 假设当前状态为条件状态,重置为运行状态,如果失败,则false,也即是进行唤醒下一个,尝试去获取资源:也就是这是状态已经被改变了
- 如果状态修改为运行状态,节点加入到锁等待队列;再次判断状态,如果已经取消,或者重置状态为等待唤醒失败,则唤醒线程;说明线程要么取消,要么状态已经改变,CAS置换为等待唤醒状态失败,则当前线程不能被唤醒,所以需要在这里释放
小结
- 条件锁,有专门的队列,存放条件等待线程节点;单向列表,使用nextWaiter指向下一个
- isHeldExclusively方法需要实现,表示是否为当前线程持有锁
- 条件锁释放后,条件等待队列中节点如果不是条件状态,说明已经被改变,也就是被唤醒
- 如果仍是condition状态,则加入锁等待队列,并重新检查状态,如果取消或者CAS重置状态失败,则唤醒线程
2.4 CAS操作内部变量
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long STATE;
private static final long HEAD;
private static final long TAIL;
static {
try {
STATE = U.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
HEAD = U.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
TAIL = U.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
Class<?> ensureLoaded = LockSupport.class;
}
原子操作的对象:状态state,队列头head,队列尾tail 内部变量state实现了原子非原子操作
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSwapInt(this, STATE, expect, update);
}
内部变量head,也是实现了非原子操作
private transient volatile Node head;
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private final void initializeSyncQueue() {
Node h;
if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
tail = h;
}
内部变量tail,也是实现了非原子操作
private transient volatile Node tail;
private final void initializeSyncQueue() {
Node h;
if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
tail = h;
}
private final boolean compareAndSetTail(Node expect, Node update) {
return U.compareAndSwapObject(this, TAIL, expect, update);
}
同样在内不对于head,tail变量还有直接使用的,也是不满足原子操作的,那么为何最后又能达到同步的效果呢,只能看具体代码了
网友评论