先看例子:
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch count1 = new CountDownLatch(0);
CountDownLatch count2 = new CountDownLatch(1);
CountDownLatch count3 = new CountDownLatch(1);
Thread threadA = new Thread(new CounRunable(count1,count2),"Thread-A");
Thread threadB = new Thread(new CounRunable(count2,count3),"Thread-B");
Thread threadC = new Thread(new CounRunable(count3,count3),"Thread-C");
threadA.start();
threadB.start();
threadC.start();
Thread.sleep(1000);
}
}
CounRunable 线程的实现 如下:
public class CounRunable implements Runnable {
private CountDownLatch count1;
private CountDownLatch count2;
public CounRunable(CountDownLatch count1, CountDownLatch count2) {
this.count1 = count1;
this.count2 = count2;
}
@Override
public void run() {
try {
count1.await();
System.out.println(Thread.currentThread().getName());
count2.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
执行结果如下:
Thread-A
Thread-B
Thread-C
Process finished with exit code 0
以上代码 确实满足 线程的 顺序执行,那是怎么做到的呢?
首先:我们创建了 三个CountDownLatch
构造时 传入 的参数 分别 为:0、1、1,也就是说 :count1 、count2 、count3
持有的计算器 分别 为:0、1、1。那这个计数器有什么用呢?如下:
public static void main(String[] args) throws InterruptedException {
CountDownLatch count1 = new CountDownLatch(1);
CountDownLatch count2 = new CountDownLatch(1);
CountDownLatch count3 = new CountDownLatch(1);
Thread threadA = new Thread(new CounRunable(count1,count2),"Thread-A");
Thread threadB = new Thread(new CounRunable(count2,count3),"Thread-B");
Thread threadC = new Thread(new CounRunable(count3,count3),"Thread-C");
System.out.println(Thread.currentThread().getName());
threadA.start();
threadB.start();
threadC.start();
}
image.png执行结果如下:
发现 线程 阻塞住了,根据以上代码 发现 :count1,持有的计数器 变为了 1,找到
CountDownLatch
的构造方法:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
找到 Sync 代码实现:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
// 设置状态
setState(count);
}
int getCount() {
// 获取最新的状态
return getState();
}
protected int tryAcquireShared(int acquires) {
// 根据当前状态的值 返回不同的值
return (getState() == 0) ? 1 : -1;
}
// 尝试 将 state 设置为 0,
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
根据以上 代码 发现 Sync 为静态内部类 并继承了 AQS, 并重写了
tryAcquireShared 和 tryReleaseShared
,根据new CountDownLatch(1)`` 发现 参数 1 被传递到了
setState(1)``` 这个方法,找到 setState(1);
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* The synchronization state.
*/
private volatile int state;
发现 是父类
AbstractQueuedSynchronizer
即 AQS 中的一个 被volatile
修饰的变量(对 这个关键字不了解的 请查看 我相关文章解释)。也就是说先的state =1
,接着分析,代码的执=执行:当 count1 、 count2、 count13 分别构造完成后,state 分别为 1、1、1;线程创建完成后,分别开始启动线程 Thread-A、Thread-B、Threa-C,并执行对应的 run() 方法。
public void run() {
try {
count1.await();
System.out.println(Thread.currentThread().getName());
count2.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
根据
Thread threadA = new Thread(new CounRunable(count1,count2),"Thread-A");
可知调用 count1 的 wait() 方法,根据 以上执行结果可知,Thread-A 被阻塞了, 找到 wait() 的实现:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
发现调用了 内部类 sync 的 acquireSharedInterruptibly(1) 方法。找到 具体实现,发现调用的是父类 AQS 的实现:
public final void acquireSharedInterruptibly(int 1) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
分析 以上代码,如果 线程被中断了 则直接抛出异常,否则会继续往下执行,找到
tryAcquireShared(1)
具体实现如下(CountDownLatch
自己实现了,且 AQS 并不提供默认的实现):
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
跟据以上代码,acquires =1 state = 1, 也就是说 该方法的 返回值为 -1,
tryAcquireShared(1) < 0
则为true
, 则继续执行doAcquireSharedInterruptibly(1)
。找到 对应的实现:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
分析以上代码:添加一个Node 节点到队列中,然后开始 自旋,然后判断 添加的Node节点 的 是否为 head 节点,如果当前的节点为 head 节点 则 执行
tryAcquireShared(1)
或去 state =1 即 r =-1,不满足 if (r >= 0),则 继续自旋,当然线程 Thread-A 也就被阻塞在了这里。那怎么跳出自旋呢?发现 当 满足 r>=0 时,也就是 state = 0 时。怎样 才能 让 state = 0 呢? 构造时候 传入 0,或者调用 countDown(); 方法:
public void countDown() {
sync.releaseShared(1);
}
找到 对应的实现:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
分析以上代码:通过自旋 判断 state 是否 为 0,如果为0,返回 false ,否则 int nextx = c -1 ; 并通过CAS 设置 state 如果设置成功 并判断 nextx 是否为 0 也是 state 是否为0;如果 state= 0则 返回true 则 执行
doReleaseShared()
实现如下:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
分析以上代码:判断当前 head 节点和 tail 节点是否为head 节点, 如果条件满足,则会判断 head 节点的 waitStae 是否为 -1,如果为 -1 则 执行 unparkSuccessor(h) 唤醒线程。
网友评论