一 CountDownLatch
- 继承AbstractQueuedSynchronizer实现子类Sync
- CountDownLatch实例化,参数指定并发锁数。
new Sync(count)
实例化初始化时即设置占用锁数量
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//
Sync(int count) {
setState(count);
}
- await()等待锁,tryAcquireShared()不可重入获取锁。只能等待初始化占用的锁全部释放
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
- countDown()释放锁, 实际调用tryReleaseShared()释放一个锁。
public void countDown() {
sync.releaseShared(1);
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
- 调用初始化时count次countDown()后,await()线程才能获取到锁被唤醒。
- 不可重复使用
二 CyclicBarrier
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;//总数量
this.count = parties;//待释放的barrier数量
this.barrierCommand = barrierAction;
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
final Generation g = generation;
if (g.broken)//本次barrier处理,已被中断
throw new BrokenBarrierException();
if (Thread.interrupted()) {//线程中断
breakBarrier();//中断本次barrier处理
throw new InterruptedException();
}
int index = --count;//释放一个barrier
if (index == 0) { // 为0表示已全部满足
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();//barrier完成处理回调,每个屏障点都调用
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)//barrierCommand执行异常则中断barrier
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {//还有屏障点未到达,则休眠等待条件对象通知
try {
if (!timed)//非限时类型
trip.await();//休眠等待
else if (nanos > 0L)//指定时间休眠
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)//barrier流程已变更
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void breakBarrier() {
generation.broken = true;//标记本轮barrier处理中断
count = parties;//更新barrier数量
trip.signalAll();//通知所有等待线程恢复处理
}
- nextGeneration,重置barrier,下一轮处理。 可重复使用。
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();//通知等待线程
// set up next generation
count = parties;
generation = new Generation();//初始化中断状态
}
网友评论