在J.U.C包中,提供了几个非常有用的并发工具类,通过使用这些工具类,可以有效提高并发编程中,并发流程的控制,以提升效率和代码质量,如下:
- CountDownLatch
- CyclicBarrier
- Semaphore
1. 等待多线程完成的CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作。
在代码实现中,我们也可以使用join()方法,让当前执行线程等待join线程执行结束。join的实现原理是不断的去判断join的线程是否存活,如果存活,则让当前线程一直等待。代码如下:
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
//重点部分
//isAlive()方法时一个本地方法,可以查看jvm源码
while (isAlive()) {
wait(0);
}
} else {
//超时判断
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
在使用CountDownLatch时,需要通过构造函数传入一个int型的参数作为计数器,如下代码:
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
调用countDown()方法,会对计数器进行减一操作,当计数器减为0的时候,调用await方法时不会阻塞当前线程。同时CountDownLatch不能重新初始化或者修改CountDownLatch对象的内部计数器。
2. 同步屏障CyclicBarrier
让一组线程到达一个屏障点时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被拦截的线程才会继续运行。
1. 构造方法
public CyclicBarrier(int parties) {
this(parties, null);
}
通过构造方法,来确定需要拦截的线程数目(parties), 每个线程通过调用CyclicBarrier的await()方法,来告诉CyclicBarrier我已经到达屏障,然后当前线程被阻塞。
2. 应用场景
CyclicBarrier可以用于多线程数据计算,当每个计算线程结束之后,需要将计算结果合并。
3. CyclicBarrier与CountDownLatch的区别
- CyclicBarrier的计数器可以使用reset()进行重置,而CountDownLatch的计数器不可重置
- CyclicBarrier提供了比CountDownLatch更丰富的方法,如isBroken(),可以用于了解线程是否被中断。getNumberWaiting()方法可以获取被CyclicBarrier阻塞的线程数;
3. 控制并发线程数的Semaphore
Semaphore(信号量)用来控制同时访问特定资源的线程数量;
1. 构造函数
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
在构造函数中,可以传入两个参数:
- permits: 可用的许可证数量
- fair: 是否公平获取许可证
比如Semaphore(10,true),就表示允许10个线程获取许可证,也就是最大的并发量为10,线程可以通过公平竞争(即先进先出的顺序)的方式获取许可证;
2. 应用场景
比如有上完个数据文件,我们可以开启几十个线程去分析读取文件,将文件读取到内存中之后,还需要将分析过后的数据存储的数据库中,但是数据库允许的最大连接数是10个,所以,必须要控制只有10个线程可以获取到数据库连接。这个时候就可以用Semaphore来做流量的控制。
3. 其他的API
- public final boolean hasQueuedThreads(): 是否还有线程正在等待获取许可证
- public final int getQueueLength() :判断还有多少个等待获取许可证的线程
- public int availablePermits() :返回此信号量中当前可用的许可证数量
- public void acquire(): 从此信号量中请求一个许可证
- public void release(): 从此信号量中释放一个许可证
- public boolean tryAcquire(): 试图从信号量中请求一个许可证,无可用的许可证时,直接返回不阻塞;
网友评论