CyclicBarrier 的作用、应用场景和实战
CountDownLatch 的作用、应用场景和实战
概述
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
过程如下图所示:
Semaphore 常用方法
构造方法:
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
- permits 表示许可线程的数量
- fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程
常用方法:
public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)
protected Collection<Thread> getQueuedThreads()
- acquire() 表示阻塞并获取许可
- tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
- release() 表示释放许可
- int availablePermits():返回此信号量中当前可用的许可证数。
- int getQueueLength():返回正在等待获取许可证的线程数。
- boolean hasQueuedThreads():是否有线程正在等待获取许可证。
- void reducePermit(int reduction):减少 reduction 个许可证
- Collection getQueuedThreads():返回所有等待获取许可证的线程集合
Semaphore 使用场景
可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求, 要读取几万个文件的数据,因为都是 IO 密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有 10 个,这时我们必须控制只有 10 个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用 Semaphore 来做流量控制。
代码示例:
/**
* 类说明:演示Semaphore用法,一个数据库连接池的实现
*/
public class DBPoolSemaphore {
private final static int POOL_SIZE = 10;
//两个指示器,分别表示池子还有可用连接和已用连接
private final Semaphore useful, useless;
//存放数据库连接的容器
private static LinkedList<Connection> pool = new LinkedList<Connection>();
//初始化池
static {
for (int i = 0; i < POOL_SIZE; i++) {
pool.addLast(SqlConnectImpl.fetchConnection());
}
}
public DBPoolSemaphore() {
this.useful = new Semaphore(10);
this.useless = new Semaphore(0);
}
/*归还连接*/
public void returnConnect(Connection connection) throws InterruptedException {
if (connection != null) {
System.out.println("当前有" + useful.getQueueLength() + "个线程等待数据库连接!!"
+ "可用连接数:" + useful.availablePermits());
useless.acquire();
synchronized (pool) {
pool.addLast(connection);
}
useful.release();
}
}
/*从池子拿连接*/
public Connection takeConnect() throws InterruptedException {
useful.acquire();
Connection connection;
synchronized (pool) {
connection = pool.removeFirst();
}
useless.release();
return connection;
}
}
/**
* 类说明:测试数据库连接池
*/
public class AppTest {
private static DBPoolSemaphore dbPool = new DBPoolSemaphore();
private static class BusiThread extends Thread {
@Override
public void run() {
Random r = new Random();//让每个线程持有连接的时间不一样
long start = System.currentTimeMillis();
try {
Connection connect = dbPool.takeConnect();
System.out.println("Thread_" + Thread.currentThread().getId()
+ "_获取数据库连接共耗时【" + (System.currentTimeMillis() - start) + "】ms.");
SleepTools.ms(100 + r.nextInt(100));//模拟业务操作,线程持有连接查询数据
System.out.println("查询数据完成,归还连接!");
dbPool.returnConnect(connect);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
Thread thread = new BusiThread();
thread.start();
}
}
}
打印结果:
Thread_12_获取数据库连接共耗时【0】ms.
Thread_14_获取数据库连接共耗时【0】ms.
Thread_17_获取数据库连接共耗时【0】ms.
Thread_15_获取数据库连接共耗时【0】ms.
Thread_18_获取数据库连接共耗时【0】ms.
Thread_20_获取数据库连接共耗时【0】ms.
Thread_13_获取数据库连接共耗时【0】ms.
Thread_16_获取数据库连接共耗时【0】ms.
Thread_11_获取数据库连接共耗时【0】ms.
Thread_19_获取数据库连接共耗时【0】ms.
查询数据完成,归还连接!
当前有5个线程等待数据库连接!!可用连接数:0
Thread_21_获取数据库连接共耗时【121】ms.
查询数据完成,归还连接!
当前有4个线程等待数据库连接!!可用连接数:0
Thread_22_获取数据库连接共耗时【133】ms.
查询数据完成,归还连接!
当前有3个线程等待数据库连接!!可用连接数:0
查询数据完成,归还连接!
当前有2个线程等待数据库连接!!可用连接数:0
Thread_23_获取数据库连接共耗时【145】ms.
Thread_24_获取数据库连接共耗时【144】ms.
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有1个线程等待数据库连接!!可用连接数:0
当前有1个线程等待数据库连接!!可用连接数:0
Thread_25_获取数据库连接共耗时【158】ms.
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:1
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:2
当前有0个线程等待数据库连接!!可用连接数:2
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:4
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:5
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:6
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:7
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:8
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:9
从打印结果可以看出,一次只有 10 个线程执行 acquire(),只有线程进行 release() 方法后才会有别的线程执行 acquire()。
注意: Semaphore 只是对资源并发访问的线程数进行监控,并不会保证线程安全。
网友评论