一. CountDownLatch
用于让当前线程等待其他线程完成操作。
- CountDownLatch的构造函数接收一个int类型的参数作为计数器;
- CountDownLatch的
countDown()
方法用于计数器减一; - CountDownLatch的
await()
方法会阻塞当前线程,直到计数器变为零。
类似的,thread.join()
方法也可以让当前线程阻塞,直到调用join()方法的线程执行完成。
超时等待:
- join(long millis)、join(long millis, int nanos)
- await(long time, TimeUnit unit)
await(long time, TimeUnit unit)
使当前线程等待一段时间,超时时间到后当前线程就继续执行后面的代码,不再等待那些未完成的线程了。
二. CyclicBarrier
同步屏障。
用于让一组线程到达一个屏障(也可叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障阻塞的线程才会继续运行。
- 利用构造函数
CyclicBarrier(int parties)
构造同步屏障,其参数表示屏障要拦截的线程数; - 各线程调用
await()
方法告诉同步屏障自己已经到达同步点,同时自己被阻塞; - 当调用
await()
方法的线程数达到设定的值时,屏障解除,各线程继续执行。
更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction)
,用于在最后一个线程达到屏障,使屏障打开时,优先执行barrierAction,然后再继续运行之前被阻塞的线程。
CyclicBarrier和CountDownLatch的区别
- CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置;
- CyclicBarrier还提供了其他有用的方法,如getNumberWaiting()方法可获取CyclicBarrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。
三. Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,如数据库连接数。
- 通过构造函数
Semaphore(int permits)
构造,其中参数表示可用的许可证数量; - 线程使用
semaphore.acquire()
获取一个许可证; - 使用完后,线程再使用
semaphore.release()
归还一个许可证; - 只有当剩余许可证数大于零时,线程才能获取到许可证,否则只能等待。
四. Exchanger
Exchanger(交换器)是一个用于线程间协作的工具类,可用于进行线程间的数据交换。
- Exchanger在exchange()方法处设置一个同步点;
- 第一个线程执行到exchange()方法处(到达同步点),会一直等待,直到第二个线程也执行到exchange()方法处(到达同步点);
- 两个线程都到达同步点后,两个线程交换数据。
public class test {
private static final Exchanger<String> exchanger = new Exchanger<>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String a = "aaa";
System.out.println("begin-a: " + a);
String a1 = exchanger.exchange(a);
System.out.println("exchange-a: " + a);
System.out.println("exchange-a1: " + a1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String b = "bbb";
System.out.println("begin-b: " + b);
String b1 = exchanger.exchange(b);
System.out.println("exchange-b: " + b);
System.out.println("exchange-b1: " + b1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.shutdown();
}
}
运行结果:
begin-a: aaa
begin-b: bbb
exchange-b: bbb
exchange-b1: aaa
exchange-a: aaa
exchange-a1: bbb
网友评论