//每次reader+1
private static final long RUNIT = 1L;
//倒数第八位,写锁标志位
private static final long WBIT = 1L << LG_READERS;
//倒数后七位,读线程数量
private static final long RBITS = WBIT - 1L;
//最大读线程数量
private static final long RFULL = RBITS - 1L;
//用它检测是否在写以及当前读线程数量
private static final long ABITS = RBITS | WBIT;
private static final long SBITS = ~RBITS; // note overlap with ABITS
static final class WNode {
volatile WNode prev; //前置节点
volatile WNode next; //后置节点
volatile WNode cowait; // list of linked readers
volatile Thread thread; // 当前线程
volatile int status; // 状态位!全靠它实现所有无锁/CAS和锁操作
final int mode; // 读或写
WNode(int m, WNode p) { mode = m; prev = p; }
}
public long readLock() {
long s = state, next;
//1. CLH队列空
//2. 没有写,读线程没满
//3. 尝试用CAS获得读资格
//上述条件都满足时直接返回修改后的状态位(读线程+1),否则继续尝试获得读锁
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
private long acquireRead(boolean interruptible, long deadline) {
//当前节点,p尾指针
WNode node = null, p;
//SPINS时最大尝试次数
for (int spins = -1;;) {
//h头指针
WNode h;
//CLH空,不断的使用CAS去尝试
if ((h = whead) == (p = wtail)) {
for (long m, s, ns;;) {
//没有写,而且读线程未满就用CAS尝试
//否则尝试增加读线程溢出
//都成功就返回
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
return ns;
//有写线程,再来一次循环
else if (m >= WBIT) {
if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else {
//尝试失败
if (spins == 0) {
WNode nh = whead, np = wtail;
//h,p指针没错,并且CLH队列不空,跳出整个自旋
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
//设置最大尝试次数,再来一次
spins = SPINS;
}
}
}
}
// initialize queue,后面的操作都在插入CLH队列
// new一个空WNode,让头指针指向它,以后只要头尾指针相同,则CLH队列为空
if (p == null) {
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
node = new WNode(RMODE, p);
else if (h == p || p.mode != RMODE) {
if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
//让尾节点到时通知当前节点(他在尾节点后面)
else if (!U.compareAndSwapObject(p, WCOWAIT,
node.cowait = p.cowait, node))
node.cowait = null;
else {
//到这说明CLH不空而且node会被尾节点通知,即CLH队列状态就绪
for (;;) {
WNode pp, c; Thread w;
// 唤醒CLH队列第一个节点
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
// CLH队列只有一个节点/空/p是无意义的头节点
// 只要没有写线程一直自旋CAS
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
do {
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
} while (m < WBIT);
}
// CLH无变化,即没有冲突
//之前的尝试都失败,准备阻塞线程
if (whead == h && p.prev == pp) {
long time;
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
//deadline=0L,本方法内也不会被改变
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
//interruptible=false,不会改变
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
}
}
//到这说明:尝试次数耗尽并且此过程中无冲突/已将node加入空队列尾
for (int spins = -1;;) {
//h头指针/p尾指针
WNode h, np, pp; int ps;
//CLH队列空
if ((h = whead) == p) {
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // spin at head
long m, s, ns;
//和之前相似,尝试CAS
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
WNode c; Thread w;
whead = node;
node.prev = null;
//自己得到读锁,就唤醒他人
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
return ns;
}
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
break;
}
}
//队列不空,就从头唤醒队列中元素
else if (h != null) {
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
//node成为队尾
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
//让p状态为waiting
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
//p已经取消,从队列中删除
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
//到这说明node已经入队并且整个队列之前元素的状态已经设置完成
//和之前相似,准备阻塞线程
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
网友评论