它是为了实现比 LinkedBlockingQueue 和 LinkedTransferQueue 更高的性能而特别定制的,根据连接池的特殊场景做了一些性能优化
容器类定义
public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable {
// 底层存储 CopyOnWriteArrayList,可以无锁安全的读取连接池的信息
private final CopyOnWriteArrayList<T> sharedList;
private final boolean weakThreadLocals; //默认是false
// 线程隔离的 FastList<PoolEntry>
private final ThreadLocal<List<Object>> threadList;
// 其实就是HikariPool,用来新建PoolEntry
private final IBagStateListener listener;
// 等待获取db连接的线程个数
private final AtomicInteger waiters;
private volatile boolean closed;
private final SynchronousQueue<T> handoffQueue;
//..
}
容器存储对象(条目)的定义
// 容器存放的的 IConcurrentBagEntry 接口实现类 PoolEntry
public interface IConcurrentBagEntry
{
// 空闲状态
int STATE_NOT_IN_USE = 0;
// 连接在使用
int STATE_IN_USE = 1;
// remove的时候先标记为被移除
int STATE_REMOVED = -1;
// 被预留状态,不可用但是可以移除
// 主要在检查线程中,要对连接进行softEvictConnection,确保能从not-in-use转到 reserved
int STATE_RESERVED = -2;
boolean compareAndSet(int expectState, int newState);
void setState(int newState);
int getState();
}
它的几种状态扭转
image.png连接池容器初始化
public ConcurrentBag(final IBagStateListener listener)
{
// 其实就是HikariPool, 负责创建PoolEntry
this.listener = listener;
// false
this.weakThreadLocals = useWeakThreadLocals();
this.handoffQueue = new SynchronousQueue<>(true);
// 等待获取db连接的线程个数
this.waiters = new **AtomicInteger**();
//底层存储 CopyOnWriteArrayList
this.sharedList = new **CopyOnWriteArrayList**<>();
if (weakThreadLocals) {
this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
}
else {
// ThreadLocal, 使用自定义的FastList
this.threadList = ThreadLocal.withInitial(() -> new **FastList**<>(IConcurrentBagEntry.class, 16));
}
}
容器操作 crud
- 从容器获取对象
borrow(long timeout, TimeUnit timeunit)
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// Try the thread-local list first
// **1.** 先从 threadlocal的FastList中取,倒序遍历, 同一个线程归还连接后再获取最新的连接 可避免一些检查
// 如果能取到就直接返回 这样无需锁
final List<Object> list = threadList.get();
for (int i = **list.size() - 1;** i >= 0; i--) {
final Object entry = list.remove(i);
@SuppressWarnings("unchecked")
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
// **2.** threadlocal获取不到,则需要从底层的sharedList中去寻找
// 累加等待计数
// Otherwise, scan the shared list ... then poll the handoff queue
final int waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
// 如果连接池里边无空闲连接了,则需要HikariPool去增加连接
listener.addBagItem(waiting);
// 进入超时等待循环
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
}
finally {
// 扣减等待计数
waiters.decrementAndGet();
}
- 新增对象 add(final T bagEntry)
public void add(final T bagEntry)
{
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
sharedList.add(bagEntry);
// spin until a thread takes it or none are waiting
while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !**handoffQueue.offer(bagEntry))** {
Thread.yield();
}
}
将对象塞到底层 copyOnWriteArrayList里,如果有线程在等待连接,会把新增的这个 bagEntry塞到 handoffQueue,给阻塞在 borrow() 超时等待的线程
- 归还对象 requite(final T bagEntry)
public void requite(final T bagEntry)
{
bagEntry.setState(STATE_NOT_IN_USE);
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || **handoffQueue.offer(bagEntry)**) {
return;
}
else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
}
else {
Thread.yield();
}
}
final List<Object> threadLocalList = threadList.get();
if (threadLocalList.size() < 50) {
**threadLocalList**.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
}
- 状态置为空闲 STATE_NOT_IN_USE
- 如果此时有线程在等待db连接,则会把这个连接 交给 handoffQueue 直接让阻塞在 borrow 的线程获取到
- 然后把这个连接存到 threadLocal里,这样当前线程如果再次获取db连接的话就会非常快
- 移除连接 remove(final T bagEntry)
public boolean remove(final T bagEntry)
{
// 1.
if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
return false;
}
// 2.
final boolean removed = sharedList.remove(bagEntry);
if (!removed && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
}
// 3.
threadList.get().remove(bagEntry);
return removed;
}
- 只有空闲状态(NOT_IN_USE) 和 保留状态(RESERVED) 的连接才能被移除和关闭
- 从底层 sharedList 中移除
- 从 threadLocal 中移除
为什么 hikari 号称最快的连接池?
- 采用 threadLocal 缓存同一个线程使用过的db连接,并且是从最近一个开始遍历
int i = list.size() - 1**;**
, 这样同一个java线程多次获取db连接的时候就能快速获取到连接,而且获取到最鲜活的,还可避免 HikariPool#getConnection 的连接可用性的检查 - ConcurrentBag 底层 sharedList 用的 copyOnWriteArrayList,所以读和写之间就不需要加锁了
- 监控连接池状态 统计idle、active、total等个数的时候加锁的话, 就会阻塞住 add borrow 等操作
- ConcurrentBag 申请连接borrow 和归还连接 requite 只是进行 cas 状态变更的无锁操作,并不会从 copyOnWriteArrayList 移除元素
- 单线程的线程池:新建和关闭连接的线程池都是单线程的,也就避免了线程间的协调开销
- handoffQueue: SynchronousQueue 当申请连接 borrow() 的时候,如果空闲连接不够用,线程就会阻塞在 handoffQueue#poll ; 在连接新增 add() 或归还 requite() 时,如果发现有线程在等待中则会把这个连接交给 handoffQueue ,这样等待中的线程直接能直接获取到(hand off很形象啊),而不用再遍历 sharedList 同时也保障了公平性 先到先得
- minimumIdle 默认等于 maximumPoolSize, 尽量减少在申请连接的时候还需要创建连接的开销
网友评论