上篇讲了ReentrantLock和AQS的独占模式,这篇讲一下CountDownLatch,他是利用AQS的共享模式实现的。
基本用法
CountDownLatch主要有2个方法: await()和countDown(),先创建一个CountDownLatch并指定count
CountDownLatch cdl = new CountDownLatch(1);
假如现在要让几个线程同时执行里面的某一段代码,可以让几个线程先执行CountDownLatch的await方法,然后在外面执行countDown方法。
public static void main(String[] args) {
CountDownLatch cdl = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread " + Thread.currentThread().getName() + " start");
}, i + "").start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cdl.countDown();
}
运行结果
thread 0 start
thread 2 start
thread 4 start
thread 1 start
thread 3 start
源码原理
类似ReentranLock, CountDownLatch 内部也有一个Sync类。
我们先看看await方法:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果已中断,就抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
他先尝试获取,如果没有拿到,就执行节点入队和park操作。
我们先来看看tryAcquireShared方法在CountDownLatch里的实现:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
如果state为0返回1,否则返回-1,就是说state减到0的时候才可以拿到锁。
再来看doAcquireSharedInterruptibly方法:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 入队一个shared节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 自旋
for (;;) {
final Node p = node.predecessor();
// 如果是head的next节点,则去尝试去拿锁
if (p == head) {
int r = tryAcquireShared(arg);
// 如果拿到锁就唤醒head的后继节点
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 将前置节点置为signal,返回true的话接着执行线程park
// 如果线程已被中断,则抛出中断异常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 如果发生线程中断了,取消获取锁,这也说明了AQS是可以响应中断的
if (failed)
cancelAcquire(node);
}
}
这里的逻辑和上篇讲到的AQS独占锁的入队+竞争队列的逻辑基本类似,都是先入队再去自旋竞争锁,拿不到就挂起。下面再看看setHeadAndPropagate方法是怎么唤醒后继节点的:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 设置当前节点为头结点
setHead(node);
// propagate == 1一般情况成立
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// s.isShared()一般成立
if (s == null || s.isShared())
doReleaseShared();
}
}
再看看doReleaseShared方法:
private void doReleaseShared() {
// 自旋
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒head的next节点
// next节点被唤醒之后,他在doAcquireSharedInterruptibly方法里
// 继续执行setHeadAndPropagate方法唤醒他的后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
以上就是CountDownLatch的await的大致逻辑了。他和独占模式最主要的区别就是:他拿到锁之后会唤醒后继节点,后继节点被唤醒了,继续去抢锁,抢到锁再唤醒他的后继节点,直到唤醒同步队列里所有的节点;而独占模式里节点抢到锁就直接返回了,释放锁是通过unlock()方法,后继节点被唤醒,然后抢到锁也就返回了,并不会唤醒后继节点。
下面看看countDown方法:
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
// 如果已经是0,返回false
if (c == 0)
return false;
int nextc = c-1;
// 如果减完之后为0且cas成功则返回true
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
releaseShared里首先将state-1,如果state-1之后等于0,那就释放锁,返回true,否则返回false。tryReleaseShared方法里就是通过自旋+CAS的方式设置state,设置成功且state为0就返回true。
以上就是目前我对CountDownLatch的理解了。
网友评论