注:文中代码的解释基本上都以注释的形式和代码写在一起
CountDownLatch是并发环境中常用的计数组件,也是基于AQS实现的。主要的方法有两个,countDown和await,实现了AQS模板方法的tryReleaseShared方法来完成countDown计数减的过程,实现了AQS模板方法的tryAcquireShared方法来实现await阻塞等待功能。
countDown方法
countDown方法源码如下,直接调用了内部类sync的releaseShared方法来实现,这里的Sync和ReentrantLock的内部类Sync一样,是继承了AQS的内部类,releaseShared方法正是AQS提供的共享模式的模板方法。
public void countDown() {
//直接调用了AQS的releaseShared
sync.releaseShared(1);
}
AQS的releaseShared方法源码如下
public final boolean releaseShared(int arg) {
//tryReleaseShared方法AQS留给子类自己实现
//尝试将status减去arg,如果返回为true,执行doRelease方法,否则返回
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared方法中调用了CountDownLatch中实现的tryReleaseShared方法,源码如下
protected boolean tryReleaseShared(int releases) {
//通过源码我们发现传入的参数releases并没有什么用,每次计数固定减一
// 无限循环直到值减一成功或者status变成0
for (;;) {
//获取status的值,CountDownlatch中status的值代表要等待的总计数
int c = getState();
//如果已经是0了说明已经不能再减计数了,返回false
if (c == 0)
return false;
int nextc = c-1;
//CAS的方式将status减一
if (compareAndSetState(c, nextc))
如果当前减完之后,status是0,也就意味着计数结束了,返回true
return nextc == 0;
}
}
tryReleaseShared方法返回为true,也就是计数结束时,会接着执行doReleaseShared方法。doReleaseShared方法在CountDownLatch中没有重写,直接调用的是AQS的doReleaseShared方法,源码如下,其中unparkSuccessor源码解析见另一篇博客《AQS源码解析》
private void doReleaseShared() {
//无限循环
for (;;) {
//获取头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果头节点设置的是要唤醒下一个节点的等待状态
if (ws == Node.SIGNAL) {
//将节点的waitStatus设置成0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//如果设置成功,唤醒后面的等待节点
unparkSuccessor(h);
}
//ws==0说明第一步设置成功或者原先就是0
//将其状态设置为PROPAGATE
//失败(PROPAGATE状态表示同步状态将会无条件传播,意思就是节点可运行)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果h还是头节点,就结束循环
if (h == head) // loop if head changed
break;
}
}
await方法
await方法直接调用了内部类Sync的acquireSharedInterruptibly方法,阻塞线程直到count为0,当前线程才能拿到锁(或者抛出异常也有可能结束阻塞)。源码如下:
public void await() throws InterruptedException {
//直接调用了内部类Sync的方法(其实是AQS的方法)
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly方法,如果线程被中断过就抛出异常结束阻塞,不然就判断计数的值,为0就返回,等于当前阻塞的线程获得了锁,如果计数不为0,进入doAcquireSharedInterruptibly方法进行排队等待。源码如下:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果线程被中断过,抛出异常,线程不再等待
if (Thread.interrupted())
throw new InterruptedException();
//tryAcquireShared尝试获取共享状态
//tryAcquireShared源码见下方,实际就是获取计数
//计数不为0则执行doAcquireSharedInterruptibly方法
//计数为0则返回值大于0,方法直接返回,线程阻塞结束。等于线程获得了锁
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
doAcquireSharedInterruptibly方法只有在线程阻塞时会被调用,也就是计数不为0时被调用。方法将当前线程构造为一个共享模式的等待节点加入等待队列中,然后开启无限自循环,直到计数等于0获取到锁,或者抛出异常为止。源码如下:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//新建一个共享模式节点加入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取新建节点的前一个节点
final Node p = node.predecessor();
//如果前一个就是头节点
if (p == head) {
//说明当前线程是第一个等待的,尝试获取锁
//也就是获取计数
int r = tryAcquireShared(arg);
//r不小于0说明计数已经是0了,等于当前线程已经获得了锁
if (r >= 0) {
//将当前线程的节点设置成头节点
setHeadAndPropagate(node, r);
//原先的头节点p从队列中解除,便于垃圾回收
p.next = null; // help GC
failed = false;
return;
}
}
//shouldParkAfterFailedAcquire检查如果获取锁失败当前节点是否需要挂起
//只有当前一个节点的waitStatus是SIGNAL也就是说前一个节点
//获得锁以后会把自己唤醒,当前节点才能放心挂起
//parkAndCheckInterrupt判断节点是否被中断过
//这里意思是如果当前节点是在被挂起状态而且被中断过就抛出异常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//如果failed为true说明线程被中断,没有获得锁
//所以取消获取锁的动作
if (failed)
cancelAcquire(node);
}
}
总结来说就是await方法会让当前线程阻塞,进入方法后先判断一次计数是否为0,如果是0则直接返回,线程获得锁,阻塞结束。如果不为0则进入doAcquireSharedInterruptibly方法,将当前线程构造成了一个等待节点,开启无限循环,线程被阻塞。无限循环直到当前线程的节点排队排到了头节点的后面,就可以尝试获得锁了,如果成功了就可以返回,阻塞结束。在排队的过程中如果线程被中断那么就抛出异常。简而言之阻塞是由无限的for循环造成的,所以结束循环就是线程结束阻塞的关键了。
提问:await方法支持多个线程一起等待吗
回答:支持的。从源码角度看,await方法并没有做任何的同步控制,多个线程等待和一个线程等待,结果没有什么不同,所有等待的线程都会阻塞到status为0,阻塞过程中这些线程就乖乖的在队列里等待。
网友评论