一、一个例子
private static void countDownLatchTest(List<String> roles){
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i=0;i<10;i++){
CountRunnable countRunnable = new CountRunnable(i+"",i+"",roles,countDownLatch);
new Thread(countRunnable).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public class CountRunnable implements Runnable{
private String id;
private String name;
private List<String> roles;
private CountDownLatch countDownLatch;
private String password;
public CountRunnable(String id, String name, List<String> roles,
CountDownLatch countDownLatch){
this.name = name;
this.id = id;
this.roles = roles;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
roles.add(name);
System.out.println("执行了-id:"+id+"name:"+name+"roles:"+roles.size());
try {
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<String> getRoles() {
return roles;
}
public void setRoles(List<String> roles) {
this.roles = roles;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
输出结果:
开始:1515333020630
执行了-id:0name:0roles:1
执行了-id:1name:1roles:2
执行了-id:4name:4roles:4
执行了-id:8name:8roles:7
执行了-id:3name:3roles:9
执行了-id:7name:7roles:10
执行了-id:9name:9roles:8
执行了-id:6name:6roles:6
执行了-id:5name:5roles:5
执行了-id:2name:2roles:4
[0, 1, 2, 4, 5, 6, 8, 9, 3, 7]
结束:1515333021639耗时:1009roles size:10
可以看出,上面每个线程执行1000ms,总时长1009ms.
二、源码介绍
/**
- Constructs a {@code CountDownLatch} initialized with the given count.
- @param count the number of times {@link #countDown} must be invoked
before threads can pass through {@link #await}
- @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
这里看清楚一点:构造器中要指定大于0的参数。
然后创建了一个sync对象,这个Sync是一个内部类:
/**
-
Synchronization control For CountDownLatch.
-
Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {
setState(count);
}int getCount() {
return getState();
}protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
可以看到,Sync继承自AQS类的内部类,是CountDownLatch的核心实现。在内部类里有几个方法:
getCount:获取当前等待的线程数
tryAcquireShared:如果线程数为0才返回1,即当前没有等待的线程
tryReleaseShared:重写AQS方法,实现每被countDown调用,就将标志位-1,直到标志位为0。
而CountDownLatch的里的几个方法:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
acquireSharedInterruptibly:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//判断是否发生中断
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//注意:-1表示获取到了共享锁,1表示没有获取共享锁
doAcquireSharedInterruptibly(arg);
}
继续深入doAcquireSharedInterruptibly,这段我是看不明白了:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//由于采用的公平锁,所以要将节点放到队列里
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {//本质是等待共享锁的释放
final Node p = node.predecessor();//获得节点的前继
if (p == head) { //如果前一个节点等于前继
int r = tryAcquireShared(arg);//就判断尝试获取锁
/*
这里要注意一下r的值就2种情况-1和1:
情况1.r为-1,latch没有调用countDown(),state是没有变化的导致state一直大于0或者调用了countDown(),但是state不等于0,直接在for循环中等待
情况2.r为1,证明countDown(),已经减到0,当前线程还在队列中,state已经等于0了.接下来就是唤醒队列中的节点
*/
if (r >= 0) {
setHeadAndPropagate(node, r);//将当前节点设置头结点。
p.next = null; // help GC 删除旧的头结点
failed = false;
return;
}
}
//当前节点不是头结点,当前线程一直等待,直到获取到共享锁。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录头结点
setHead(node);//设置node为头结点
/*
*从上面传递过来 propagate = 1;
*一定会进入下面的判断
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;//获得当前节点的下一个节点,如果为最后一个节点或者,为shared
if (s == null || s.isShared())
doReleaseShared();//释放共享锁
}
}
isShared:
final boolean isShared() {
return nextWaiter == SHARED;
}
释放共享锁,通知后面的节点:
private void doReleaseShared() {
for (;;) {
Node h = head;//获得头结点
if (h != null && h != tail) {
int ws = h.waitStatus;//获取头结点的状态默认值为0
if (ws == Node.SIGNAL) {如果等于SIGNAL唤醒状态
//将头结点的状态置成0,并使用Node.SIGNAL(-1)与0比较,continue,h的状态设置为0,不会再进入if (ws == Node.SIGNAL)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}//判断ws是否为0,并且h的状态不等于0,这里是个坑啊,ws等于0,h就是0啊,所以if进不来的,并设置节点为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
如果头结点等于h,其实也没有一直loop,由于上面写的Node h = head,就算前面的条件都不满足,这里一定会break
if (h == head) // loop if head changed
break;
}
}
深入到这里,以超出我的想象力。
三、CountDownLatch总结
CountDownLatch是通过“共享锁”实现的。在创建CountDownLatch中时,会传递一个int类型参数count,该参数是“锁计数器”的初始状态,表示该“共享锁”最多能被count个线程同时获取。当某线程调用该CountDownLatch对象的await()方法时,该线程会等待“共享锁”可用时,才能获取“共享锁”进而继续运行。而“共享锁”可用的条件,就是“锁计数器”的值为0!而“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1;通过这种方式,必须有count个线程调用countDown()之后,“锁计数器”才为0,而前面提到的等待线程才能继续运行。
image.png
网友评论