美文网首页Java
Hikari定制连接池容器ConcurrentBag

Hikari定制连接池容器ConcurrentBag

作者: holysu | 来源:发表于2021-04-24 21:39 被阅读0次

    它是为了实现比 LinkedBlockingQueue 和 LinkedTransferQueue 更高的性能而特别定制的,根据连接池的特殊场景做了一些性能优化

    容器类定义

    public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable {
       // 底层存储 CopyOnWriteArrayList,可以无锁安全的读取连接池的信息
       private final CopyOnWriteArrayList<T> sharedList;
       private final boolean weakThreadLocals; //默认是false 
         // 线程隔离的 FastList<PoolEntry>
       private final ThreadLocal<List<Object>> threadList;
         // 其实就是HikariPool,用来新建PoolEntry
       private final IBagStateListener listener;
         // 等待获取db连接的线程个数
       private final AtomicInteger waiters;
       private volatile boolean closed;
    
       private final SynchronousQueue<T> handoffQueue;
      //..
    }
    
    

    容器存储对象(条目)的定义

    // 容器存放的的 IConcurrentBagEntry 接口实现类 PoolEntry 
    public interface IConcurrentBagEntry
       {  
          // 空闲状态
          int STATE_NOT_IN_USE = 0;
          // 连接在使用
          int STATE_IN_USE = 1;
          // remove的时候先标记为被移除
          int STATE_REMOVED = -1;
          // 被预留状态,不可用但是可以移除 
          // 主要在检查线程中,要对连接进行softEvictConnection,确保能从not-in-use转到 reserved
          int STATE_RESERVED = -2;
    
          boolean compareAndSet(int expectState, int newState);
          void setState(int newState);
          int getState();
       }
    
    

    它的几种状态扭转

    image.png

    连接池容器初始化

    public ConcurrentBag(final IBagStateListener listener)
    {
         // 其实就是HikariPool, 负责创建PoolEntry
       this.listener = listener;
         // false
       this.weakThreadLocals = useWeakThreadLocals();
       this.handoffQueue = new SynchronousQueue<>(true);
       // 等待获取db连接的线程个数
       this.waiters = new **AtomicInteger**();
       //底层存储 CopyOnWriteArrayList
       this.sharedList = new **CopyOnWriteArrayList**<>();
       if (weakThreadLocals) {
          this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
       }
       else {
                // ThreadLocal, 使用自定义的FastList
          this.threadList = ThreadLocal.withInitial(() -> new **FastList**<>(IConcurrentBagEntry.class, 16));
       }
    }
    
    

    容器操作 crud

    1. 从容器获取对象 borrow(long timeout, TimeUnit timeunit)
    public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
    {
       // Try the thread-local list first
       // **1.** 先从 threadlocal的FastList中取,倒序遍历, 同一个线程归还连接后再获取最新的连接 可避免一些检查
       // 如果能取到就直接返回 这样无需锁
       final List<Object> list = threadList.get();
       for (int i = **list.size() - 1;** i >= 0; i--) {
          final Object entry = list.remove(i);
          @SuppressWarnings("unchecked")
          final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
          if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
             return bagEntry;
          }
       }
    
       // **2.** threadlocal获取不到,则需要从底层的sharedList中去寻找
       // 累加等待计数
       // Otherwise, scan the shared list ... then poll the handoff queue
       final int waiting = waiters.incrementAndGet();
       try {
          for (T bagEntry : sharedList) {
             if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                // If we may have stolen another waiter's connection, request another bag add.
                if (waiting > 1) {
                   listener.addBagItem(waiting - 1);
                }
                return bagEntry;
             }
          }
    
              // 如果连接池里边无空闲连接了,则需要HikariPool去增加连接
          listener.addBagItem(waiting);
    
          // 进入超时等待循环 
          timeout = timeUnit.toNanos(timeout);
          do {
             final long start = currentTime();
             final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
             if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                return bagEntry;
             }
    
             timeout -= elapsedNanos(start);
          } while (timeout > 10_000);
    
          return null;
       }
       finally {
         // 扣减等待计数
          waiters.decrementAndGet();
       }
    
    
    1. 新增对象 add(final T bagEntry)
    public void add(final T bagEntry)
    {
       if (closed) {
          LOGGER.info("ConcurrentBag has been closed, ignoring add()");
          throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
       }
    
       sharedList.add(bagEntry);
    
       // spin until a thread takes it or none are waiting
       while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !**handoffQueue.offer(bagEntry))** {
          Thread.yield();
       }
    }
    
    

    将对象塞到底层 copyOnWriteArrayList里,如果有线程在等待连接,会把新增的这个 bagEntry塞到 handoffQueue,给阻塞在 borrow() 超时等待的线程

    1. 归还对象 requite(final T bagEntry)
    public void requite(final T bagEntry)
    {
       bagEntry.setState(STATE_NOT_IN_USE);
    
       for (int i = 0; waiters.get() > 0; i++) {
          if (bagEntry.getState() != STATE_NOT_IN_USE || **handoffQueue.offer(bagEntry)**) {
             return;
          }
          else if ((i & 0xff) == 0xff) {
             parkNanos(MICROSECONDS.toNanos(10));
          }
          else {
             Thread.yield();
          }
       }
    
       final List<Object> threadLocalList = threadList.get();
       if (threadLocalList.size() < 50) {
          **threadLocalList**.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
       }
    }
    
    
    • 状态置为空闲 STATE_NOT_IN_USE
    • 如果此时有线程在等待db连接,则会把这个连接 交给 handoffQueue 直接让阻塞在 borrow 的线程获取到
    • 然后把这个连接存到 threadLocal里,这样当前线程如果再次获取db连接的话就会非常快
    1. 移除连接 remove(final T bagEntry)
    public boolean remove(final T bagEntry)
    {
       // 1.
       if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
          LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
          return false;
       }
       // 2.
       final boolean removed = sharedList.remove(bagEntry);
       if (!removed && !closed) {
          LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
       }
         // 3.
       threadList.get().remove(bagEntry);
    
       return removed;
    }
    
    
    • 只有空闲状态(NOT_IN_USE) 和 保留状态(RESERVED) 的连接才能被移除和关闭
    • 从底层 sharedList 中移除
    • 从 threadLocal 中移除

    为什么 hikari 号称最快的连接池?

    • 采用 threadLocal 缓存同一个线程使用过的db连接,并且是从最近一个开始遍历int i = list.size() - 1**;**, 这样同一个java线程多次获取db连接的时候就能快速获取到连接,而且获取到最鲜活的,还可避免 HikariPool#getConnection 的连接可用性的检查
    • ConcurrentBag 底层 sharedList 用的 copyOnWriteArrayList,所以读和写之间就不需要加锁了
    • 监控连接池状态 统计idle、active、total等个数的时候加锁的话, 就会阻塞住 add borrow 等操作
    • ConcurrentBag 申请连接borrow 和归还连接 requite 只是进行 cas 状态变更的无锁操作,并不会从 copyOnWriteArrayList 移除元素
    • 单线程的线程池:新建和关闭连接的线程池都是单线程的,也就避免了线程间的协调开销
    • handoffQueue: SynchronousQueue 当申请连接 borrow() 的时候,如果空闲连接不够用,线程就会阻塞在 handoffQueue#poll ; 在连接新增 add() 或归还 requite() 时,如果发现有线程在等待中则会把这个连接交给 handoffQueue ,这样等待中的线程直接能直接获取到(hand off很形象啊),而不用再遍历 sharedList 同时也保障了公平性 先到先得
    • minimumIdle 默认等于 maximumPoolSize, 尽量减少在申请连接的时候还需要创建连接的开销

    相关文章

      网友评论

        本文标题:Hikari定制连接池容器ConcurrentBag

        本文链接:https://www.haomeiwen.com/subject/iezlrltx.html