连接池主要包括两块功能
1.连接池自身管理,由ConnPoolControl<T>定义
2.核心功能,租用连接,归还连接,由ConnPool<T, E>接口定义
![](https://img.haomeiwen.com/i4880910/c4882b1e5c0efff8.png)
连接池自身管理
- 设置连接池的最大连接数、每个ip最大连接数
- 查询功能
查看连接池的最大连接数、已经被租用的连接数据、可使用的连接数据、有多少等待者在等待拿连接
查看某个ip的最大连接数、已经被租用的连接数据、可使用的连接数据、有多少等待者在等待拿连接
此处解释下MaxtTotal和DefaultMaxPerRoute的区别:
- MaxtTotal是整个池子的大小;
- DefaultMaxPerRoute是根据连接到的主机对MaxTotal的一个细分;比如:
MaxtTotal=400 DefaultMaxPerRoute=200而我只连接到http://sishuok.com 时,到这个主机的并发最多只有200;而不是400;
而我连接到http://sishuok.com 和 http://qq.com 时,到每个主机的并发最多只有200;即加起来是400(但不能超过400);所以起作用的设置是DefaultMaxPerRoute。
连接池核心功能
首先看从连接池获取连接的流程
![](https://img.haomeiwen.com/i4880910/c9b606887d94a044.png)
获取连接和返还连接都是线程安全的,通过Lock和Condition配合完成。关键代码参考
AbstractConnPool、PoolEntryFuture
实现。在高并发情况下,假如线程获取不到连接,那么会被缓存在内存LinkedList,如果释放不及时,则很有可能出现OOM的情况
为了便于理解,我自己简单实现了CPool和PoolEntryFuture代码
public class CPool {
private static AtomicInteger atomicInteger = new AtomicInteger(0);
//非公平锁
private static Lock lock = new ReentrantLock();
private final LinkedList<PoolEntryFuture> pending = new LinkedList<>();
public CPoolEntry lease(String url) {
lock.lock();
try {
boolean canLease = atomicInteger.get() <= 5;
if (canLease) {
atomicInteger.incrementAndGet();
return new CPoolEntry();
} else {
PoolEntryFuture poolEntryFuture = new PoolEntryFuture(lock);
pending.add(poolEntryFuture);
poolEntryFuture.waitUntilFree();
atomicInteger.incrementAndGet();
return poolEntryFuture.getcPoolEntry();
}
} finally {
lock.unlock();
}
}
public void release(CPoolEntry cPoolEntry) {
lock.lock();
try {
System.out.println("当前已使用连接数:" + atomicInteger.decrementAndGet());
if (pending.size() > 0) {
pending.poll().wakeup();
}
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
System.out.println();
CPool cPool = new CPool();
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 30; i++) {
executorService.execute(new Executor(cPool));
}
}
}
public class PoolEntryFuture {
private Condition condition;
private Lock lock;
private CPoolEntry cPoolEntry;
public PoolEntryFuture(Lock lock) {
this.lock = lock;
this.condition = lock.newCondition();
}
public CPoolEntry getcPoolEntry() {
return cPoolEntry;
}
public void setcPoolEntry(CPoolEntry cPoolEntry) {
this.cPoolEntry = cPoolEntry;
}
public void waitUntilFree() {
this.lock.lock();
try {
try {
//一直等待,等待到
this.condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
this.lock.unlock();
}
}
// 唤醒等待的线程,让他去拿连接了
public void wakeup() {
this.lock.lock();
try {
System.out.println("唤醒了:" + Thread.currentThread().getName());
this.condition.signalAll();
} finally {
this.lock.unlock();
}
}
}
关闭空闲连接
http连接池可以设置守护线程去关闭连接,最终调用的也是Socket.close();
。 IdleConnectionEvictor
满足下面两个条件会被处初始化,并在构造函数中创建一个守护线程,默认5s跑一次(这个时间可以设置
)去清理过期和闲置的连接。
守护线程开启需满足下面两个条件
1.连接池的控制不被共享,connManagerShared==false
2.并且evictExpiredConnections
开启过期连接清除,或者设置过多少时间清理连接参数 evictIdleConnections
List<Closeable> closeablesCopy = closeables != null ? new ArrayList<Closeable>(closeables) : null;
if (!this.connManagerShared) {
if (closeablesCopy == null) {
closeablesCopy = new ArrayList<Closeable>(1);
}
final HttpClientConnectionManager cm = connManagerCopy;
if (evictExpiredConnections || evictIdleConnections) {
final IdleConnectionEvictor connectionEvictor = new IdleConnectionEvictor(cm,
maxIdleTime > 0 ? maxIdleTime : 10, maxIdleTimeUnit != null ? maxIdleTimeUnit : TimeUnit.SECONDS);
closeablesCopy.add(new Closeable() {
@Override
public void close() throws IOException {
connectionEvictor.shutdown();
}
});
connectionEvictor.start();
}
closeablesCopy.add(new Closeable() {
@Override
public void close() throws IOException {
cm.shutdown();
}
});
}
网友评论