并发源码分析篇:
先来看下CountDownLatch的用法,模拟多线程执行账单计算
public class CountDownLatchDemo {
static CountDownLatch latch = new CountDownLatch(4);
static int num = 0;
public static void main(String[] args) {
new Thread(() ->{
num ++;
System.out.println(Thread.currentThread().getName()+":我完成+1操作了");
latch.countDown();
},"work_1").start();
new Thread(() ->{
num ++;
System.out.println(Thread.currentThread().getName()+":我完成+1操作了");
latch.countDown();
},"work_2").start();
new Thread(() ->{
num ++;
System.out.println(Thread.currentThread().getName()+":我完成+1操作了");
latch.countDown();
},"work_3").start();
new Thread(() ->{
num ++;
System.out.println(Thread.currentThread().getName()+":我完成+1操作了");
latch.countDown();
},"work_4").start();
try {
latch.await();
System.out.println("总结果:"+num);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
work_1:我完成+1操作了
work_2:我完成+1操作了
work_3:我完成+1操作了
work_4:我完成+1操作了
总结果:4
在来看下没有用CountDownLatch的效果
public class CountDownLatchDemo {
static CountDownLatch latch = new CountDownLatch(4);
static int num = 0;
public static void main(String[] args) {
new Thread(() ->{
num ++;
System.out.println(Thread.currentThread().getName()+":我完成+1操作了");
//latch.countDown();
},"work_1").start();
new Thread(() ->{
num ++;
System.out.println(Thread.currentThread().getName()+":我完成+1操作了");
//latch.countDown();
},"work_2").start();
new Thread(() ->{
num ++;
System.out.println(Thread.currentThread().getName()+":我完成+1操作了");
//latch.countDown();
},"work_3").start();
new Thread(() ->{
num ++;
System.out.println(Thread.currentThread().getName()+":我完成+1操作了");
//latch.countDown();
},"work_4").start();
try {
//latch.await();
System.out.println("总结果:"+num);
} catch (Exception e) {
e.printStackTrace();
}
}
}
work_1:我完成+1操作了
总结果:1
work_2:我完成+1操作了
work_3:我完成+1操作了
work_4:我完成+1操作了
CountDownLatch的作用显而易见,它能阻塞线程直到达到某个标准后才会被释放继续执行。这个标准就是取决于构造传入的那个值。
现在,我们看下CountDownLatch的底层到底是怎样实现的。
首先就从这个标准说起,看下构造函数做了什么.
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
protected final void setState(int newState) {
// 将AQS的state赋值为这个传进来的值
state = newState;
}
构造函数就是初始化了AQS中的state值。所以这不在像之前的锁一样,state是0,而是传进来的这个值。
在来看await方法做了什么
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
//
return (getState() == 0) ? 1 : -1;
}
await方法实际上就是判断state值是否为0,如果为0,啥也不做,如果小于0,线程执行doAcquireSharedInterruptibly方法。所以我们上面主线程调用await方法的时候,只要其他4个线程没有执行完并且调用countDown方法,该state就不会为0,所以肯定会执行doAcquireSharedInterruptibly方法。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 构建队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 当前节点的前一个节点
final Node p = node.predecessor();
if (p == head) {
// 继续判断state是否为0,为0返回1,否则为-1
int r = tryAcquireShared(arg);
if (r >= 0) {
// 将当前节点设置为头节点并且将节点状态设置为PROPAGATE状态
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 将前一个节点的waitstatus状态变为-1并且阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里大部分代码都是我们熟悉的,如果不熟悉的,看ReentrantLock源码分析
只有setHeadAndPropagate这个地方不太一样了。稍后我们在来看。
doAcquireSharedInterruptibly方法会继续尝试判断state是否为0,不为0当前线程将阻塞,也就是我们的主线就在此阻塞了。
这时候在来看countDown方法
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
这里主要就是tryReleaseShared方法决定,如果为true,执行doReleaseShared,否则什么也不做。
而tryReleaseShared就是获取state并且-1如果为0就会为true,否则就是false。所以这里为ture的时候只有第4个线程执行了countDown方法才会执行后面逻辑。
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
这里跟锁大致一样,唤醒头节点的下一个节点,也就是我们的主线程。所以主线程被唤醒了,继续在挂起的地方执行
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 构建队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 当前节点的前一个节点
final Node p = node.predecessor();
if (p == head) {
// 继续判断state是否为0,为0返回1,否则为-1
int r = tryAcquireShared(arg);
if (r >= 0) {
// 将当前节点设置为头节点并且将节点状态设置为PROPAGATE状态
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 将前一个节点的waitstatus状态变为-1并且阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
主线程会执行setHeadAndPropagate,最终又执行到doReleaseShared方法。最终会走if (h == head)跳出了循环,主线程就会继续执行了下面的线程。
其实细心的朋友就会发现,调用countDown的线程也是走这个方法,那为什么只有h == head的时候才能退出循环,如果被唤醒的那个节点先执行了setHead方法,那么head就发生变动了,那么该线程又会循环,于是乎出现了多个线程同时会竞争去唤醒当前head节点的下一个节点,这让我感到很奇怪,为什么不直接让这个线程直接break,唤醒的工作就交给下一个节点就行了,让所有的countDown线程也去抢占释放head节点的下一个节点岂不是更消耗性能,所以一直没能明白这段代码的作用
else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
if (h == head)
希望有研究的朋友能讲解下
总结
CountDownLatch实际上就是阻塞线程直到达到了某个标准后,就会被唤醒继续执行。并且这个标准只有在初始化的时候被设置,而CyclicBarrier却有着更丰富的功能,而且这个标准可以reset重置。
网友评论