美文网首页
HikariCP连接池设计分析

HikariCP连接池设计分析

作者: CXYMichael | 来源:发表于2019-07-27 15:52 被阅读0次

    1 概述

    应用程序建立与数据库的连接其实是一项开销很大的工作,其中涉及网络连接的建立、会话的建立、数据库端与应用程序的适配等诸多操作。因此,大部分情况下我们会选择将数据库连接进行池化管理。

    连接池基本的思想是在系统初始化的时候,将数据库连接作为对象存储在内存中,当用户需要访问数据库时,并非建立一个新的连接,而是从连接池中取出一个已建立的空闲连接对象。使用完毕后,用户也并非将连接关闭,而是将连接放回连接池中,以供下一个请求访问使用。而连接的建立、断开都由连接池自身来管理。

    同时,还可以通过设置连接池的参数来控制连接池中的初始连接数、连接的上下限数以及每个连接的最大使用次数、最大空闲时间等等。也可以通过其自身的管理机制来监视数据库连接的数量、使用情况等。此外,大部分数据库连接池还提供了不同SQL Dialect的适配、查询缓存、性能监控、插件扩展等特性,进一步丰富了数据库连接池的功能。

    2 整体架构

    HikariCP与Druid同属于第二代连接池,但前者代码与结构极其精简。只需要从其核心类HikariPool入手,就可以把整体架构梳理出来。HikariCP启动时首先根据用户配置创建HikariConfig类,然后通过JDBC驱动加载DataSource,加载完成后根据配置初始化连接池,然后创建连接。连接的创建回收都是通过独立的线程池来异步处理的,同时还是用一个定时线程池来处理连接泄漏和数据监控统计的任务。所有的连接以PoolEntry的形式存储在ConcurrentBag中,每个PoolEntry对应一个被HikariCP代理的JDBC连接。


    hikaricp_struct.png

    3 连接管理

    3.1 申请连接

    HikariPool负责对资源连接进行管理,而ConcurrentBag则是作为物理连接的共享资源站,PoolEntry则是对物理连接的一对一封装。PoolEntry通过borrow方法从bag中取出,之后通过PoolEntry.createProxyConnection调用工厂类生成HikariProxyConnection返回。


    hikaricp_getconnection.png

    3.2 归还连接

    HikariProxyConnection调用close方法时实际上通过代理调用了PooleEntry的recycle方法,之后通过HikariPool调用了ConcurrentBag的requite放回。(poolEntry通过borrow从bag中取出,再通过requite放回。资源成功回收)。


    hikaricp_close.png

    3.3 创建连接

    HikariCP中通过独立的线程池addConnectionExecutor进行新连接的生成,连接生成方法为PoolEntryCreator。物理链接的生成只由PoolBase的newConnection()实现,之后封装成PoolEntry,通过Bag的add方法加入ConcurrentBag。当ConcurrentBag存在等待线程,或者有连接被关闭时,会触发IBagItemListener的addBagItem(wait)方法,调用PoolEntryCreator进行新连接的生成。


    hikaricp_createPoolEntry.png

    3.4 回收连接

    closeConnectionExecutor关闭连接后,会调用fillPool()方法对连接池进行连接填充。同时HikariPool提供evictConnection(Connection)方法对物理连接进行手动关闭。虽然连接池提供了直接回收连接的接口,但对于开发者来说一般不需要显示调用,只有连接本身状态异常或者连接池shutdown的时候才需要回收连接。


    hikaricp_evict.png

    4 数据结构

    4.1 ConcurrentBag

    ConcurrentBag内部同时使用了ThreadLocal和CopyOnWriteArrayList来存储元素,其中CopyOnWriteArrayList是线程共享的。ConcurrentBag采用了queue-stealing的机制获取元素:首先尝试从ThreadLocal中获取属于当前线程的元素来避免锁竞争,如果没有可用元素则扫描公共集合、再次从共享的CopyOnWriteArrayList中获取。(ThreadLocal列表中没有被使用的items在借用线程没有属于自己的时候,是可以被“窃取”的)
    ThreadLocal和CopyOnWriteArrayList在ConcurrentBag中都是成员变量,线程间不共享,避免了伪共享(false sharing)的发生。

       //负责存放ConcurrentBag中全部用于出借的资源
       private final CopyOnWriteArrayList<T> sharedList;
       //是否使用弱引用的标志位
       private final boolean weakThreadLocals;
       //用于加速线程本地化资源访问
       private final ThreadLocal<List<Object>> threadList;
       //任务窃取时的事件监听器
       private final IBagStateListener listener;
       //等待资源交接的线程计数器
       private final AtomicInteger waiters;
       private volatile boolean closed;
       //用于存在资源等待线程时的第一手资源交接
       private final SynchronousQueue<T> handoffQueue;
    

    ConcurrentBag的添加和删除方法比较简单,直接对sharedList进行添加操作,同时尝试通过handoffQueue交接新添加的Connection;而删除时则先CAS修改Connection的状态,如果操作成功才会移除。

    public void add(final T bagEntry)
    {
       if (closed) {
          LOGGER.info("ConcurrentBag has been closed, ignoring add()");
          throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
       }
       sharedList.add(bagEntry);//新添加的资源优先放入CopyOnWriteArrayList
       // 自旋等待直到将资源交到某个等待线程后才返回(SynchronousQueue)
       while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
          yield();
       }
    }
    
    public boolean remove(final T bagEntry)
    {
    // 如果资源正在使用且无法进行状态切换,则返回失败
       if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
          LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
          return false;
       }
       final boolean removed = sharedList.remove(bagEntry);// 从CopyOnWriteArrayList中移出
       if (!removed && !closed) {
          LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
       }
       return removed;
    }
    

    ConcurrentBag中通过borrow方法进行数据资源借用,通过requite方法进行资源回收,注意其中borrow方法只提供对象引用,不移除对象。所以从bag中“借用”的items实际上并没有从任何集合中删除,因此即使引用废弃了,垃圾收集也不会发生。因此使用时通过borrow取出的对象必须通过requite方法进行放回,否则会导致内存泄露,只有”remove”方法才能完全从bag中删除一个对象。

       public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
       {
          // 优先查看有没有可用的本地化的资源
          final List<Object> list = threadList.get();
          for (int i = list.size() - 1; i >= 0; i--) {
             final Object entry = list.remove(i);
             @SuppressWarnings("unchecked")
             final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
             if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                return bagEntry;
             }
          }
          final int waiting = waiters.incrementAndGet();
          try {
          // 当无可用本地化资源时,遍历全部资源,查看是否存在可用资源
          // 因此被一个线程本地化的资源也可能被另一个线程“抢走”
             for (T bagEntry : sharedList) {
                if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                   if (waiting > 1) {
                   // 因为可能“抢走”了其他线程的资源,因此提醒包裹进行资源添加
                      listener.addBagItem(waiting - 1);
                   }
                   return bagEntry;
                }
             }
             listener.addBagItem(waiting);
             timeout = timeUnit.toNanos(timeout);
             do {
                final long start = currentTime();
                // 当现有全部资源全部在使用中,等待一个被释放的资源或者一个新资源
                final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
                if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                   return bagEntry;
                }
                timeout -= elapsedNanos(start);
             } while (timeout > 10_000);
             return null;
          }
          finally {
             waiters.decrementAndGet();
          }
       }
    
       public void requite(final T bagEntry)
       {
          // 将状态转为未在使用
          bagEntry.setState(STATE_NOT_IN_USE);
          // 判断是否存在等待线程,若存在,则直接转手资源
          for (int i = 0; waiters.get() > 0; i++) {
             if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
                return;
             }
             else if ((i & 0xff) == 0xff) {
                parkNanos(MICROSECONDS.toNanos(10));
             }
             else {
                yield();
             }
          }
          // 否则,进行资源本地化
          final List<Object> threadLocalList = threadList.get();
          threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
       }
    

    4.2 FastList

    FastList是一个List接口的精简实现,只实现了接口中必要的几个方法。JDK ArrayList每次调用get()方法时都会进行rangeCheck检查索引是否越界,FastList的实现中去除了这一检查,只要保证索引合法那么rangeCheck就成为了不必要的计算开销(当然开销极小)。此外,HikariCP使用List来保存打开的Statement,当Statement关闭或Connection关闭时需要将对应的Statement从List中移除。通常情况下,同一个Connection创建了多个Statement时,后打开的Statement会先关闭。ArrayList的remove(Object)方法是从头开始遍历数组,而FastList是从数组的尾部开始遍历,因此更为高效。

    @Override
    public T get(int index)
    {
       return elementData[index];
    }
    
    @Override
    public boolean add(T element)
    {     
     if (size < elementData.length) {    
        elementData[size++] = element;
      } else {
           // 容量溢出时进行扩容,创建一个新的2倍容量数组然后浅拷贝过去,最后插入新元素
           final int oldCapacity = elementData.length;
           final int newCapacity = oldCapacity << 1;
           @SuppressWarnings("unchecked")
           final T[] newElementData = (T[]) Array.newInstance(clazz, newCapacity);
           System.arraycopy(elementData, 0, newElementData, 0, oldCapacity);
           newElementData[size++] = element;
           elementData = newElementData;
       }
       return true;
    }
    
    @Override
    public boolean remove(Object element)
    {
       for (int index = size - 1; index >= 0; index--) {
          if (element == elementData[index]) {
             final int numMoved = size - index - 1;
             if (numMoved > 0) {
                System.arraycopy(elementData, index + 1, elementData, index, numMoved);
             }
             elementData[--size] = null;
             return true;
          }
       }
       return false;
    }
    

    4.3 SuspendResumeLock

    该类内部有一个静态常量锁FAUX_LOCK,基于Semaphore封装,默认10000个令牌。HikariCP连接池初始化时会根据isAllowPoolSuspension来选择为新建立的连接池新建一个锁实例(用于实现连接池挂起)还是共享FAUX_LOCK。

    this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
    
       private static final int MAX_PERMITS = 10000;
       private final Semaphore acquisitionSemaphore;
       public static final SuspendResumeLock FAUX_LOCK = new SuspendResumeLock(false)
    

    5 总结

    连接池给开发人员使用数据库带来了性能和开发效率两个方面的提升,但是也使得某些问题变得更复杂,出现故障时更难定位。比如数据库局部变量声明、事务的作用域、Prepared Statement管理、缓存管理、多线程复用连接导致的线程安全问题等。

    相关文章

      网友评论

          本文标题:HikariCP连接池设计分析

          本文链接:https://www.haomeiwen.com/subject/cxcnfctx.html