CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他10个线程的任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。
CountDownLatch是通过一个计数器来实现的,计数器的初始值为那10个线程的数量也就是10。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上await()等待的线程就可以恢复执行任务。
下面我们来详细分析
构造函数:
public CountDownLatch(int count) { }; //参数count为计数值,也就是需要等几个线程结束的个数
它有三个主要方法:
public void await() throws InterruptedException { };
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
public void countDown() { };
await():调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
await(long timeout, TimeUnit unit):和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
countDown() :将count值减1,通常在等待的线程完成时调用,当10个线程都执行完,都减1后,count值为0,被挂起的线程就可以启动了。
实例:使用await在主线程阻塞,当每个子线程执行完了,就调用latch.countDown()一次,知道最后count的值为0,才解开主线程的等待;
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);
new Thread() {
public void run() {
System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
latch.countDown();
};
}.start();
new Thread() {
public void run() {
System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
latch.countDown();
};
}.start();
try {
System.out.println("等待2个子线程执行完毕...");
latch.await();
System.out.println("2个子线程已经执行完毕");
System.out.println("继续执行主线程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
输出结果:
子线程Thread-0正在执行
子线程Thread-0执行完毕
等待2个子线程执行完毕...
子线程Thread-1正在执行
子线程Thread-1执行完毕
2个子线程已经执行完毕
继续执行主线程
从输出上我们可以知道这个CountDownLatch的使用方法和执行过程了,接下来我们通过对它的主要方法的分析来看一下实现原理。
源码解析:
1.CountDownLatch(int count)
这个是CountDownLatch的构造函数,我们跟进看一下
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
先判断是否count的值是否正常,如果小于0,直接抛出异常,否则创建一个Sync对象
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
我们看到这里使用了AQS的Sync,唯一与lock不同的是把state设置为了count,然后获取锁的逻辑tryAcquireShared方法也做了对应的调整,这里获取锁的话判断state是否为0。
2.await()
await方法用于阻塞线程,也就是令当前线程阻塞直到拿到锁(state==0也就是count==0)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
先判断是否中断,如果中断的话响应中断并抛出异常,结束阻塞,然后通过tryAcquireShared获取锁,我们来看tryAcquireShared方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
拿到锁的唯一条件就是state==0,也就是子线程通过countDown()方法把count变为0才可以。我们接下来先看doAcquireSharedInterruptibly()上锁的过程。
3.doAcquireSharedInterruptibly()
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里的逻辑和lock方法的逻辑基本一致,只是稍作修改,主线程通过parkAndCheckInterrupt方法进行了阻塞。
4.countDown()
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
先调用了tryReleaseShared来解锁,我们看一下这个过程
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
拿到state,然后通过CAS把state-1,最后返回Boolean值,如果state==0,说明锁释放返回true,如果state>0,返回false,这里也就证明了我们的猜想,countdown方法就是来把state每次减一的,直到所有子线程执行完,减为0,锁释放。我们继续看锁释放后的执行过程。
5.doReleaseShared()
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
这里的步骤也和lock的释放过程类似,最后通过waitStatus的判断来执行unparkSuccessor()唤醒阻塞的线程。
6.setHeadAndPropagate()
当唤醒await的线程后,会执行第3步doAcquireSharedInterruptibly()里的setHeadAndPropagate()
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
可以看到,把唤醒的node设置为了head节点,也就是node拿到锁可以继续执行了,那么如果有其它的await也在等待呢?此时count为0,其它的肯定也要向下执行,是怎么连续唤醒的呢,我们看本方法里的Node s = node.next;
这里判断后续阻塞节点,如果存在,就执行 doReleaseShared();持续唤醒,doReleaseShared()在也就是第5步,他会解除head节点的next的阻塞,然后再执行本步骤,设置为head,循环唤醒。
最后:
这里循环唤醒也是共享锁的实现方式。在这里我们也再次印证了AQS是java.util.concurrent包下几乎所有类的实现核心,像CountDownLatch、CyclicBarrier和Semaphore三大辅助类,lock等都是基于AQS来实现自己的控制逻辑的。
网友评论