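Purpose
CountDownLatch lets one or more threads wait until a set of operations running in other threads has completed, and only then continue.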
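As a minimal sketch of that behaviour (the class name LatchBasics and the method runWorkers are made up for illustration, and the "work" is just a counter increment), the calling thread below blocks in await() until every worker thread has called countDown():

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original post.
class LatchBasics {

    // Starts `workers` threads and blocks until each one has called countDown().
    static int runWorkers(int workers) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    finished.incrementAndGet(); // stand-in for real work
                } finally {
                    latch.countDown();          // always release, even on failure
                }
            }).start();
        }
        latch.await();                          // waits until the count reaches zero
        return finished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("workers finished: " + runWorkers(3));
    }
}
```

Because every countDown() happens before await() returns, the caller is guaranteed to see the work of all workers once it is released.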
How to use it (demo)
1. First, create the thread pool
private static final int processors = Runtime.getRuntime().availableProcessors();
private static final ExecutorService executorService = Executors.newFixedThreadPool(processors, new ThreadFactory() {
@Override
public Thread newThread(@NotNull Runnable runnable) {
String name = "name";
Thread thread = new Thread(runnable, name);
thread.setDaemon(true);
return thread;
}
});
2. Define the task
private class Task implements Callable<String> {
private final String data;
private final CountDownLatch latch;
public Task(String data, CountDownLatch latch) {
this.data = data;
this.latch = latch;
}
@Override
public String call() throws Exception {
try {
return transform(data);
} catch (Exception e) {
return null;
} finally {
// This is essential: count down whether the task succeeds or fails
latch.countDown();
}
}
}
3. Run the tasks
// Lists.newArrayList() comes from Guava; populate the input data yourself
List<String> strings = Lists.newArrayList();
List<Future<String>> futureList = Lists.newArrayList();
final CountDownLatch latch = new CountDownLatch(strings.size());
for (String string : strings) {
final Future<String> future = executorService.submit(new Task(string, latch));
futureList.add(future);
}
try {
latch.await();
} catch (InterruptedException e) {
// log&monitor
}
List<String> resultList = Lists.newArrayList();
for (Future<String> future : futureList) {
try {
final String str = future.get(300, TimeUnit.MILLISECONDS);
if (str != null) {
resultList.add(str);
}
} catch (Exception e) {
// log&monitor
}
}
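One thing to watch in the demo above is that latch.await() without arguments waits indefinitely if some task never reaches countDown(). A possible variant, shown here only as a sketch (BoundedAwait and awaitAll are hypothetical names, and the timeout values are arbitrary), uses the timed overload await(long, TimeUnit), which returns false when the timeout elapses first, and restores the interrupt flag instead of swallowing the interruption:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original post.
class BoundedAwait {

    // Returns true if the count reached zero within the timeout, false otherwise.
    static boolean awaitAll(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        System.out.println("done before countDown: " + awaitAll(latch, 50));
        latch.countDown();
        System.out.println("done after countDown: " + awaitAll(latch, 50));
    }
}
```

When awaitAll returns false, the caller can decide whether to cancel the outstanding futures or to collect only the results that are already available.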
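How it works
The implementation is very simple: underneath, it is backed by AbstractQueuedSynchronizer (AQS), the core class of Java's concurrency utilities.
CountDownLatch has a static nested class Sync that extends AbstractQueuedSynchronizer and stores the count in the AQS state: await() can proceed only once the state is 0, and each countDown() decrements it by one with a CAS. Its source code is as follows: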
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
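The effect of tryAcquireShared and tryReleaseShared can be observed through the public API. The sketch below (LatchStateDemo and observeCounts are illustrative names only) reads getCount() after each countDown(), and shows that an extra countDown() at zero changes nothing, matching the `if (c == 0) return false;` branch above, and that await() returns immediately once the state is 0:

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the original post.
class LatchStateDemo {

    // Returns the count observed at the start, after one countDown(),
    // and after two further countDown() calls (one more than needed).
    static long[] observeCounts() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        long start = latch.getCount();      // AQS state initialised to 2
        latch.countDown();
        long afterOne = latch.getCount();   // state decremented to 1
        latch.countDown();                  // state reaches 0, waiters are released
        latch.countDown();                  // state is already 0: no effect
        long afterExtra = latch.getCount();
        latch.await();                      // state == 0, so this does not block
        return new long[] {start, afterOne, afterExtra};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(observeCounts()));
    }
}
```

Since the state never goes back up, a CountDownLatch cannot be reset; a new instance is needed for each round of waiting.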