1. 简介
Hikari连接池目前公认是性能最高的数据库连接池,同时也是SpringBoot2.0以后默认使用的数据库连接池。
2. 关键配置
image.png这些参数在不指定时会有默认值,默认值经过validate方法,会赋不同的值,下面看下具体的源码实现。
dataSource的初始化
@Bean(name = "dataSource")
@Primary
public HikariDataSource dataSource() {
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setDriverClassName("className");
hikariConfig.setUsername("user");
hikariConfig.setPassword("password");
hikariConfig.setJdbcUrl("jdbcurl");
hikariConfig.setMaximumPoolSize(100);
hikariConfig.setConnectionTimeout(500);
return new HikariDataSource(hikariConfig);
}
HikariConfig无参构造方法
private static final long CONNECTION_TIMEOUT = SECONDS.toMillis(30);
private static final long VALIDATION_TIMEOUT = SECONDS.toMillis(5);
private static final long IDLE_TIMEOUT = MINUTES.toMillis(10);
private static final long MAX_LIFETIME = MINUTES.toMillis(30);
private static final int DEFAULT_POOL_SIZE = 10;
/**
* Default constructor
*/
public HikariConfig()
{
dataSourceProperties = new Properties();
healthCheckProperties = new Properties();
minIdle = -1;
maxPoolSize = -1;
maxLifetime = MAX_LIFETIME;
connectionTimeout = CONNECTION_TIMEOUT;
validationTimeout = VALIDATION_TIMEOUT;
idleTimeout = IDLE_TIMEOUT;
initializationFailTimeout = 1;
isAutoCommit = true;
String systemProp = System.getProperty("hikaricp.configurationFile");
if (systemProp != null) {
loadProperties(systemProp);
}
}
HikariDataSource初始化方法
/**
* Construct a HikariDataSource with the specified configuration. The
* {@link HikariConfig} is copied and the pool is started by invoking this
* constructor.
*
* The {@link HikariConfig} can be modified without affecting the HikariDataSource
* and used to initialize another HikariDataSource instance.
*
* @param configuration a HikariConfig instance
*/
public HikariDataSource(HikariConfig configuration)
{
// 对参数中不合理的进行修改
configuration.validate();
configuration.copyStateTo(this);
LOGGER.info("{} - Starting...", configuration.getPoolName());
// 构造时就生产pool,不用等到使用才生成
pool = fastPathPool = new HikariPool(this);
LOGGER.info("{} - Start completed.", configuration.getPoolName());
this.seal();
}
validate方法中的validateNumerics
private static final long CONNECTION_TIMEOUT = SECONDS.toMillis(30);
private static final long VALIDATION_TIMEOUT = SECONDS.toMillis(5);
private static final long IDLE_TIMEOUT = MINUTES.toMillis(10);
private static final long MAX_LIFETIME = MINUTES.toMillis(30);
private static final int DEFAULT_POOL_SIZE = 10;
private void validateNumerics()
{
if (maxLifetime != 0 && maxLifetime < SECONDS.toMillis(30)) {
LOGGER.warn("{} - maxLifetime is less than 30000ms, setting to default {}ms.", poolName, MAX_LIFETIME);
maxLifetime = MAX_LIFETIME;
}
if (idleTimeout + SECONDS.toMillis(1) > maxLifetime && maxLifetime > 0) {
LOGGER.warn("{} - idleTimeout is close to or more than maxLifetime, disabling it.", poolName);
idleTimeout = 0;
}
if (idleTimeout != 0 && idleTimeout < SECONDS.toMillis(10)) {
LOGGER.warn("{} - idleTimeout is less than 10000ms, setting to default {}ms.", poolName, IDLE_TIMEOUT);
idleTimeout = IDLE_TIMEOUT;
}
if (leakDetectionThreshold > 0 && !unitTest) {
if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)) {
LOGGER.warn("{} - leakDetectionThreshold is less than 2000ms or more than maxLifetime, disabling it.", poolName);
leakDetectionThreshold = 0;
}
}
if (connectionTimeout < 250) {
LOGGER.warn("{} - connectionTimeout is less than 250ms, setting to {}ms.", poolName, CONNECTION_TIMEOUT);
connectionTimeout = CONNECTION_TIMEOUT;
}
if (validationTimeout < 250) {
LOGGER.warn("{} - validationTimeout is less than 250ms, setting to {}ms.", poolName, VALIDATION_TIMEOUT);
validationTimeout = VALIDATION_TIMEOUT;
}
if (maxPoolSize < 1) {
maxPoolSize = (minIdle <= 0) ? DEFAULT_POOL_SIZE : minIdle;
}
if (minIdle < 0 || minIdle > maxPoolSize) {
minIdle = maxPoolSize;
}
}
3 Hikari源码解析
3.1 获取连接
1、Hikari中的核心类为HikariDataSource,表示Hikari连接池中的数据源,实现了DataSource接口的getConnection方法,getConnection方法源码如下:
/** 连接池对象
* fastPathPool 会在初始化时创建
* pool 是在获取连接数创建
* volatile修饰pool导致每次读pool都要从主存加载,每次写也要写回主存,性能不如没volatile修饰的fastPathPool
* */
private final HikariPool fastPathPool;
private volatile HikariPool pool;
/** 获取连接*/
public Connection getConnection() throws SQLException
{
if (isClosed()) {
throw new SQLException("HikariDataSource " + this + " has been closed.");
}
/** 如果fastPathPool存在则直接获取连接,有参构造方法都有*/
if (fastPathPool != null) {
return fastPathPool.getConnection();
}
/** 如果没有fastPathPool 则创建HikariPool对象 */
// 双检实现单例,pool用volatile修饰,防止指令重排序
HikariPool result = pool;
if (result == null) {
synchronized (this) {
result = pool;
if (result == null) {
validate();
LOGGER.info("{} - Starting...", getPoolName());
try {
/** 初始化创建HikariPool对象*/
pool = result = new HikariPool(this);
this.seal();
}
catch (PoolInitializationException pie) {
//
}
}
}
}
/** 调用pool的getConnection()方法获取连接*/
return result.getConnection();
}
getConnection方法逻辑不多,主要是调用了HikariPool的getConnection()方法,而HikariDataSource中有两个HikariPool对象,一个是fastPathPool是在HikariPool有参构造函数中创建, 如果没有创建fastPathPool,那么就会在getConnection方法时创建pool对象。
很显然pool对象是由volatile关键字修饰的,而fastPathPool是final类型的,所以fastPathPool的效率会比pool要高,所以推荐使用HikariDataSource有参构造函数进行初始化。
2、由上可知获取连接的逻辑是在HikariPool的getConnection方法中,继续分析HikariPool的getConnection方法,源码如下:
/** 获取连接*/
public Connection getConnection(final long hardTimeout) throws SQLException
{
/** 获取锁*/
suspendResumeLock.acquire();
final long startTime = currentTime();
try {
long timeout = hardTimeout;
do {
/** 从ConcurrentBag中借出一个PoolEntry对象 */
PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
if (poolEntry == null) {
break; // We timed out... break and throw exception
}
final long now = currentTime();
/** 判断连接是否被标记为抛弃 或者 空闲时间过长, 是的话就关闭连接*/
if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
timeout = hardTimeout - elapsedMillis(startTime);
}
else {
metricsTracker.recordBorrowStats(poolEntry, startTime);
/** 通过Javassist创建代理连接*/
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
}
} while (timeout > 0L);
metricsTracker.recordBorrowTimeoutStats(startTime);
throw createTimeoutException(startTime);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
}
finally {
/** 释放锁*/
suspendResumeLock.release();
}
}
核心步骤只有两步,一个是调用ConcurrentBag的borrow方法借用一个PoolEntry对象,第二步调用调用PoolEntry的createProxyConnection方法动态生成代理connection对象。
这里涉及到了两个核心的类,分别是ConcurrentBag和PoolEntry。
PoolEntry
PoolEntry顾名思义是连接池的节点,实际也可以看作是一个Connection对象的封装,连接池中存储的连接就是以PoolEntry的方式进行存储。
PoolEntry内部属性如下:
属性 | 类型 | 描述 |
---|---|---|
connection | Connection | 数据库连接 |
lastAccessed | long | 上一次访问时间 |
lastBorrowed | long | 上一次借出时间 |
state | volatile int | 当前状态 |
evict | volatile boolean | 是否该丢弃 |
openStatements | FastList | 打开的statement集合 |
hikariPool | HikariPool | 关联的HikariPool对象 |
isReadOnly | boolean | 是否只读 |
isAutoCommit | boolean | 是否自动提交 |
ConcurrentBag
ConcurrentBag直意就是并发包,本质就是连接池的主体,存储连接的封装对象PoolEntry,另外做了并发控制来解决连接池的并发问题。
ConcurrentBag的内部属性如下:
ConcurrentBag借出一个元素
/** 借出一个对象 */
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
/** 1.从ThreadLocal中获取当前线程绑定的对象集合 */
final List<Object> list = threadList.get();
/** 1.1.如果当前线程变量中存在就直接从list中返回一个*/
for (int i = list.size() - 1; i >= 0; i--) {
final Object entry = list.remove(i);
@SuppressWarnings("unchecked")
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
/** 2.当前等待对象数量自增1 */
final int waiting = waiters.incrementAndGet();
try {
/** 3.遍历当前缓存的sharedList, 如果当前状态为未使用,则通过CAS修改为已使用*/
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
/** 4.如果当前等待线程不止1个,则给监听中添加一个任务 */
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
/** 4.如果当前缓存的sharedList为空或者都在使用中,那么给listener添加一个任务*/
listener.addBagItem(waiting);
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
/** 5.从阻塞队列中等待超时获取元素 */
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
/** 6.如果获取元素失败或者获取元素且使用成功则均返回 */
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
}
finally {
/** 6.等待线程数自减1 */
waiters.decrementAndGet();
}
}
sharedList
从ThreadLocal中获取连接失败之后,会再次尝试从sharedList中获取,sharedList集合存在初始化的PoolEntry。在ConcurrentBag初始化的,会初始化指定数量的PoolEntry对象存入sharedList,源码如下:
/** ConcurrentHand
* IBagStateListener bag状态监听器,HikariPool实现了IBagStateListener接口
* 所以构造器传入的listener实际就是HikariPool对象
* */
public ConcurrentBag(final IBagStateListener listener)
{
this.listener = listener;
//是否使用弱引用
this.weakThreadLocals = useWeakThreadLocals();
//初始化阻塞队列
this.handoffQueue = new SynchronousQueue<>(true);
//初始化等待连接数
this.waiters = new AtomicInteger();
//初始化sharedList
this.sharedList = new CopyOnWriteArrayList<>();
if (weakThreadLocals) {
this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
}
else {
this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
}
}
HikariPool内部属性包含了ConcurrentBag对象,在HikariPool初始化时会创建ConcurrentBag对象,所以ConcurrentBag的构造函数是在HikariPool初始化时调用,HikariPool构造函数如下:
public HikariPool(final HikariConfig config)
{
super(config);
//初始化ConcurrentBag对象
this.connectionBag = new ConcurrentBag<>(this);
//创建SuspendResumeLock对象
this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
/** 初始化线程池,houseKeeping可以理解为保持空间充足的意思,空间也就是连接池,该线程池的作用就是保持连接池中合适的连接数的作用 */
this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
/** 设置属性*/
checkFailFast();
if (config.getMetricsTrackerFactory() != null) {
setMetricsTrackerFactory(config.getMetricsTrackerFactory());
}
else {
setMetricRegistry(config.getMetricRegistry());
}
setHealthCheckRegistry(config.getHealthCheckRegistry());
handleMBeans(this, true);
ThreadFactory threadFactory = config.getThreadFactory();
/** 根据配置的最大连接数,创建链表类型阻塞队列 */
LinkedBlockingQueue<Runnable> addQueue = new LinkedBlockingQueue<>(config.getMaximumPoolSize());
this.addConnectionQueue = unmodifiableCollection(addQueue);
/** 初始化创建连接线程池*/
this.addConnectionExecutor = createThreadPoolExecutor(addQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy());
/** 初始化关闭连接线程池*/
this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
/** 创建保持连接池连接数量的任务*/
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) {
addConnectionExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
addConnectionExecutor.setMaximumPoolSize(Runtime.getRuntime().availableProcessors());
final long startTime = currentTime();
while (elapsedMillis(startTime) < config.getInitializationFailTimeout() && getTotalConnections() < config.getMinimumIdle()) {
quietlySleep(MILLISECONDS.toMillis(100));
}
addConnectionExecutor.setCorePoolSize(1);
addConnectionExecutor.setMaximumPoolSize(1);
}
}
这里有一个定时任务houseKeeperTask,该定时任务的作用是定时检测连接池中连接的数量,执行的内容就是HouseKeep的run方法,逻辑如下:
private final class HouseKeeper implements Runnable
{
private volatile long previous = plusMillis(currentTime(), -housekeepingPeriodMs);
@Override
public void run()
{
try {
/** 读取连接池配置 */
connectionTimeout = config.getConnectionTimeout();
validationTimeout = config.getValidationTimeout();
leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
catalog = (config.getCatalog() != null && !config.getCatalog().equals(catalog)) ? config.getCatalog() : catalog;
final long idleTimeout = config.getIdleTimeout();
final long now = currentTime();
// Detect retrograde time, allowing +128ms as per NTP spec.
if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) {
logger.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",
poolName, elapsedDisplayString(previous, now));
previous = now;
/** 关闭连接池中需要被丢弃的连接 */
softEvictConnections();
return;
}
else if (now > plusMillis(previous, (3 * housekeepingPeriodMs) / 2)) {
// No point evicting for forward clock motion, this merely accelerates connection retirement anyway
logger.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now));
}
previous = now;
String afterPrefix = "Pool ";
if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
logPoolState("Before cleanup ");
afterPrefix = "After cleanup ";
/** 获取当前连接池中已经不是使用中的连接集合 */
final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
int toRemove = notInUse.size() - config.getMinimumIdle();
for (PoolEntry entry : notInUse) {
/** 当前空闲的连接如果超过最大空闲时间idleTimeout则关闭空闲连接 */
if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
closeConnection(entry, "(connection has passed idleTimeout)");
toRemove--;
}
}
}
logPoolState(afterPrefix);
/** 填充连接池,保持连接池数量至少保持minimum个连接数量 */
fillPool(); // Try to maintain minimum connections
}
catch (Exception e) {
logger.error("Unexpected exception in housekeeping task", e);
}
}
}
该定时任务主要是为了维护连接池中连接的数量,首先需要将被标记为需要丢弃的连接进行关闭,然后将空闲超时的连接进行关闭,最后当连接池中的连接少于最小值时就需要对连接池进行补充连接的操作。所以在初始化连接池时,初始化连接的操作就是在fillPool方法中实现的。fillPool方法源码如下:
/** 填充连接池 */
private synchronized void fillPool()
{
/**
* 计算需要添加的连接数量
* config.getMaximumPoolSize - getTotalConnections() 表示连接池最大值-当前连接的数量=最多还可以创建的连接数
* config.getMinimumIdle() - getIdleConnections() 表示连接池最小值 - 当前空闲的连接数= 当前可以连接数
* Math.min计算得到最少需要的连接数 - addConnectionQueue.size() = 还需要创建连接的任务数量
* */
final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
- addConnectionQueue.size();
for (int i = 0; i < connectionsToAdd; i++) {
/** 向创建连接线程池中提交创建连接的任务 */
addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator);
}
}
先计算需要创建的连接数量,向创建连接的线程池中提交任务 poolEntryCreator,创建最后一个任务时创建的是postFillPoolEntryCreator, 两者没有本质的区别,只是打印的日志不一样而已.
PoolEntryCreator创建PoolEntry对象的逻辑如下:
/** 创建PoolEntry对象线程 */
private final class PoolEntryCreator implements Callable<Boolean> {
/**
* 日志前缀
*/
private final String loggingPrefix;
PoolEntryCreator(String loggingPrefix) {
this.loggingPrefix = loggingPrefix;
}
@Override
public Boolean call() {
long sleepBackoff = 250L;
/** 1.当前连接池状态正常并且需求创建连接时 */
while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) {
/** 2.创建PoolEntry对象 */
final PoolEntry poolEntry = createPoolEntry();
if (poolEntry != null) {
/** 3.将PoolEntry对象添加到ConcurrentBag对象中的sharedList中 */
connectionBag.add(poolEntry);
logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
if (loggingPrefix != null) {
logPoolState(loggingPrefix);
}
return Boolean.TRUE;
}
/** 睡眠指定时间*/
quietlySleep(sleepBackoff);
sleepBackoff = Math.min(SECONDS.toMillis(10), Math.min(connectionTimeout, (long) (sleepBackoff * 1.5)));
}
// Pool is suspended or shutdown or at max size
return Boolean.FALSE;
}
}
createPoolEntry方法逻辑如下:
/** 创建PoolEntry对象 */
private PoolEntry createPoolEntry()
{
try {
/** 1.初始化PoolEntry对象,会先创建Connection对象传入PoolEntry的构造函数中 */
final PoolEntry poolEntry = newPoolEntry();
/** 2.获取连接最大生命周期时长 */
final long maxLifetime = config.getMaxLifetime();
if (maxLifetime > 0) {
/** 3.获取一个随机值,防止PoolEntry同时创建同时被销毁,添加随机值错开时间差 */
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
final long lifetime = maxLifetime - variance;
/** 4.给PoolEntry添加定时任务,当PoolEntry对象达到最大生命周期时间后触发定时任务将连接标记为被抛弃 */
poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
() -> {
/** 5.达到最大生命周期,抛弃连接 */
if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {
/** 6.丢弃一个连接之后,调用addBagItem补充新的PoolEntry对象 */
addBagItem(connectionBag.getWaitingThreadCount());
}
},
lifetime, MILLISECONDS));
}
return poolEntry;
}
/** 异常捕获*/
catch (ConnectionSetupException e) {
if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
logger.error("{} - Error thrown while acquiring connection from data source", poolName, e.getCause());
lastConnectionFailure.set(e);
}
return null;
}
catch (SQLException e) {
if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
logger.debug("{} - Cannot acquire connection from data source", poolName, e);
lastConnectionFailure.set(new ConnectionSetupException(e));
}
return null;
}
catch (Exception e) {
if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
logger.error("{} - Error thrown while acquiring connection from data source", poolName, e);
lastConnectionFailure.set(new ConnectionSetupException(e));
}
return null;
}
}
创建数据库连接
// DriverDataSource
@Override
public Connection getConnection() throws SQLException
{
return driver.connect(jdbcUrl, driverProperties);
}
// NonRegisteringDriver
public Connection connect(String url, Properties info) throws SQLException {
try {
try {
if (!ConnectionUrl.acceptsUrl(url)) {
return null;
} else {
ConnectionUrl conStr = ConnectionUrl.getConnectionUrlInstance(url, info);
switch(conStr.getType()) {
case SINGLE_CONNECTION:
return ConnectionImpl.getInstance(conStr.getMainHost());
case FAILOVER_CONNECTION:
case FAILOVER_DNS_SRV_CONNECTION:
return FailoverConnectionProxy.createProxyInstance(conStr);
case LOADBALANCE_CONNECTION:
case LOADBALANCE_DNS_SRV_CONNECTION:
return LoadBalancedConnectionProxy.createProxyInstance(conStr);
case REPLICATION_CONNECTION:
case REPLICATION_DNS_SRV_CONNECTION:
return ReplicationConnectionProxy.createProxyInstance(conStr);
default:
return null;
}
}
} catch (UnsupportedConnectionStringException var5) {
return null;
} catch (CJException var6) {
throw (UnableToConnectException)ExceptionFactory.createException(UnableToConnectException.class, Messages.getString("NonRegisteringDriver.17", new Object[]{var6.toString()}), var6);
}
} catch (CJException var7) {
throw SQLExceptionsMapping.translateException(var7);
}
}
首先创建一个新的PoolEntry对象,PoolEntry构造时会创建Connection对象,另外如果连接设置了最大生命周期时长,那么需要给每个PoolEntry添加定时任务,为了防止多个PoolEntry同时创建同时被关闭,所以每个PoolEntry的最大生命周期时间都不一样。当PoolEntry达到最大生命周期后会触发softEvictConnection方法,将PoolEntry标记为需要被丢弃,另外由于抛弃了PoolEntry对象,所以需要重新调用addBagItem方法对PoolEntry对象进行补充。
通过IBagStateListener创建新的元素
由于第二步可知,IBagStateListener主要有一个addBagItem方法,HikariPool实现了addBagItem方法,方法源码如下:
public void addBagItem(final int waiting)
{
/** 判断是否需要创建连接 */
final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional.
if (shouldAdd) {
/** 向创建连接线程池中提交创建连接的任务 */
addConnectionExecutor.submit(poolEntryCreator);
}
}
从ConcurrentBag中获取连接一共分成三步,首先从当前线程的ThreadLocal中获取,如果有直接返回一个连接,如果ThreadLocal中没有则从sharedList中获取,sharedList可以理解为ConcurrentBag缓存的连接池,每当创建了一个PoolEntry对象之后都会添加到sharedList中去,如果sharedList中的连接状态都不是可用状态,此时就需要通过IBagStateListener提交一个创建连接的任务,交给创建连接的线程池去执行,创建新的连接。
新的连接创建成功之后会将PoolEntry对象添加到无容量的阻塞队列handoffQueue中,等待连接的线程不断尝试从handoffQueue队列中获取连接直到成功获取或者超时返回。
3.2 释放连接
当客户端释放连接时会调用collection的close方法,Hikari中的Connection使用的是代理连接ProxyConnection对象,调用close方法时会调用关联的PoolEntry对象的回收方法recycle方法,PoolEntry的recycle方法源码如下:
/**
* Release this entry back to the pool.
*
* @param lastAccessed last access time-stamp
*/
void recycle(final long lastAccessed)
{
if (connection != null) {
this.lastAccessed = lastAccessed;
hikariPool.recycle(this);
}
}
/**
* Recycle PoolEntry (add back to the pool)
*
* @param poolEntry the PoolEntry to recycle
*/
@Override
void recycle(final PoolEntry poolEntry)
{
metricsTracker.recordConnectionUsage(poolEntry);
connectionBag.requite(poolEntry);
}
/**
* This method will return a borrowed object to the bag. Objects
* that are borrowed from the bag but never "requited" will result
* in a memory leak.
*
* @param bagEntry the value to return to the bag
* @throws NullPointerException if value is null
* @throws IllegalStateException if the bagEntry was not borrowed from the bag
*/
public void requite(final T bagEntry)
{
bagEntry.setState(STATE_NOT_IN_USE);
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
}
else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
}
else {
yield();
}
}
final List<Object> threadLocalList = threadList.get();
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
回收连接最终会调用ConcurrentBag的requite方法,方法逻辑不复杂,首先将PoolEntry元素状态设置为未使用,然后判断当前是否存在等待连接的线程,如果存在则将连接加入到无界阻塞队列中去,由等待连接的线程从阻塞队列中去获取;
如果当前没有等待连接的线程,则将连接添加到本地线程变量ThreadLocal中,等待当前线程下次获取连接时直接从ThreadLocal中获取。
4. Hikari连接池高性能的原因
1、采用自定义的FastList替代了ArrayList,FastList的get方法去除了范围检查rangeCheck逻辑,并且remove方法是从尾部开始扫描的,而并不是从头部开始扫描的。因为Connection的打开和关闭顺序通常是相反的
2、初始化时创建了两个HikariPool对象,一个采用final类型定义,避免在获取连接时才初始化,因为获取连接时才初始化就需要做同步处理
3、Hikari创建连接是通过javassist动态字节码生成技术创建的,性能更好
4、从连接池中获取连接时对于同一个线程在threadLocal中添加了缓存,同一线程获取连接时没有并发操作
5、Hikari最大的特点是在高并发的情况下尽量的减少锁竞争
网友评论