美文网首页
高性能数据库链接池HikariCP原理分析

高性能数据库链接池HikariCP原理分析

作者: 头发掉了 | 来源:发表于2023-02-14 01:36 被阅读0次

数据库连接池的产生背景

传统的单一数据库连接池存在弊端

传统的连接池函数可以表示为f(y)=kx,y表示资源消耗率,k表示连接次数,x表示连接动作(长连接),每次链接都会消耗一次资源,尤其是在高并发场景下,很快就会出现连接溢出(LIM(connectionTimes)=MAX(max_of_connections))。

所以链接池合理设计应该是具备动态规划性的
其转移方程应该是:
size=maxPoolSize
idelSize=idel of poolSize
ConcurrentBag=C
When idelSize>0 then do dp[i]=C.borrow(state_in_closed_connection) from poolEntry;
When idelSize=0 and poolSize<maxPoolSize then do dp[i]=C.borrow(state_in_closed_connection) from poolEntry;
else waitting do waiters.increAndGet()

链接池可以有效利用资源,并可以科学地管理链接句柄。
Mysql数据库场景下的链接池模拟图

pooled_compare_nopooled.png

链接池的面向对象设计模版大概是
1.代理Connection对象
2.代理Statement对象
3.PooledConection池华对象
4.metrics性能监控模块

  1. 并发锁机制
    6.SqlParser部分
    7.更高级的机器学习自动优化链接池的最大链接数,最大等待激活connection时间,最小闲置链接数等。

HikariCP数据库是嵌入在Spring-framework里的最优数据库连接池,麻雀虽小,五脏俱全。并发能力较其他连接池优秀,还有比较完善的性能监控以日志的形式输出。

config参数有如下

参数 描述 默认值 其他
autoCommit 自动提交从池中返回的连接 。 true -
connectionTimeout 等待来自池的连接的最大毫秒数 30000 如果小于250毫秒,则被重置回30秒
idleTimeout 连接允许在池中闲置的最长时间 600000 如果idleTimeout+1秒>maxLifetime 且 maxLifetime>0,则会被重置为0(代表永远不会退出);如果idleTimeout!=0且小于10秒,则会被重置为10秒
maxLifetime 池中连接最长生命周期 1800000 池中连接最长生命周期
connectionTestQuery 如果您的驱动程序支持JDBC4,我们强烈建议您不要设置此属性 null -
minimumIdle 池中维护的最小空闲连接数 10 minIdle<0或者minIdle>maxPoolSize,则被重置为maxPoolSize
maximumPoolSize 池中最大连接数,包括闲置和使用中的连接 10 如果maxPoolSize小于1,则会被重置。当minIdle<=0被重置为DEFAULT_POOL_SIZE则为10;如果minIdle>0则重置为minIdle的值
metricRegistry 该属性允许您指定一个 Codahale / Dropwizard MetricRegistry 的实例,供池使用以记录各种指标 null -
healthCheckRegistry 该属性允许您指定池使用的Codahale / Dropwizard HealthCheckRegistry的实例来报告当前健康信息 null -
poolName 连接池的用户定义名称,主要出现在日志记录和JMX管理控制台中以识别池和池配置 HikariPool-1 -
initializationFailTimeout 如果池无法成功初始化连接,则此属性控制池是否将 fail fast 1
isolateInternalQueries 是否在其自己的事务中隔离内部池查询,例如连接活动测试 false -
allowPoolSuspension 控制池是否可以通过JMX暂停和恢复 false -
readOnly 从池中获取的连接是否默认处于只读模式 false -
registerMbeans 是否注册JMX管理Bean(MBeans) false -
catalog 为支持 catalog 概念的数据库设置默认 catalog false -
connectionInitSql 该属性设置一个SQL语句,在将每个新连接创建后,将其添加到池中之前执行该语句。 null -
driverClassName HikariCP将尝试通过仅基于jdbcUrl的DriverManager解析驱动程序,但对于一些较旧的驱动程序,还必须指定driverClassName。 null -
transactionIsolation 控制从池返回的连接的默认事务隔离级别 null -
validationTimeout 连接将被测试活动的最大时间量 5000 -
leakDetectionThreshold 记录消息之前连接可能离开池的时间量,表示可能的连接泄漏 0 如果大于0且不是单元测试,则进一步判断:(leakDetectionThreshold < SECONDS.toMillis(2) or (leakDetectionThreshold > maxLifetime && maxLifetime > 0),会被重置为0 . 即如果要生效则必须>0,而且不能小于2秒,而且当maxLifetime > 0时不能大于maxLifetime
dataSource 这个属性允许你直接设置数据源的实例被池包装,而不是让HikariCP通过反射来构造它 null -
schema 该属性为支持模式概念的数据库设置默认模式,支持SQL Script初始化执行 null -
threadFactory 此属性允许您设置将用于创建池使用的所有线程的java.util.concurrent.ThreadFactory的实例。 null -
scheduledExecutor 此属性允许您设置将用于各种内部计划任务的java.util.concurrent.ScheduledExecutorService实例 。 null -

HikariConfig Implements HikariConfigMXBean

设计上是要支持JMX

JMX(Java Management Extensions,即Java管理扩展)是一个为应用程序、设备、系统等植入管理功能的框架。JMX可以跨越一系列异构操作系统平台、系统体系结构和网络传输协议,灵活的开发无缝集成的系统、网络和服务管理应用。可以支持远程链接配置信息,比如JNI-DataSource

   private volatile String catalog;
   private volatile long connectionTimeout;
   private volatile long validationTimeout;
   private volatile long idleTimeout;
   private volatile long leakDetectionThreshold;
   private volatile long maxLifetime;
   private volatile int maxPoolSize;
   private volatile int minIdle;
   private volatile String username;
   private volatile String password;

   // Properties NOT changeable at runtime
   //
   private long initializationFailTimeout;
   private String connectionInitSql;
   private String connectionTestQuery;
   private String dataSourceClassName;
   private String dataSourceJndiName;
   private String driverClassName;
   private String exceptionOverrideClassName;
   private String jdbcUrl;
   private String poolName;
   private String schema;
   private String transactionIsolationName;
   private boolean isAutoCommit;
   private boolean isReadOnly;
   private boolean isIsolateInternalQueries;

..............................

HikariCP的核心类是:

ConcurrentBag:并发包,结合AQS,ThreadLocal机制,用CopyOnWriteArrayList,SynchronousQueue(同步队列)实现跨线程的BagEntry共享机制

HikariPool :负责管理Connection,evicitConnection,openConnection等动作。

ProxyConnection :代理链接类
--public abstract class ProxyConnection implements Connection
{
//只读模式
static final int DIRTY_BIT_READONLY = 0b000001;
//自动提交模式
static final int DIRTY_BIT_AUTOCOMMIT = 0b000010;
//正在隔离
static final int DIRTY_BIT_ISOLATION = 0b000100;
static final int DIRTY_BIT_CATALOG = 0b001000;
//Connection timeOut
static final int DIRTY_BIT_NETTIMEOUT = 0b010000;
//执行Schema动作
static final int DIRTY_BIT_SCHEMA = 0b100000;

ProxyLeakTask:逃逸分析内,schedule多线程机制,实现了Runnable。

FastList:快速 添加Statement,IConcurrentBagEntry对象的集合类,no-fail-fast机制,无需扩

PoolEntry:池对象实体类,ConCurrentBag存放的对象

SuspendResumeLock:中断恢复锁机制,内部是Semaphore 信号量机制,该类设计上是期盼JIT机制本身应该做了优化的。

      PoolEntry(final Connection connection, final PoolBase pool, final boolean isReadOnly, final boolean isAutoCommit)
   {
      this.connection = connection;
      this.hikariPool = (HikariPool) pool;
      this.isReadOnly = isReadOnly;
      this.isAutoCommit = isAutoCommit;
      this.lastAccessed = currentTime();
     //默认大小是16
      this.openStatements = new FastList<>(Statement.class, 16);
   }

ConcurrentBags实现原理分析


  PoolEntry(final Connection connection, final PoolBase pool, final boolean isReadOnly, final boolean isAutoCommit)
   {
      this.connection = connection;
      this.hikariPool = (HikariPool) pool;
      this.isReadOnly = isReadOnly;
      this.isAutoCommit = isAutoCommit;
      this.lastAccessed = currentTime();
  //开放的openStatements始终存放在FastList里面
      this.openStatements = new FastList<>(Statement.class, 16);
   }
  public ConcurrentBag(final IBagStateListener listener)
   {
      this.listener = listener;
      this.weakThreadLocals = useWeakThreadLocals();

      this.handoffQueue = new SynchronousQueue<>(true);
      this.waiters = new AtomicInteger();
      this.sharedList = new CopyOnWriteArrayList<>();
      //这里是设计技巧
    //如果是JIT实现的使用了托管给JVM弱引用的ThreadLocal存储对象时,则使用ArrayList
   //有个疑问点是作者为什么不字节实现一个ThreadLocal,实现的时候判断下类加载机制的模式就好了
      if (weakThreadLocals) {
         this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
      }
      else {
         this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
      }
   }

  //并发包实体
   public interface IConcurrentBagEntry
   {
     //不可用状态
      int STATE_NOT_IN_USE = 0;
//可用状态
      int STATE_IN_USE = 1;
//被移除状态
      int STATE_REMOVED = -1;
//并发包集合里的所有元素是否被翻转,翻转动作不会优先占据有锁资源的对象
      int STATE_RESERVED = -2;
     //CAS机制
      boolean compareAndSet(int expectState, int newState);
      void setState(int newState);
    //获取状态
      int getState();
   }
//指定超时时间借出bagEntry对象,这里的entry是链接池的proxyConnection对象

 public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
   {
      // Try the thread-local list first
   //threadList ThreadLocal对象
      final var list = threadList.get();
      for (int i = list.size() - 1; i >= 0; i--) {
     //获取待移除的对象
         final var entry = list.remove(i);
         @SuppressWarnings("unchecked")
            //com.zaxxer.hikari.useWeakReferences 是否是true,true代表弱引用,弱引用就是来自于threadLocal里的List的reverse列表的最后一个元素
         final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
   //判断状态是否是not_in_use,可用状态哦
         if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }
      }

      // Otherwise, scan the shared list ... then poll the handoff queue
   //如果对象已经逃逸了
 //从copyOnWriteList里面拿,
//等待原子性乐观锁机制increment
      final int waiting = waiters.incrementAndGet();
      try {
// 遍历sharedList
         for (T bagEntry : sharedList) {
 
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                //如果被另一个waiter偷了一个cas version,需要请求添加另一个bag entry
                 //这里非常巧妙
               // If we may have stolen another waiter's connection, request another bag add.
               if (waiting > 1) {
                  listener.addBagItem(waiting - 1);
               }
               return bagEntry;
            }
         }
         //IBagStateListener 往bag状态监视器里面添加等待cas 原始值
         listener.addBagItem(waiting);
       
        //超时值转换为纳秒
         timeout = timeUnit.toNanos(timeout);
         do {
            final var start = currentTime();
         //从SynchronousQueue异步队列里获取一个bagEntry,并判断他的状态是否是STATE_NOT_IN_USE可用
//状态
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               return bagEntry;
            }
             //超时计数器递减
            timeout -= elapsedNanos(start);
         } while (timeout > 10_000); //单位时间是 1毫秒10*1纳秒

         return null;
      }
    //最终等待器要-1
      finally {
         waiters.decrementAndGet();
      }
   }

   //回收一个bagEntry对象
   public void requite(final T bagEntry)
   {
 //设置回收对象
      bagEntry.setState(STATE_NOT_IN_USE);

      for (var i = 0; waiters.get() > 0; i++) {
         //判断是否
         if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
            return;
         }
        //0xff 255=11111111,只有255与运算255时才等于本身,8个字节
      //JVM对象头默认对齐填充方式是8个字节
         else if ((i & 0xff) == 0xff) {
           //为了[线程]调度,在许可可用[前]禁用当前线程,并最多等待指定的等待时间。
//如果许可可用,则使用该许可,并且该调用立即返回;否则,为线程调度禁用当前线程,并在发生以下四种情况之一前,使其处于休眠状态:

*   [其他]某个线程将当前线程作为目标调用 `unpark`;或者
*   其他某个线程[中断])=当前线程;或者
*   已超过指定的等待时间;或者
*   该调用不合逻辑地(即毫无理由地)返回。


            parkNanos(MICROSECONDS.toNanos(10));
         }
         else {
         //当前线程的运行态变为就绪态
            Thread.yield();
         }
      }

      final var threadLocalList = threadList.get();
  //判断是否是50,maxSize最大长度是50
      if (threadLocalList.size() < 50) {
         threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
      }
   }

  //移除bag
   public boolean remove(final T bagEntry)
   {
     //判断状态是否是可以从removed变更为in_use状态
      //判断状态是否是可以从removed状态变更为翻转状态
       //判断Connection状态是否是关闭状态volatile 
      if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
         LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
         return false;
      }
      //从copyOnWriteList里面移除
      final boolean removed = sharedList.remove(bagEntry);
      if (!removed && !closed) {
         LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
      }
     //从ThreadLocal<List<BagEntry>>里面移除
      threadList.get().remove(bagEntry);

      return removed;
   }


 //此方法用于通过<code>reserve(T)</code>保留项目

*可再次用于借用。
   @SuppressWarnings("SpellCheckingInspection")
   public void unreserve(final T bagEntry)
   {
      if (bagEntry.compareAndSet(STATE_RESERVED, STATE_NOT_IN_USE)) {
         // spin until a thread takes it or none are waiting
       //如果同步队列不能拿出一个资源,不能拿出一个资源代表锁资源没有被释放
      //那么将线程的运行态循环变更为就绪态
         while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
            Thread.yield();
         }
      }
      else {
        //没有成功打印日志提示
         LOGGER.warn("Attempt to relinquish an object to the bag that was not reserved: {}", bagEntry);
      }
   }

//获取状态的6个长度的数组
 public int[] getStateCounts()
   {
      final var states = new int[6];
      for (var e : sharedList) {
         ++states[e.getState()];
      }
//第四个存放共享资源的size
      states[4] = sharedList.size();
//第五个获取等待资源的size
      states[5] = waiters.get();

      return states;
   }



   /**
    * Close the bag to further adds.
    */
   @Override
   public void close()
   {
      closed = true;
   }

ProxyConnection 关闭所有的Statements
 @SuppressWarnings("EmptyTryBlock")
   private synchronized void closeStatements()
   {
      final var size = openStatements.size();
      if (size > 0) {
         for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) {
            try (Statement ignored = openStatements.get(i)) {
               // automatic resource cleanup
            }
            catch (SQLException e) {
               LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()",
                           poolEntry.getPoolName(), delegate);
              //逃逸分析任务取消
               leakTask.cancel();
               poolEntry.evict("(exception closing Statements during Connection.close())");
               delegate = ClosedConnection.CLOSED_CONNECTION;
            }
         }
        //从statements里面移除
         openStatements.clear();
      }
   }


   @Override
   public final void close() throws SQLException
   {
      // Closing statements can cause connection eviction, so this must run before the conditional below
      closeStatements();

      if (delegate != ClosedConnection.CLOSED_CONNECTION) {
         leakTask.cancel();

         try {
        //是赃读和不是自动提交模式,委派给回滚动作处理
            if (isCommitStateDirty && !isAutoCommit) {
               delegate.rollback();
               LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
            }
          //设置赃读状态
            if (dirtyBits != 0) {
               poolEntry.resetConnectionState(this, dirtyBits);
            }

            delegate.clearWarnings();
         }
         catch (SQLException e) {
            // when connections are aborted, exceptions are often thrown that should not reach the application
            if (!poolEntry.isMarkedEvicted()) {
               throw checkException(e);
            }
         }
         finally {

         //最后要委派给回收处理并回收当前链接
            delegate = ClosedConnection.CLOSED_CONNECTION;
            poolEntry.recycle();
         }
      }
   }

FastList实现原理分析

移除,将待移除的元素放置最后一个位置


移除.png
   @Override
 //添加元素
   public boolean add(T element)
   {
//如果长度足够直接添加

      if (size < elementData.length) {
         elementData[size++] = element;
      }
      else {
         // overflow-conscious code
         final var oldCapacity = elementData.length;
     //2倍扩容
         final var newCapacity = oldCapacity << 1;
         @SuppressWarnings("unchecked")
         final var newElementData = (T[]) Array.newInstance(clazz, newCapacity);
         System.arraycopy(elementData, 0, newElementData, 0, oldCapacity);
、、赋值
         newElementData[size++] = element;
         elementData = newElementData;
      }

      return true;
   }


 @Override
//移除元素
   public boolean remove(Object element)
   {
    //找到元素并合并移除
      for (var index = size - 1; index >= 0; index--) {
         if (element == elementData[index]) {
            final var numMoved = size - index - 1;
            if (numMoved > 0) {
               System.arraycopy(elementData, index + 1, elementData, index, numMoved);
            }
            elementData[--size] = null;
            return true;
         }
      }

      return false;
   }


思考,如何设计一个支持高并发的数据库链接池

总结:决定了HikariCP链接池高性能的几个关键因素是
1.作者充分理解了并发的核心原理,高度设计了ConcurrentBag这个核心类,比如:ThreadLocal和CopyOnWriteArrayList在ConcurrentBag中都是成员变量,线程间不共享,避免了伪共享(false sharing)的发生.
2.作者对数据结构的合理设计,用FastList替换了ArrayList,适用于容器不需要频繁扩容的机制,也贴切了并发池这个设计场景
3.作者巧用了状态机设计模式,statement和connection还有BagEntry的状态分割合理设置了它们的状态
4.作者充分理解了多线程scheduler执行任务定时器的多线程包

1.下面画一下这个链接池的实现的核心逻辑

2.png

相关文章

网友评论

      本文标题:高性能数据库链接池HikariCP原理分析

      本文链接:https://www.haomeiwen.com/subject/figskdtx.html