实例代码
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
//规定几步
CountDownLatch countDownLatch = new CountDownLatch(3);//构造函数
executorService.submit(()->{
System.out.println("饭煮好了");
countDownLatch.countDown();
});
executorService.submit(()->{
System.out.println("菜炒好了");
countDownLatch.countDown();
});
executorService.submit(()->{
System.out.println("汤煮好了");
countDownLatch.countDown();
});
countDownLatch.await();
//只有上面规定几步都执行完成了才会执行下面语句,否则继续等待
System.out.println("出来吃饭了");
}
构造函数new CountDownLatch(3)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync代码继承AQS
private static final class Sync extends AbstractQueuedSynchronizer {
//将AQS的state设置为count
Sync(int count) {
setState(count);
}
}
countDownLatch.countDown()方法
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//判断是否达到临界点,能否释放锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//主要是利用cas获取
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
/*
* 释放锁
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
countDownLatch.await();
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//判断state是否大于0,如果其他的countdown方法已经执行完了这边会返回1,这个时候这边不会进入该方法,也不会中断线程,直接执行下面的业务逻辑
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//添加一个waiter,设置模式为SHARED(包含初始化waiter队列,可参考AQS文章)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//找到头结点
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
//如果state为0则返回1(getState() == 0) ? 1 : -1
if (r >= 0) {
//当state为0,则表示waiter可以执行了
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//停止该线程,唤醒线程也是这边唤醒的
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
网友评论