连接池:与线程池同属"池化"思想,模型与线程池类似
要自定义先要了解线程池模型,即线程池的核心参数
1. corePoolSize 核心线程数,即线程池中常驻的线程数量
2. maximumPoolSize 最大线程数,即线程池允许创建的线程数上限
3. ThreadFactory 线程工厂;在连接池中对应的是创建连接的工厂,连接即实现了 AutoCloseable 的资源
- queue 任务队列,当加入的任务过多、线程无法及时处理时,将任务加入队列等待
- maxTime 即一个线程允许响应的最大时长,超出自动中断
- handler 拒绝策略:即如果线程池的队列满了,还有请求进来该如何处理
关于为什么不直接使用 Java 中的 Executors 构建线程池的问题:
核心的问题:
内存溢出;
为什么会出现内存溢出:
情况1:最大线程数为 Integer.MAX_VALUE,如 Executors.newCachedThreadPool
情况2:任务队列容量为 Integer.MAX_VALUE,如 Executors.newFixedThreadPool 使用的 LinkedBlockingQueue
为了防止这些问题,所以希望通过基础方式合理创建
附上大神neo的代码
/**
 * Bounded pool of reusable {@link AutoCloseable} resources (for example connections).
 *
 * <p>Idle resources are kept in a deque: returned items are pushed to the head and borrowed
 * from the head, so the tail holds the items that have been idle the longest. The {@code size}
 * counter tracks how many resources are accounted to the pool (idle plus borrowed).
 *
 * @param <T> type of the pooled resource
 */
public class Pool<T extends AutoCloseable> {
    final BlockingDeque<PoolItem<T>> idleItems = new LinkedBlockingDeque<>();
    final String name;
    final AtomicInteger size = new AtomicInteger(0);
    private final Logger logger = LoggerFactory.getLogger(Pool.class);
    private final Supplier<T> factory;
    /** Idle items whose idle time reaches this duration are closed by {@link #refresh()}. */
    public Duration maxIdleTime = Duration.ofMinutes(30);
    private int minSize = 1;
    private int maxSize = 50;
    private long checkoutTimeoutInMs = Duration.ofSeconds(30).toMillis();

    /**
     * Creates an empty pool; resources are created lazily on borrow or by {@link #refresh()}.
     *
     * @param factory creates a new resource each time it is called
     * @param name    pool name used in log messages
     */
    public Pool(Supplier<T> factory, String name) {
        this.factory = factory;
        this.name = name;
    }

    /**
     * Configures the pool bounds.
     *
     * @param minSize number of resources {@link #refresh()} replenishes the pool up to
     * @param maxSize upper bound of resources created on borrow
     */
    public void size(int minSize, int maxSize) {
        this.minSize = minSize;
        this.maxSize = maxSize;
    }

    /**
     * Sets how long {@link #borrowItem()} waits for an idle item once the pool is full.
     *
     * @param timeout maximum wait time
     */
    public void checkoutTimeout(Duration timeout) {
        checkoutTimeoutInMs = timeout.toMillis();
    }

    /**
     * Borrows an item: reuses an idle one if available, otherwise creates a new resource while
     * the pool is below {@code maxSize}, otherwise waits for an item to be returned.
     *
     * @return a pooled item, to be handed back through {@link #returnItem(PoolItem)}
     * @throws PoolException if the pool is full and no item is returned within the checkout timeout
     */
    public PoolItem<T> borrowItem() {
        PoolItem<T> item = idleItems.poll();
        if (item != null) {
            return item;
        }
        // reserve the slot atomically; a plain "size.get() < maxSize" check followed by an
        // increment lets concurrent borrowers exceed maxSize
        if (tryReserveSlot()) {
            return createReservedItem();
        }
        return waitNextAvailableItem();
    }

    /**
     * Returns a borrowed item to the pool; a broken item is closed instead of being reused.
     *
     * @param item item previously obtained from this pool
     */
    public void returnItem(PoolItem<T> item) {
        if (item.broken) {
            closeResource(item.resource);
        } else {
            item.returnTime = System.currentTimeMillis();
            idleItems.push(item);
        }
    }

    /** Atomically increments {@code size} if it is below {@code maxSize}; returns whether it did. */
    private boolean tryReserveSlot() {
        while (true) {
            int current = size.get();
            if (current >= maxSize) {
                return false;
            }
            if (size.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Blocks until an idle item is available or the checkout timeout elapses. */
    private PoolItem<T> waitNextAvailableItem() {
        var watch = new StopWatch();
        try {
            PoolItem<T> item = idleItems.poll(checkoutTimeoutInMs, TimeUnit.MILLISECONDS);
            if (item == null) throw new PoolException("timeout to wait for next available resource", "POOL_TIME_OUT");
            return item;
        } catch (InterruptedException e) {
            // restore the interrupt status so code further up the stack can still observe it
            Thread.currentThread().interrupt();
            throw new Error("interrupted during waiting for next available resource", e);
        } finally {
            logger.debug("wait for next available resource, pool={}, elapsed={}", name, watch.elapsed());
        }
    }

    /** Accounts for one more resource unconditionally and creates it. */
    private PoolItem<T> createNewItem() {
        size.incrementAndGet();
        return createReservedItem();
    }

    /** Creates a resource for a slot already counted in {@code size}; releases the slot on failure. */
    private PoolItem<T> createReservedItem() {
        var watch = new StopWatch();
        try {
            return new PoolItem<>(factory.get());
        } catch (Throwable e) {
            size.getAndDecrement();
            throw e;
        } finally {
            logger.debug("create new resource, pool={}, elapsed={}", name, watch.elapsed());
        }
    }

    /** Closes idle items that exceeded {@link #maxIdleTime}, then refills the pool up to {@code minSize}. */
    public void refresh() {
        logger.info("refresh resource pool, pool={}", name);
        recycleIdleItems();
        replenish();
    }

    /** @return number of resources accounted to the pool that are not currently idle */
    int activeCount() {
        return totalCount() - idleItems.size();
    }

    /** @return current value of the {@code size} counter */
    int totalCount() {
        return size.get();
    }

    /** Walks the idle deque from the tail (longest idle first) and closes expired items. */
    private void recycleIdleItems() {
        Iterator<PoolItem<T>> iterator = idleItems.descendingIterator();
        long maxIdleTimeInMs = maxIdleTime.toMillis();
        long now = System.currentTimeMillis();
        while (iterator.hasNext()) {
            PoolItem<T> item = iterator.next();
            if (now - item.returnTime >= maxIdleTimeInMs) {
                // the item may have been taken from the deque since the iterator saw it;
                // in that case it is no longer idle and the scan stops
                boolean removed = idleItems.remove(item);
                if (!removed) {
                    return;
                }
                closeResource(item.resource);
            } else {
                // items closer to the head were returned later, so nothing else can be expired
                return;
            }
        }
    }

    /** Creates idle items until {@code size} reaches {@code minSize}. */
    private void replenish() {
        while (size.get() < minSize) {
            returnItem(createNewItem());
        }
    }

    /** Removes one resource from the {@code size} accounting and closes it. */
    private void closeResource(T resource) {
        size.decrementAndGet();
        closeQuietly(resource);
    }

    /** Closes the resource, logging (not propagating) any failure. */
    private void closeQuietly(T resource) {
        try {
            resource.close();
        } catch (Exception e) {
            logger.warn("failed to close resource, pool={}", name, e);
        }
    }

    /** Stops creation of new resources on borrow and closes all currently idle resources. */
    public void close() {
        size.set(maxSize); // make sure no more new resource will be created
        while (true) {
            PoolItem<T> item = idleItems.poll();
            if (item == null) {
                return;
            }
            // close without decrementing size: decrementing here would drop the counter
            // below maxSize again and let borrowItem() create resources after close()
            closeQuietly(item.resource);
        }
    }
}
网友评论