引言
jdk1.5之后,java concurrent包提供了一些并发工具类。本文将梳理这些工具类的用法与使用场景。
- CountDownLatch:一个或多个线程阻塞等待,直到另外一批线程完成特定操作。
- CyclicBarrier:多个线程都会阻塞等待,直到所有线程都到达障碍点(barrier),功能上与CountDownLatch比较接近,他们最大区别是CountDownLatch的倒计时只能生效一次,CyclicBarrier可以循环使用。
- Exchanger:针对两个线程的同步器,允许这对线程交换数据。
CountDownLatch用法
初始化CountDownLatch需要指定具体的count值。await方法会阻塞线程,直到count值为0,调用countDown方法可以减少count值。
CountDownLatch的一种典型用法是在主线程中发起工作线程的执行,并等待工作线程的执行结束。
public class CountDownLatch1 {
public static void main(String[] args) {
try {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(5);
for (int i = 0; i < 5; ++i) {
new Thread(new Worker(startSignal, doneSignal)).start();
}
// 发起开始命令
System.out.println("工作线程开始执行...");
startSignal.countDown();
// 执行其他任务
System.out.println("执行其他流程...");
// 等待作业线程结束
doneSignal.await();
System.out.println("工作线程结束...");
} catch (InterruptedException ex) {
}
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
@Override
public void run() {
try {
startSignal.await();
// 模拟作业
Thread.sleep(100);
} catch (InterruptedException ex) {
} finally {
System.out.println(Thread.currentThread().getName() + "执行结束");
doneSignal.countDown();
}
}
}
工作线程开始执行...
执行其他流程...
Thread-1执行结束
Thread-3执行结束
Thread-2执行结束
Thread-4执行结束
Thread-0执行结束
工作线程结束...
另一种用法是将一个大任务切分成多个子任务,每个子任务在一个新线程执行,所有子线程都执行结束就意味着大任务的执行结束。
这种方式非常适用于分治法的实现,各个子任务之间没有依赖关系。
public class CountDownLatch2 {
public static void main(String[] args) {
try {
List<Integer> numberList = Arrays.asList(1,2,3,4,5);
CountDownLatch doneSignal = new CountDownLatch(5);
Executor e = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; ++i) {
e.execute(new WorkerRunnable(doneSignal, i, numberList));
}
doneSignal.await();
System.out.println("工作线程执行结束");
} catch (InterruptedException ex) {
}
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
private final List<Integer> numberList;
WorkerRunnable(CountDownLatch doneSignal, int i, List<Integer> numberList) {
this.doneSignal = doneSignal;
this.i = i;
this.numberList = numberList;
}
@Override
public void run() {
doWork(i);
doneSignal.countDown();
}
void doWork(int i) {
System.out.println(Thread.currentThread().getName() + "打印" + numberList.get(i));
}
}
pool-1-thread-1打印1
pool-1-thread-5打印5
pool-1-thread-4打印4
pool-1-thread-3打印3
pool-1-thread-2打印2
工作线程执行结束
CyclicBarrier用法
CyclicBarrier非常适用于有一批线程且相互之间需要等待的场景。CyclicBarrier从字面理解是指循环屏障,这里的屏障意思是说线程调用await方法导致阻塞等待。
CyclicBarrier有两个构造器方法:
- CyclicBarrier(int parties):parties指定了在这个屏障上,需要有多少个线程调用await方法到达屏障。
- CyclicBarrier(int parties, Runnable barrierAction):这个构造方法比前一个多了
Runnable类型的参数,当最后一个线程到达屏障后就会执行这个线程任务。通过这个线程任务可以在所有线程都到达时完成一些诸如状态修改的功能。
class Solver {
final int N; //矩阵的行数
final float[][] data; //要处理的矩阵
final CyclicBarrier barrier; //循环屏障
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow); //处理指定一行数据
try {
barrier.await(); //在屏障处等待直到
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
//初始化CyclicBarrier
barrier = new CyclicBarrier(N, new Runnable() {
public void run() {
mergeRows(); //合并行
}
});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start();
waitUntilDone();
}
}
Exchanger用法
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger();
Thread thread1 = new Thread(new Worker(exchanger, "thread1"));
Thread thread2 = new Thread(new Worker(exchanger, "thread2"));
thread1.setName("thread1");
thread2.setName("thread2");
thread1.start();
thread2.start();
}
}
class Worker implements Runnable {
private String value;
private final Exchanger exchanger;
public Worker(Exchanger exchanger, String value) {
this.exchanger = exchanger;
this.value = value;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ": " + exchanger.exchange(value));
} catch (InterruptedException ex) {
}
}
}
网友评论