CountDownLatch是并发编程中使用比较多的一个并发工具类,通过它我们可以实现一些复杂的业务,比如某一个任务被拆分成多个子任务,而最终等所有子任务完成之后再经过一个单独的线程将所有子任务的结果进行汇总,这就是一个线程等待所有线程执行结果的一个简单模型
先来模拟一下这种场景
import java.util.concurrent.CountDownLatch;
public class TestCountdownLatch {
public static void main(String[] args) {
//初始化一个CountDownLatch,初始值为8,表示有八个线程需要被统计
//执行结果
CountDownLatch count = new CountDownLatch(8);
new Thread(new Runnable() {
@Override
public void run() {
try {
//CountDownLatch开始等待其他线程结果,注意这个await()方法调
//时机不限,可以在count.countDown()之前也可以在之后
count.await();
System.out.println("所有线程执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
//开启8个线程
for (int i = 0; i < 8; i++) {
int finalI = i;
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(finalI *1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName()+"执行完成");
//在每个线程中任务执行完成的时候进行countDown,这个方法要
//确保一定执行,所以放在finally中,哪怕发生异常也要执行
count.countDown();
}
}
}).start();
}
}
}
打印结果:
Thread-1执行完成
Thread-2执行完成
Thread-3执行完成
Thread-4执行完成
Thread-5执行完成
Thread-6执行完成
Thread-7执行完成
Thread-8执行完成
所有线程执行完成
源码
await()
public void await() throws InterruptedException {
//sync是一个内部类,在CountdownLatch创建对象的时候就new出来了
//并缺传入了count值保存起来
sync.acquireSharedInterruptibly(1);
}
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
//Sync是继承AbstractQueuedSynchronizer实现的,和ReentrantLock相同
this.sync = new Sync(count);
}
进入acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果调用了Thread.currentThread().interrupt(),就抛出异常
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
接着先看tryAcquireShared方法,它的实现就是在Sync类中的tryAcquireShared方法,可以看到这里只是判断了state值,这个state值就是我们在new CountdownLatch的时候传入的count 8,很明显,第一次执行这个方法因为state=8,所以返回了-1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
tryAcquireShared(arg)反回了-1满足<0的条件,所以执行doAcquireSharedInterruptibly方法,这里的实现和ReentrantLock中的实现很像
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//添加等待的node,加入一个双向链表中,返回本次加入的node,看到这里
//需要先去看看addWaiter方法的实现,然后再继续往下看
final Node node = addWaiter(Node.SHARED);
//node添加之后继续往下看
boolean failed = true;
try {
//无限循环,满足条件后跳出
for (;;) {
//node.predecessor()返回的是这个node的pre节点,如果当前队列只
//有一个node,那么pre就是第一次创建的空node
final Node p = node.predecessor();
//p == head什么时候会满足呢?当链表除了空node之外只有一个node
//的时候,这个node的pre就是head,所以p==head满足
if (p == head) {
//链表中只有一个node,判断state值是否0,是0 tryAcquireShared
//返回的就是1,否则-1,countDown方法执行一次,state就会-1,
//知道被减为0的时候if (r >= 0) {下边才会被执行,我们先假设
//tryAcquireShared返回值小于0,也就是countDown方法没有调完
int r = tryAcquireShared(arg);
if (r >= 0) {
//当countDown方法被调用完成,那么tryAcquireShared就会返回
//1,执行到这里来
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//countDown方法没有调完就会走到这里来
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
addWaiter
private Node addWaiter(Node mode) {
//创建一个Node节点保存当前线程和本节点对象,node就是个空对象
//主要目的就是记录线程信息
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//tail表示链表的尾节点,第一次添加,尾节点和头节点head都是null
Node pred = tail;
if (pred != null) {
//第二次addWaiter的时候会走到这里,将本次添加的node的pre设置为
//当前尾节点
node.prev = pred;
//然后将本次添加的node设置为新的尾节点
if (compareAndSetTail(pred, node)) {
//将上一个node的next设置为本次添加的node,形成双向链表
pred.next = node;
return node;
}
}
//入队,看完入队再看上边 if (pred != null) { 的情况
enq(node);
return node;
}
enq(),入队的操作,这是一个无限循环,只有将当前节点设置完成之后才会退出,
private Node enq(final Node node) {
for (;;) {//for循环入队,成功之后返回
//再次判断tail是否为 null,单线程下这里肯定不为null,因为上边判断过
//多线程下有可能,其他线程可能先一步添加了节点,所以再次判断
Node t = tail;
//为null,就初始化一个node,(注意,这个node是一个空的node,不包
//含线程信息,和传入的node不是同一个),将head节点指向它,所以
//它就成了头节点,相当于第一次new 了一个head和tail
if (t == null) { // Must initialize
//CAS方式实现,先对比判断当前头节点是不是null,是就将new出来
//的node设置为头节点
if (compareAndSetHead(new Node()))
//头节点设置成功,让尾节点同样也指向这个节点,这时候还没有
//完成,因为没有被return,那么会再次进入循环拿到尾节点,也就
//是本次new出来的这个空的node,然后因为它不是null,所以进入了
//下下一个else
tail = head;
else
//源码中没有这个else,加上它是为了方便描述,也就是说如果判断
//t == null之后,通过cas的方式设置头节点却失败了,可能就是其
//他线程先一步设置了头节点,那么此时什么都不做,继续下一次
//循环
} else {
//如果尾节点不为空了,那么就将当前创建的节点的前节点指向尾节点
//如果这是第一次添加node,那么这个t就是上边创建的空node
node.prev = t;
//cas的方式将这个节点设置为新的尾节点
if (compareAndSetTail(t, node)) {
//然后将上一个节点的next节点设置为加的这个节点,形成双向链表
t.next = node;
return t;
}else{
//(源码里也没有这个else),如果此时cas设置新的尾节点失败,
//可能是被其他线程抢先了,那么就再次进入下一次循环,直到设置
//尾节点成功后返回
}
}
}
}
addWaiter一次之后链表的状态如下图
屏幕快照 2020-04-06 下午2.29.04.png
shouldParkAfterFailedAcquire
两个参数,第一个是当前节点的前一个节点,第二个是当前节点,这里判断了
waitStatus这个值,它有几种取值,
SIGNAL = -1;
waitStatus value to indicate successor's thread needs unparking,表示等待被唤醒
CANCELLED = 1;
waitStatus value to indicate thread has cancelled,线程被取消
CONDITION = -2;
waitStatus value to indicate thread is waiting on condition,线程正在等待条件
PROPAGATE = -3;
waitStatus value to indicate the next acquireShared should unconditionally propagate 线程的共享锁应该被无条件传播
0;
0的时候表示当前节点为头节点,默认是0
默认都是0,因为并没有在其他地方赋值,所以第一次进入,会走到else分支
shouldParkAfterFailedAcquire是无限for循环实现的,每个线程(没有被取消的)进入这个方法后会被循环两次,第一次waitStatus = 0,然后修改为SIGNAL,返回true,执行后边的线程挂起parkAndCheckInterrupt操作
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//判断前一个节点的waitStatus,文章开头的例子里,只在一个线程中调用
//了await方法,那么等待链表中只有两个node一个是空的头,一个是本线程
//节点作为tail存在,那么pred指的就是那个空节点,因为默认waitStatus=0
//然后第二次循环把waitStatus设置为了-1,也就是SIGNAL,后边countDown方法中有用到
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
//返回true,执行parkAndCheckInterrupt()方法
//状态为SIGNAL表明线程进入等待被唤醒的状态,此时线程park
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 表示线程已经被取消,这个时候要把被取消的线程节点从链表中删除
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//判断=0,然后将waitStatus设置为single,return false,然后开始下一次循环,
//进入下一次循环后再次进入这个方法,判断到waitStatus=SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
很简单,就是将线程挂起了,那么此时线程就是在等待唤醒,下边的return语句在被唤醒之前不会执行
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
到这里线程就被挂起了,接下来我们看看什么条件下线程会被唤醒
countDown()
public void countDown() {
//仍然是sync中的实现
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
//获取到当前state值
int c = getState();
//如果已经等于0了,那么不再有任何操作,返回false
if (c == 0)
return false;
//将state值-1
int nextc = c-1;
//cas的方式更新state,因为是在for循环中执行的,所以哪怕有更新失败
//也会继续进行下一次更新,直到成功
if (compareAndSetState(c, nextc))
return nextc == 0;//如果-1之后,state为0,返回true
}
}
直到state被减为0后,tryReleaseShared返回true,才会执行后边的doReleaseShared()方法
private void doReleaseShared() {
for (;;) {
//拿到头节点,以我们本篇文章开头的例子分析,await方法只在一个线程
//中被调用,那么waiter链表中只有两个节点,头节点是空节点,尾节点
//就是那一个线程的节点
//那么这里的head就是那个空节点
Node h = head;
if (h != null && h != tail) {
//所以h != null && h != tail条件是满足的,头节点的waitStatus=
//-1(SIGNAL)(上边分析过的)
int ws = h.waitStatus;
//条件满足
if (ws == Node.SIGNAL) {
//将头节点waitStatus改为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//开始执行线程唤醒操作
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//为什么这里要做这个判断,看setHeadAndPropagate方法中的setHead
//方法,就是因为其他线程可能会改变head节点,如果被改变了,那么就
//要重新循环一次
if (h == head) // loop if head changed
break;
}
}
unparkSuccessor
private void unparkSuccessor(Node node) {
//这个node是传入的head节点,waitStatus在上一步中已经被设置为0
int ws = node.waitStatus;
//如果不是0,再次设置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//拿到头节点后边的节点,也就是我们添加的那一个
Node s = node.next;
//如果这个节点为null,表示可能是节点已经被移除,waitStatus如果大于0
//表示可能是节点以经被取消,但是还没被移除
if (s == null || s.waitStatus > 0) {
//再次设置为null
s = null;
//因为这个节点是无效节点,所以从后往前找,找到一个嘴靠近头节点
//的节点,将它设置给s,然后进行唤醒
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//一般情况都是s不为null,说明这个节点正在等待被唤醒,那么就唤醒它
//这样以来,这个上次被阻塞在LockSupport.park(this)的线程就重新开始运行
if (s != null)
LockSupport.unpark(s.thread);
}
那么线程被唤醒之后做了什么,要想知道这个我们需要再次回到线程被阻塞的代码中
setHeadAndPropagate()
重新设置head头节点,因为第一个被唤醒的node已经执行完了,那么上一个head被移除,这个node就变成了新的head,并且这个新的head同样被置为空node
private void setHeadAndPropagate(Node node, int propagate) {
//保存当前的head节点
Node h = head; // Record old head for check below
//将这个节点设置为head,那么上边那个head就被移除了,看看setHead做了什么
setHead(node);
//上边知道传入的propagate = 1
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//第一个线程被唤醒后再次进入了唤醒线程的方法doReleaseShared
//开始唤醒下一个线程
doReleaseShared();
}
}
private void setHead(Node node) {
//设置为head节点
head = node;
//置空线程和prev.那么它就成了一个空节点了
node.thread = null;
node.prev = null;
}
总结
总结一下唤醒机制,首先唤醒第一个挂起的线程, 然后这个线程被唤醒后再去唤醒下一个线程,循环反复直到所有线程都会被唤醒,这就是AQS共享锁的唤醒原理
网友评论