一.什么是CyclicBarrier?
CyclicBarrier就是循环屏障
二.CyclicBarrier的用途?
n个线程相互等待,直到所有线程到达同一个点后,再同时执行
三.CyclicBarrier的应用场景
一起去做某件事,才能完成某件事
四.Cyclicbarrier的测试代码
import java.util.concurrent.*;
/**
* @author Aaron
* @date 2018/10/22 下午3:10
* @function 测试CyclicBarrier怎样使用
*/
public class TestCyclicBarrier {
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4,10,60, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
private static final CyclicBarrier cb = new CyclicBarrier(4, new Runnable() {
@Override
//唤醒所有线程后,第一个执行的线程
public void run() {
System.out.println("寝室四兄弟准备一起出发去球场");
}
});
private static class GoThread extends Thread{
private final String name;
public GoThread(String name){
this.name = name;
}
public void run(){
System.out.println(name+"开始从宿舍出发");
try {
Thread.sleep(1000);
cb.await();//拦截线程
System.out.println(name+"从楼底下出发");
Thread.sleep(1000);
System.out.println(name+"到达操场");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
test2();
}
public static void test1(){
String[] str = {"小汤","小赵","小高","小吴"};
for (int i=0;i<4;i++){
threadPool.execute(new GoThread(str[i]));
}
try {
Thread.sleep(4000);
System.out.println("四个人一起到达球场,现在开始打球");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void test2(){
String[] str = {"小汤","小高","小赵","小吴"};
String[] str1 = {"大汤","大高","大赵","大吴"};
for (int i=0;i<4;i++){
threadPool.execute(new GoThread(str[i]));
}
try {
Thread.sleep(4000);
System.out.println("四个人一起到达球场,现在开始打球");
System.out.println("现在对CyclicBarrier进行复用。。。。。。");
System.out.println("又来了一拨人,看看愿不愿意一起打");
} catch (InterruptedException e) {
e.printStackTrace();
}
//进行复用
for (int i=0;i<4;i++){
threadPool.execute(new GoThread(str1[i]));
}
try {
Thread.sleep(4000);
System.out.println("四个人一起到达球场,表示愿意一起打球,现在八个人开始打球");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
五.CyclicBarrier的源代码解析
1.内部类
public class CyclicBarrier {
/**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which {@code count} applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
这个Generation类只定义了一个布尔变量
private static class Generation {
boolean broken = false;
}
2.CyclicBarrier的核心代码
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//获取独占锁
lock.lock();
try {
//获取当前的generation
final Generation g = generation;
//假如当前的generation被损坏了,就抛出异常
if (g.broken)
throw new BrokenBarrierException();
//如果当前线程被中断,调用breakBarrier中断CyclicBarrier,并唤醒所有等待的线程
if (Thread.interrupted()) {
breakBarrier();//见下一个方法解析
throw new InterruptedException();
}
//将计数器减1
int index = --count;
//当index等于0时,就意味着parties个线程到达了屏障barrier
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();//唤醒所有线程
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
//如果有parties个线程到达屏障后,或者线程被中断,或者超过最大等待时间,那么就会唤醒线程
for (;;) {
try {
//判断调用的方法是“非超时等待”还是“超时等待”
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
3.breakBarrier方法
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
1.终止线程,并且重新设置count的值为parties
2.唤醒所有线程
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
4.nextGeneration方法
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
1、唤醒所有线程
2、重新设置count的值为parties
3、生层新的一代
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
5.await()方法(本质是调用了doawait方法)
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
六、CyclicBarrier和CountDownLatch的区别
1、CyclicBarrier的屏障可以循环使用;但是CountDownLatch不能循环使用
2、CyclicBarrier是n个线程互相等待;CountDownLatch是m(一个或多个)等待n个
3、CyclicBarrirt可以使用CountDownLatch实现,代码如下
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Aaron
* @date 2018/10/22 下午3:39
* @function 用CountDownLatch实现CyclicBarrier
*/
public class TestCyclicBarrierWithCDL {
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(4);
private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(4,10,60, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
private static class GoHead extends Thread{
private final String name;
public GoHead(String name){
this.name = name;
}
public void run(){
System.out.println(name+"开始出发");
COUNT_DOWN_LATCH.countDown();
try {
Thread.sleep(1000);
COUNT_DOWN_LATCH.await();
System.out.println(name+"从楼底下出发");
Thread.sleep(1000);
System.out.println(name+"到达操场");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
String[] str = {"A","B","C","D"};
for (int i=0;i<4;i++){
THREAD_POOL_EXECUTOR.execute(new GoHead(str[i]));
}
try {
Thread.sleep(4000);
System.out.println("四个人一起到达球场,现在开始打球");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
网友评论