栅栏的作用是控制一组不同步的线程任务必须要在共同到达某个点的时候才能继续执行任务。比如:聚会吃饭,约定好人到齐才能开吃,先到的人在座位上等待,等人到齐了再动筷子,各吃各的。
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("----");
System.out.println(Thread.currentThread().getName());
System.out.println("----");
}
});
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.submit(new Task(barrier));
}
exec.shutdown();
}
public static class Task implements Runnable {
CyclicBarrier barrier;
public static volatile int count = 0;
public int id = 0;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
id = ++count;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " id : " + id + "等待");
barrier.await();
System.out.println(Thread.currentThread().getName() + " id : " + id + "完成");
} catch (InterruptedException | BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
pool-1-thread-1 id : 1等待
pool-1-thread-5 id : 5等待
pool-1-thread-4 id : 4等待
pool-1-thread-2 id : 2等待
pool-1-thread-3 id : 3等待
----
pool-1-thread-3
----
pool-1-thread-3 id : 3完成
pool-1-thread-1 id : 1完成
pool-1-thread-5 id : 5完成
pool-1-thread-4 id : 4完成
pool-1-thread-2 id : 2完成
解读:定义了一个栅栏,任务数有5个,构造中的Runable接口等到所有任务到齐之后执行,run()方法执行的线程取决于最后进入等待的那个线程。当barrier.await();方法调用任务阻塞等待,栅栏的任务数减一,当计数器减到0的时候各个线程被唤醒,继续执行完成任务。
源码简要分析:
- 构造:
public CyclicBarrier(int parties, Runnable barrierAction)
public CyclicBarrier(int parties)
有两个构造函数,最终都会调用两个参数的那个构造,第一个参数表示需要等待任务的个数,第二个为,所有任务到达那个点的时候将会调用run()方法。
- 属性
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;
栅栏的实现还是基于可重入锁实现的,在里面有个count计数器,在我们调用barrier.await();的时候计数器减一,当计数器为0,上面示例代码中5个任务都到达的时候计数器会重置,可以重新使用。
接下来看来的源码中的关键方法:doawait();
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
- lock.lock();进入方法后最先调用的线程获取锁,其他任务如果再次进入则阻塞。
- 获取锁的任务继续执行,int index = --count;计数器减一,然后会判断是否为0,如果为0则所有任务到达了,则回调构造传进来的Runable对象的run()方法,如果还有任务未到达,则进入for(;;)死循环,调用
trip.await();释放锁,当前线程挂起,其他线程任务从lock.lock();获取锁,重复上述步骤,知道count为0。可以看到,command.run();方法在所有线程到达之后执行,并且执行的线程是最后获取锁的那个线程,和上述实例表现的一致。
网友评论