数据库连接池的产生背景
传统的单一数据库连接池存在弊端
传统的连接池函数可以表示为f(y)=kx,y表示资源消耗率,k表示连接次数,x表示连接动作(长连接),每次链接都会消耗一次资源,尤其是在高并发场景下,很快就会出现连接溢出(LIM(connectionTimes)=MAX(max_of_connections))。
所以链接池合理设计应该是具备动态规划性的
其转移方程应该是:
size=maxPoolSize
idelSize=idel of poolSize
ConcurrentBag=C
When idelSize>0 then do dp[i]=C.borrow(state_in_closed_connection) from poolEntry;
When idelSize=0 and poolSize<maxPoolSize then do dp[i]=C.borrow(state_in_closed_connection) from poolEntry;
else waitting do waiters.increAndGet()
链接池可以有效利用资源,并可以科学地管理链接句柄。
Mysql数据库场景下的链接池模拟图
![](https://img.haomeiwen.com/i18121073/4eb6bff2fe363e3d.png)
链接池的面向对象设计模版大概是
1.代理Connection对象
2.代理Statement对象
3.PooledConection池华对象
4.metrics性能监控模块
- 并发锁机制
6.SqlParser部分
7.更高级的机器学习自动优化链接池的最大链接数,最大等待激活connection时间,最小闲置链接数等。
HikariCP数据库是嵌入在Spring-framework里的最优数据库连接池,麻雀虽小,五脏俱全。并发能力较其他连接池优秀,还有比较完善的性能监控以日志的形式输出。
config参数有如下
参数 | 描述 | 默认值 | 其他 |
---|---|---|---|
autoCommit | 自动提交从池中返回的连接 。 | true | - |
connectionTimeout | 等待来自池的连接的最大毫秒数 | 30000 | 如果小于250毫秒,则被重置回30秒 |
idleTimeout | 连接允许在池中闲置的最长时间 | 600000 | 如果idleTimeout+1秒>maxLifetime 且 maxLifetime>0,则会被重置为0(代表永远不会退出);如果idleTimeout!=0且小于10秒,则会被重置为10秒 |
maxLifetime | 池中连接最长生命周期 | 1800000 | 池中连接最长生命周期 |
connectionTestQuery | 如果您的驱动程序支持JDBC4,我们强烈建议您不要设置此属性 | null | - |
minimumIdle | 池中维护的最小空闲连接数 | 10 | minIdle<0或者minIdle>maxPoolSize,则被重置为maxPoolSize |
maximumPoolSize | 池中最大连接数,包括闲置和使用中的连接 | 10 | 如果maxPoolSize小于1,则会被重置。当minIdle<=0被重置为DEFAULT_POOL_SIZE则为10;如果minIdle>0则重置为minIdle的值 |
metricRegistry | 该属性允许您指定一个 Codahale / Dropwizard MetricRegistry 的实例,供池使用以记录各种指标 | null | - |
healthCheckRegistry | 该属性允许您指定池使用的Codahale / Dropwizard HealthCheckRegistry的实例来报告当前健康信息 | null | - |
poolName | 连接池的用户定义名称,主要出现在日志记录和JMX管理控制台中以识别池和池配置 | HikariPool-1 | - |
initializationFailTimeout | 如果池无法成功初始化连接,则此属性控制池是否将 fail fast | 1 | |
isolateInternalQueries | 是否在其自己的事务中隔离内部池查询,例如连接活动测试 | false | - |
allowPoolSuspension | 控制池是否可以通过JMX暂停和恢复 | false | - |
readOnly | 从池中获取的连接是否默认处于只读模式 | false | - |
registerMbeans | 是否注册JMX管理Bean(MBeans) | false | - |
catalog | 为支持 catalog 概念的数据库设置默认 catalog | false | - |
connectionInitSql | 该属性设置一个SQL语句,在将每个新连接创建后,将其添加到池中之前执行该语句。 | null | - |
driverClassName | HikariCP将尝试通过仅基于jdbcUrl的DriverManager解析驱动程序,但对于一些较旧的驱动程序,还必须指定driverClassName。 | null | - |
transactionIsolation | 控制从池返回的连接的默认事务隔离级别 | null | - |
validationTimeout | 连接将被测试活动的最大时间量 | 5000 | - |
leakDetectionThreshold | 记录消息之前连接可能离开池的时间量,表示可能的连接泄漏 | 0 | 如果大于0且不是单元测试,则进一步判断:(leakDetectionThreshold < SECONDS.toMillis(2) or (leakDetectionThreshold > maxLifetime && maxLifetime > 0),会被重置为0 . 即如果要生效则必须>0,而且不能小于2秒,而且当maxLifetime > 0时不能大于maxLifetime |
dataSource | 这个属性允许你直接设置数据源的实例被池包装,而不是让HikariCP通过反射来构造它 | null | - |
schema | 该属性为支持模式概念的数据库设置默认模式,支持SQL Script初始化执行 | null | - |
threadFactory | 此属性允许您设置将用于创建池使用的所有线程的java.util.concurrent.ThreadFactory的实例。 | null | - |
scheduledExecutor | 此属性允许您设置将用于各种内部计划任务的java.util.concurrent.ScheduledExecutorService实例 。 | null | - |
HikariConfig Implements HikariConfigMXBean
设计上是要支持JMX
JMX(Java Management Extensions,即Java管理扩展)是一个为应用程序、设备、系统等植入管理功能的框架。JMX可以跨越一系列异构操作系统平台、系统体系结构和网络传输协议,灵活的开发无缝集成的系统、网络和服务管理应用。可以支持远程链接配置信息,比如JNI-DataSource
private volatile String catalog;
private volatile long connectionTimeout;
private volatile long validationTimeout;
private volatile long idleTimeout;
private volatile long leakDetectionThreshold;
private volatile long maxLifetime;
private volatile int maxPoolSize;
private volatile int minIdle;
private volatile String username;
private volatile String password;
// Properties NOT changeable at runtime
//
private long initializationFailTimeout;
private String connectionInitSql;
private String connectionTestQuery;
private String dataSourceClassName;
private String dataSourceJndiName;
private String driverClassName;
private String exceptionOverrideClassName;
private String jdbcUrl;
private String poolName;
private String schema;
private String transactionIsolationName;
private boolean isAutoCommit;
private boolean isReadOnly;
private boolean isIsolateInternalQueries;
..............................
HikariCP的核心类是:
ConcurrentBag:并发包,结合AQS,ThreadLocal机制,用CopyOnWriteArrayList,SynchronousQueue(同步队列)实现跨线程的BagEntry共享机制
HikariPool :负责管理Connection,evicitConnection,openConnection等动作。
ProxyConnection :代理链接类
--public abstract class ProxyConnection implements Connection
{
//只读模式
static final int DIRTY_BIT_READONLY = 0b000001;
//自动提交模式
static final int DIRTY_BIT_AUTOCOMMIT = 0b000010;
//正在隔离
static final int DIRTY_BIT_ISOLATION = 0b000100;
static final int DIRTY_BIT_CATALOG = 0b001000;
//Connection timeOut
static final int DIRTY_BIT_NETTIMEOUT = 0b010000;
//执行Schema动作
static final int DIRTY_BIT_SCHEMA = 0b100000;
}
ProxyLeakTask:逃逸分析内,schedule多线程机制,实现了Runnable。
FastList:快速 添加Statement,IConcurrentBagEntry对象的集合类,no-fail-fast机制,无需扩
PoolEntry:池对象实体类,ConCurrentBag存放的对象
SuspendResumeLock:中断恢复锁机制,内部是Semaphore 信号量机制,该类设计上是期盼JIT机制本身应该做了优化的。
PoolEntry(final Connection connection, final PoolBase pool, final boolean isReadOnly, final boolean isAutoCommit)
{
this.connection = connection;
this.hikariPool = (HikariPool) pool;
this.isReadOnly = isReadOnly;
this.isAutoCommit = isAutoCommit;
this.lastAccessed = currentTime();
//默认大小是16
this.openStatements = new FastList<>(Statement.class, 16);
}
ConcurrentBags实现原理分析
PoolEntry(final Connection connection, final PoolBase pool, final boolean isReadOnly, final boolean isAutoCommit)
{
this.connection = connection;
this.hikariPool = (HikariPool) pool;
this.isReadOnly = isReadOnly;
this.isAutoCommit = isAutoCommit;
this.lastAccessed = currentTime();
//开放的openStatements始终存放在FastList里面
this.openStatements = new FastList<>(Statement.class, 16);
}
public ConcurrentBag(final IBagStateListener listener)
{
this.listener = listener;
this.weakThreadLocals = useWeakThreadLocals();
this.handoffQueue = new SynchronousQueue<>(true);
this.waiters = new AtomicInteger();
this.sharedList = new CopyOnWriteArrayList<>();
//这里是设计技巧
//如果是JIT实现的使用了托管给JVM弱引用的ThreadLocal存储对象时,则使用ArrayList
//有个疑问点是作者为什么不字节实现一个ThreadLocal,实现的时候判断下类加载机制的模式就好了
if (weakThreadLocals) {
this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
}
else {
this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
}
}
//并发包实体
public interface IConcurrentBagEntry
{
//不可用状态
int STATE_NOT_IN_USE = 0;
//可用状态
int STATE_IN_USE = 1;
//被移除状态
int STATE_REMOVED = -1;
//并发包集合里的所有元素是否被翻转,翻转动作不会优先占据有锁资源的对象
int STATE_RESERVED = -2;
//CAS机制
boolean compareAndSet(int expectState, int newState);
void setState(int newState);
//获取状态
int getState();
}
//指定超时时间借出bagEntry对象,这里的entry是链接池的proxyConnection对象
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// Try the thread-local list first
//threadList ThreadLocal对象
final var list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
//获取待移除的对象
final var entry = list.remove(i);
@SuppressWarnings("unchecked")
//com.zaxxer.hikari.useWeakReferences 是否是true,true代表弱引用,弱引用就是来自于threadLocal里的List的reverse列表的最后一个元素
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
//判断状态是否是not_in_use,可用状态哦
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
// Otherwise, scan the shared list ... then poll the handoff queue
//如果对象已经逃逸了
//从copyOnWriteList里面拿,
//等待原子性乐观锁机制increment
final int waiting = waiters.incrementAndGet();
try {
// 遍历sharedList
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
//如果被另一个waiter偷了一个cas version,需要请求添加另一个bag entry
//这里非常巧妙
// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
//IBagStateListener 往bag状态监视器里面添加等待cas 原始值
listener.addBagItem(waiting);
//超时值转换为纳秒
timeout = timeUnit.toNanos(timeout);
do {
final var start = currentTime();
//从SynchronousQueue异步队列里获取一个bagEntry,并判断他的状态是否是STATE_NOT_IN_USE可用
//状态
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
//超时计数器递减
timeout -= elapsedNanos(start);
} while (timeout > 10_000); //单位时间是 1毫秒10*1纳秒
return null;
}
//最终等待器要-1
finally {
waiters.decrementAndGet();
}
}
//回收一个bagEntry对象
public void requite(final T bagEntry)
{
//设置回收对象
bagEntry.setState(STATE_NOT_IN_USE);
for (var i = 0; waiters.get() > 0; i++) {
//判断是否
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
}
//0xff 255=11111111,只有255与运算255时才等于本身,8个字节
//JVM对象头默认对齐填充方式是8个字节
else if ((i & 0xff) == 0xff) {
//为了[线程]调度,在许可可用[前]禁用当前线程,并最多等待指定的等待时间。
//如果许可可用,则使用该许可,并且该调用立即返回;否则,为线程调度禁用当前线程,并在发生以下四种情况之一前,使其处于休眠状态:
* [其他]某个线程将当前线程作为目标调用 `unpark`;或者
* 其他某个线程[中断])=当前线程;或者
* 已超过指定的等待时间;或者
* 该调用不合逻辑地(即毫无理由地)返回。
parkNanos(MICROSECONDS.toNanos(10));
}
else {
//当前线程的运行态变为就绪态
Thread.yield();
}
}
final var threadLocalList = threadList.get();
//判断是否是50,maxSize最大长度是50
if (threadLocalList.size() < 50) {
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
}
//移除bag
public boolean remove(final T bagEntry)
{
//判断状态是否是可以从removed变更为in_use状态
//判断状态是否是可以从removed状态变更为翻转状态
//判断Connection状态是否是关闭状态volatile
if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
return false;
}
//从copyOnWriteList里面移除
final boolean removed = sharedList.remove(bagEntry);
if (!removed && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
}
//从ThreadLocal<List<BagEntry>>里面移除
threadList.get().remove(bagEntry);
return removed;
}
//此方法用于通过<code>reserve(T)</code>保留项目
*可再次用于借用。
@SuppressWarnings("SpellCheckingInspection")
public void unreserve(final T bagEntry)
{
if (bagEntry.compareAndSet(STATE_RESERVED, STATE_NOT_IN_USE)) {
// spin until a thread takes it or none are waiting
//如果同步队列不能拿出一个资源,不能拿出一个资源代表锁资源没有被释放
//那么将线程的运行态循环变更为就绪态
while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
Thread.yield();
}
}
else {
//没有成功打印日志提示
LOGGER.warn("Attempt to relinquish an object to the bag that was not reserved: {}", bagEntry);
}
}
//获取状态的6个长度的数组
public int[] getStateCounts()
{
final var states = new int[6];
for (var e : sharedList) {
++states[e.getState()];
}
//第四个存放共享资源的size
states[4] = sharedList.size();
//第五个获取等待资源的size
states[5] = waiters.get();
return states;
}
/**
* Close the bag to further adds.
*/
@Override
public void close()
{
closed = true;
}
ProxyConnection 关闭所有的Statements
@SuppressWarnings("EmptyTryBlock")
private synchronized void closeStatements()
{
final var size = openStatements.size();
if (size > 0) {
for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) {
try (Statement ignored = openStatements.get(i)) {
// automatic resource cleanup
}
catch (SQLException e) {
LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()",
poolEntry.getPoolName(), delegate);
//逃逸分析任务取消
leakTask.cancel();
poolEntry.evict("(exception closing Statements during Connection.close())");
delegate = ClosedConnection.CLOSED_CONNECTION;
}
}
//从statements里面移除
openStatements.clear();
}
}
@Override
public final void close() throws SQLException
{
// Closing statements can cause connection eviction, so this must run before the conditional below
closeStatements();
if (delegate != ClosedConnection.CLOSED_CONNECTION) {
leakTask.cancel();
try {
//是赃读和不是自动提交模式,委派给回滚动作处理
if (isCommitStateDirty && !isAutoCommit) {
delegate.rollback();
LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
}
//设置赃读状态
if (dirtyBits != 0) {
poolEntry.resetConnectionState(this, dirtyBits);
}
delegate.clearWarnings();
}
catch (SQLException e) {
// when connections are aborted, exceptions are often thrown that should not reach the application
if (!poolEntry.isMarkedEvicted()) {
throw checkException(e);
}
}
finally {
//最后要委派给回收处理并回收当前链接
delegate = ClosedConnection.CLOSED_CONNECTION;
poolEntry.recycle();
}
}
}
FastList实现原理分析
移除,将待移除的元素放置最后一个位置
![](https://img.haomeiwen.com/i18121073/3e8ff26b523e7222.png)
@Override
//添加元素
public boolean add(T element)
{
//如果长度足够直接添加
if (size < elementData.length) {
elementData[size++] = element;
}
else {
// overflow-conscious code
final var oldCapacity = elementData.length;
//2倍扩容
final var newCapacity = oldCapacity << 1;
@SuppressWarnings("unchecked")
final var newElementData = (T[]) Array.newInstance(clazz, newCapacity);
System.arraycopy(elementData, 0, newElementData, 0, oldCapacity);
、、赋值
newElementData[size++] = element;
elementData = newElementData;
}
return true;
}
@Override
//移除元素
public boolean remove(Object element)
{
//找到元素并合并移除
for (var index = size - 1; index >= 0; index--) {
if (element == elementData[index]) {
final var numMoved = size - index - 1;
if (numMoved > 0) {
System.arraycopy(elementData, index + 1, elementData, index, numMoved);
}
elementData[--size] = null;
return true;
}
}
return false;
}
思考,如何设计一个支持高并发的数据库链接池
总结:决定了HikariCP链接池高性能的几个关键因素是
1.作者充分理解了并发的核心原理,高度设计了ConcurrentBag这个核心类,比如:ThreadLocal和CopyOnWriteArrayList在ConcurrentBag中都是成员变量,线程间不共享,避免了伪共享(false sharing)的发生.
2.作者对数据结构的合理设计,用FastList替换了ArrayList,适用于容器不需要频繁扩容的机制,也贴切了并发池这个设计场景
3.作者巧用了状态机设计模式,statement和connection还有BagEntry的状态分割合理设置了它们的状态
4.作者充分理解了多线程scheduler执行任务定时器的多线程包
1.下面画一下这个链接池的实现的核心逻辑
![](https://img.haomeiwen.com/i18121073/294a28d4227543f4.png)
网友评论