一、通过ReentrantLock来分析AbstractQueuedSynchronizer源码
//初始化一个公平锁
ReentrantLock lock = new ReentrantLock(true);
加锁lock
//java.util.concurrent.locks.ReentrantLock.FairSync
final void lock() {
acquire(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 线程尝试获取锁 tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
列出一些AQS中的对象属性域
//If head exists, its waitStatus is guaranteed not to be CANCELLED
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
private Thread exclusiveOwnerThread;
若state == 0;
若等待队列中没有线程排队,compareAndSetState(0, acquires)成功;
setExclusiveOwnerThread(current),返回true。表示加锁成功;
若current == getExclusiveOwnerThread();
state += acquires,更新state,返回true。表示锁重入成功
判断等待队列中是否有线程排队。若 head == tail 说明等待队列没有线线程入队;若 head != tail,head.next != null,当前的线程与head.next中的线程不同,说明等待队列中有线程在等待;若 head != tail,head.next == null ,说明等待队列中有线程在等待。(这种情况怎么产生的?)
- tryAcquired失败,创建node并加入等待队列 addWaiter
//return the new node
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//return the node's predecessor
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
列出一些Node中的对象属性域
//The field is initialized to 0 for normal sync nodes, and CONDITION for condition nodes.
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
最初等待队列未初始化,head和tail都为null,通过CAS竞争某个线程会将head和tail初始化为一个空Node;
不断尝试让当前Node成为tail,并返回此Node;
queue.png
- 线程再次尝试获取锁或者Park
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
若当前节点是等待队列中的第一个节点,若尝试获取锁成功,设置head为当前Node,返回中断状态;
判断是否需要Park,若需要则Park当前线程;
判断当前线程是否需要Park,总是检查前一个Node的waitStatus是否为-1,一般Node初始化时设置为0。所以Park当前线程总会经历这样一个流程,将前一个Node的waitStatus设置为-1,然后Park当前线程。
解锁release
//java.util.concurrent.locks.ReentrantLock.FairSync
public void unlock() {
sync.release(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 解锁 tryRelease
//java.util.concurrent.locks.ReentrantLock.Sync
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
若当前线程没有持有锁则抛出异常;
state -= releases,若 state == 0,设置setExclusiveOwnerThread(null),设置解锁状态 free = true;
更新state;
返回解锁状态 free;
- tryRelease成功,UnPark等待队列中的第一个线程
显示锁的API | 特性 |
---|---|
public boolean tryLock() | 可轮询,非阻塞。非公平 |
public final boolean tryAcquireNanos(int arg, long nanosTimeout) | 可配置超时,可定时中断。 |
public void lockInterruptibly() throws InterruptedException | 可中断 |
条件队列Condition
//java.util.concurrent.locks.ReentrantLock.Sync
final ConditionObject newCondition() {
return new ConditionObject();
}
列出一些ConditionObject中的对象属性域
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
- 等待 await
public final void await() throws InterruptedException {
//首先检查中断
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程包装成Node加入“条件队列”
Node node = addConditionWaiter();
//解锁(包括重入锁),保留state(下次获取锁时会用到)
int savedState = fullyRelease(node);
int interruptMode = 0;
//Node没有转移到“阻塞队列”,则Park。signal唤醒会使得Node转移到阻塞队列(相当于拥有了竞争锁的权力)
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//尝试获取锁或者被Park
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
p1.png
- 唤醒 signal
public final void signal() {
//检查当前线程是否持有锁(上面await方法在fullyRelease时执行了隐式检查)
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//唤醒“条件队列”中第一个Node
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
//设置firstWaiter 为下一个节点,若下一个节点为null则表明“条件队列”中已经没有Node了,设置lastWaiter为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//first节点将被转移到“阻塞队列”,没有必要保留与下一个Node的关系了
first.nextWaiter = null;
} while (!transferForSignal(first) &&//转移Node失败则尝试转移下一个Node
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将Node加入“阻塞队列”
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//unpark这个Node
LockSupport.unpark(node.thread);
return true;
}
二、常用工具类CountDownLatch、CyclicBarrier 、Semaphore、ReadWriteLock
CountDownLatch
//java.util.concurrent.CountDownLatch
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//java.util.concurrent.CountDownLatch.Sync
Sync(int count) {
setState(count);
}
初始化CountDownLatch,实质上就是设置了state值。
- 等待 await
//java.util.concurrent.CountDownLatch
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//响应中断
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//java.util.concurrent.CountDownLatch.Sync
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
state没有countdown到0,则会执行下面的方法。
唤醒队列中所有线程(state为0时)或Park当前线程
//java.util.concurrent.locks.AbstractQueuedSynchronizer
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//包装当前线程为Node并加到“阻塞队列”
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//若为“阻塞队列”中的头节点,检查state是否为0,若为0
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//会唤醒下一个Node
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//Park当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
//将当前Node设置为“阻塞队列”的头节点,执行doReleaseShared(唤醒“阻塞队列”中排头Node)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
//ShareMode模式下的释放操作(唤醒后继节点,保证传播。对于ExclusiveMode,释放通过多次调用unparkSuccessor)
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
//如果当前Node的下一个节点存在,Unpark它。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 递减state countDown
//java.util.concurrent.CountDownLatch
public void countDown() {
sync.releaseShared(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
//state为0时,执行doReleaseShared(唤醒“阻塞队列”中其他Node)
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//java.util.concurrent.CountDownLatch.Sync
//递减state
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
CyclicBarrier
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
初始化CyclicBarrier
- 等待 await
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//所有线程到达“栅栏”后,由最后一个到达的线程执行barrierCommand。并且唤醒其他await的线程。
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
//线程await,这里会释放锁
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
列出一些CyclicBarrier中的对象属性域
//The number of parties
private final int parties;
//Number of parties still waiting
private int count;
private final ReentrantLock lock = new ReentrantLock();
//Condition to wait on until tripped
private final Condition trip = lock.newCondition();
//The command to run when tripped
private final Runnable barrierCommand;
private Generation generation = new Generation();
//用于重置“栅栏”
private static class Generation {
boolean broken = false;
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
执行await时,count等于0时,表示所有线程已到达trip。唤醒所有await的线程,重置“栅栏”。
Semaphore
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//java.util.concurrent.Semaphore.Sync
Sync(int permits) {
setState(permits);
}
初始化Semaphore,设置state值。默认使用非公平模式。
- 获取资源 acquire
//java.util.concurrent.Semaphore
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
//支持中断的,共享获取模式
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//当tryAcquireShared返回负值,那么doAcquireSharedInterruptibly可能会Park当前线程
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//java.util.concurrent.Semaphore.FairSync
protected int tryAcquireShared(int acquires) {
//当remain不小于0时,需要自旋重试
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
//可用资源available 为0,remaining<0;可用资源available>=请求的资源acquires,则remaining >=0;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
- 释放资源 release
//java.util.concurrent.Semaphore
public void release() {
sync.releaseShared(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//这里会unpark“阻塞队列”中的排头的Node(非Head)
doReleaseShared();
return true;
}
return false;
}
//java.util.concurrent.Semaphore.Sync
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
ReentrantReadWriteLock
//java.util.concurrent.locks.ReentrantReadWriteLock
//默认初始化一个非公平的Sync
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
ReadLock、WriteLock是ReentrantReadWriteLock的 静态内部类
,可以发现它们用的 sync
是同一个。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//ReadLock使用state的高16位
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//WriteLock使用state的低16位
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
ReadLock、WriteLock共用state。ReadLock使用state的高16位,WriteLock使用state的低16位。(这种技巧在ExecutorThreadPool也用到了)
- 读锁的加锁
//java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock
public void lock() {
sync.acquireShared(1);
}
//
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
可以明显的看到,ReadLock使用的是共享式的加锁方式。
//java.util.concurrent.locks.ReentrantReadWriteLock.Sync
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//exclusiveCount(c)取state的低16位,不为0,则说明存在写锁。这时若当前线程不是持有写锁的线程,则获取锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//获取state的高16位(这是留给读锁用的)
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
//尝试CAS修改state(这里没有区分当前读锁是否是重入)
compareAndSetState(c, c + SHARED_UNIT)) {
//下面都是一些统计信息,暂时忽略
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//Full version of acquire for reads, that handles CAS misses and reentrant reads not dealt with in tryAcquireShared.
return fullTryAcquireShared(current);
}
- 写锁的加锁
//java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock
public void lock() {
sync.acquire(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
可以明显的看到,WriteLock使用的是独占式的加锁方式。
//java.util.concurrent.locks.ReentrantReadWriteLock.Sync
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
//c !=0 说明此时有读锁或写锁存在
if (c != 0) {
//c !=0,w==0 代表此时有读锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
//此时有写锁且为当前线程持有
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
ReentrantReadWriteLock的核心规律就是,写锁和写锁互斥,写锁和读锁互斥,读锁和读锁不互斥。其目的是在读并发比较高的情况下,会有更好的效率。
三、总结
常用工具类 | |
---|---|
CyclicBarrier | 基于ReentrantLock实现,主要应用了Condition |
ReentrantLock | 基于AQS独占模式实现 |
CountDownLatch | 基于AQS共享模式实现 |
Semaphore | 基于AQS共享模式实现 |
ReentrantReadWriteLock | 基于AQS独占模式(写锁)和共享模式(读锁)的混合型 |
网友评论