1.简介
CyclicBarriers是Java 5作为java.util.concurrent包的一部分引入的同步构造。
在本文中,我们将在并发方案中探索此实现。
2. Java并发 - 同步器
该的java.util.concurrent包中包含几类,以帮助管理的一组互相协作的线程。其中一些包括:
- CyclicBarrier
- Phaser
- CountDownLatch
- Exchanger
- Semaphore
- SynchronousQueue
这些类为线程之间的常见交互模式提供了开箱即用的功能。如果我们有一组线程相互通信并遵循一个或多个更常见的线程
如果我们有一组相互通信并且类似于常见模式之一的线程,我们可以简单地重用相应的库类(也称为Synchronizers),而不是尝试使用一组锁和条件对象和synchronized关键字来提出自定义方案。。
3. CyclicBarrier
CyclicBarrier是一个同步器,它允许一组线程等待彼此达成共同执行点,也被称为barrier。
CyclicBarriers用于程序中,在这些程序中我们有固定数量的线程,在继续执行之前必须等待彼此到达公共点。
barrier称为循环,因为它可以在等待线程释放后重新使用。
4.用法
CyclicBarrier的构造函数很简单。它需要一个整数,表示需要在barrier实例上调用await()方法以表示到达公共执行点的线程数:
public CyclicBarrier(int parties)
需要同步其执行的线程也称为parties ,并且调用await()方法是我们如何注册某个线程已达到障碍点。
此调用是同步的,调用此方法的线程会暂停执行,直到指定数量的线程在屏障上调用相同的方法。所需线程数已调用await()的情况称为跳越障碍。
或者,我们可以将第二个参数传递给构造函数,该构造函数是一个Runnable实例。这个逻辑将由跳过障碍的最后一个线程运行:
public CyclicBarrier(int parties, Runnable barrierAction)
5.实例化
要查看CyclicBarrier的运行情况,让我们考虑以下情况:
有一个操作是固定数量的线程执行并将相应的结果存储在列表中。当所有线程完成执行其操作时,其中一个(通常是跳过屏障的最后一个)开始处理每个线程获取的数据。
让我们实现所有操作发生的主类:
public class CyclicBarrierDemo {
private CyclicBarrier cyclicBarrier;
private List<List<Integer>> partialResults
= Collections.synchronizedList(new ArrayList<>());
private Random random = new Random();
private int NUM_PARTIAL_RESULTS;
private int NUM_WORKERS;
// ...
}
这个类非常简单 - NUM_WORKERS是要执行的线程数,NUM_PARTIAL_RESULTS是每个工作线程将要生成的结果数。
最后,我们有partialResults,它是一个列表,用于存储每个工作线程的结果。请注意,此列表是SynchronizedList,因为多个线程将同时写入它,并且add()方法在普通ArrayList上不是线程安全的。
现在让我们实现每个工作线程的逻辑:
public class CyclicBarrierDemo {
// ...
class NumberCruncherThread implements Runnable {
@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
List<Integer> partialResult = new ArrayList<>();
// Crunch some numbers and store the partial result
for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {
Integer num = random.nextInt(10);
System.out.println(thisThreadName
+ ": Crunching some numbers! Final result - " + num);
partialResult.add(num);
}
partialResults.add(partialResult);
try {
System.out.println(thisThreadName
+ " waiting for others to reach barrier.");
cyclicBarrier.await();
} catch (InterruptedException e) {
// ...
} catch (BrokenBarrierException e) {
// ...
}
}
}
}
我们现在将实现屏障跳闸时运行的逻辑。
为了简单起见,我们只需添加部分结果列表中的所有数字:
public class CyclicBarrierDemo {
// ...
class AggregatorThread implements Runnable {
@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
System.out.println(
thisThreadName + ": Computing sum of " + NUM_WORKERS
+ " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
int sum = 0;
for (List<Integer> threadResult : partialResults) {
System.out.print("Adding ");
for (Integer partialResult : threadResult) {
System.out.print(partialResult+" ");
sum += partialResult;
}
System.out.println();
}
System.out.println(thisThreadName + ": Final result = " + sum);
}
}
}
最后一步是构造CyclicBarrier并使用main()方法启动:
public class CyclicBarrierDemo {
// Previous code
public void runSimulation(int numWorkers, int numberOfPartialResults) {
NUM_PARTIAL_RESULTS = numberOfPartialResults;
NUM_WORKERS = numWorkers;
cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());
System.out.println("Spawning " + NUM_WORKERS
+ " worker threads to compute "
+ NUM_PARTIAL_RESULTS + " partial results each");
for (int i = 0; i < NUM_WORKERS; i++) {
Thread worker = new Thread(new NumberCruncherThread());
worker.setName("Thread " + i);
worker.start();
}
}
public static void main(String[] args) {
CyclicBarrierDemo demo = new CyclicBarrierDemo();
demo.runSimulation(5, 3);
}
}
在上面的代码中,我们使用5个线程初始化循环屏障,每个线程产生3个整数作为其计算的一部分,并将其存储在结果列表中。
一旦屏障被触发,跳过屏障的最后一个线程执行AggregatorThread中指定的逻辑,即 - 添加线程产生的所有数字。
6.结果
以下是上述程序执行的输出 - 每次执行都可能会产生不同的结果,因为线程可以以不同的顺序生成:
Spawning 5 worker threads to compute 3 partial results each
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 2
Thread 0: Crunching some numbers! Final result - 2
Thread 0 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 2
Thread 1: Crunching some numbers! Final result - 0
Thread 1: Crunching some numbers! Final result - 5
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 6
Thread 3: Crunching some numbers! Final result - 4
Thread 3: Crunching some numbers! Final result - 0
Thread 3 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 9
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 5
Thread 4 waiting for others to reach barrier.
Thread 4: Computing final sum of 5 workers, having 3 results each.
Adding 6 2 2
Adding 2 0 5
Adding 6 4 0
Adding 1 1 0
Adding 9 3 5
Thread 4: Final result = 46
如上面的输出所示,线程4是跳过屏障并且还执行最终聚合逻辑的线程。如上例所示,线程实际上也不必按照它们启动的顺序运行。
7.结论
在本文中,我们看到了CyclicBarrier是什么,以及它有什么样的情况。
我们还实现了一个场景,在继续使用其他程序逻辑之前,我们需要固定数量的线程才能到达固定的执行点。
imageimage欢迎大家关注公众号:「Java知己」,关注公众号,回复「1024」你懂得,免费领取 30 本经典编程书籍。关注我,与 10 万程序员一起进步。 每天更新Java知识哦,期待你的到来!
网友评论