连接池commons-pool源码学习

作者: zyzab | 来源:发表于2017-10-13 15:54 被阅读0次

    上一篇简单的hello world

    ReaderUtil readerUtil = new ReaderUtil(new GenericObjectPool<StringBuffer>(new StringBufferFactory()));
    

    理解commons pool,需要了解主要类,接口

    • PooledObject : 池化对象,在对象的基础上,添加对象的属性,方法,方便判断对象的可用状
    • ObjectPool : 对象池,利用它管理,获取对象
    • PoolableObjectFactory : 可池化对象的维护工厂
    • GenericObjectPoolConfig : 连接池配置
    • LinkedBlockingDeque : 线程安全的阻塞队列数据结构
    • Evictor : 驱逐者线程,当该线程启动的是否,负责判断队列中的对象是否需要驱逐,驱逐完如果空闲对象数量小于最小可以使用的数量,维持最小的idel个对象,就创建等于最小数量的对象数

    一步步来吧

    • PooledObject : 池化对象,在对象的基础上,添加对象的属性,方法,方便判断对象的可用状态,比如池化StringBuffer字符操作对象
    PooledObject pooledObject = new DefaultPooledObject(new StringBuffer());
    
    public DefaultPooledObject(final T object) {
        //真正操作的还是StringBuffer对象,只是为了方便维护,让DefaultPooledObject装饰一番
        this.object = object;
    }
    

    ObjectPool : 对象池,利用它管理,获取对象,看看主要方法

    ObjectPool接口实现GenericObjectPool

    public GenericObjectPool(final PooledObjectFactory<T> factory,
                final GenericObjectPoolConfig config) {
    
            super(config, ONAME_BASE, config.getJmxNamePrefix());
    
            if (factory == null) {
                jmxUnregister(); // tidy up
                throw new IllegalArgumentException("factory may not be null");
            }
            this.factory = factory;
            //支持阻塞的线程安全队列
            idleObjects = new LinkedBlockingDeque<PooledObject<T>>(config.getFairness());
            //设置连接池配置
            setConfig(config);
            //是否启动驱逐线程
            startEvictor(getTimeBetweenEvictionRunsMillis());
        }
    

    主要方法

    //从空闲队列LinkedBlockingDeque获取连接对象
    T borrowObject() throws Exception, NoSuchElementException,
                IllegalStateException;
    
    //把使用完的对象归还到空闲队列LinkedBlockingDeque
    void returnObject(T obj) throws Exception;
    
    //是否启动驱逐线程
    final void startEvictor(final long delay);
    
    获取连接对象
    public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
            assertOpen();
    
            final AbandonedConfig ac = this.abandonedConfig;
            //如果配置了遗弃,当前空闲队列对象数量小于2,并且正在使用中的对象数量大于(池中最多对象数量-3)
            if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
                    (getNumIdle() < 2) &&
                    (getNumActive() > getMaxTotal() - 3) ) {
                //正在使用状态的,并且使用时间超过配置时间还没有归还的对象,则销毁
                removeAbandoned(ac);
            }
    
            PooledObject<T> p = null;
    
            // Get local copy of current config so it is consistent for entire
            // method execution
            final boolean blockWhenExhausted = getBlockWhenExhausted();
    
            boolean create;
            final long waitTime = System.currentTimeMillis();
    
            while (p == null) {
                create = false;
                p = idleObjects.pollFirst();
                if (p == null) {
                    p = create();
                    if (p != null) {
                        create = true;
                    }
                }
                if (blockWhenExhausted) {
                    if (p == null) {
                        //如果没有设置获取等待超时就一直等待,直到有对象可以获取
                        if (borrowMaxWaitMillis < 0) {
                            p = idleObjects.takeFirst();
                        } else {
                            //设置了获取超时时间,如果超过设置时间还没有获取到,直接返回null
                            p = idleObjects.pollFirst(borrowMaxWaitMillis,
                                    TimeUnit.MILLISECONDS);
                        }
                    }
                    if (p == null) {
                        throw new NoSuchElementException(
                                "Timeout waiting for idle object");
                    }
                } else {
                    if (p == null) {
                        throw new NoSuchElementException("Pool exhausted");
                    }
                }
                if (!p.allocate()) {
                    p = null;
                }
    
                if (p != null) {
                    try {
                        //激活对象
                        factory.activateObject(p);
                    } catch (final Exception e) {
                        try {
                            //激活失败,销毁对象
                            destroy(p);
                        } catch (final Exception e1) {
                            // Ignore - activation failure is more important
                        }
                        p = null;
                        if (create) {
                            final NoSuchElementException nsee = new NoSuchElementException(
                                    "Unable to activate object");
                            nsee.initCause(e);
                            throw nsee;
                        }
                    }
                    if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
                        boolean validate = false;
                        Throwable validationThrowable = null;
                        try {
                            //检查对象是否有效可用
                            validate = factory.validateObject(p);
                        } catch (final Throwable t) {
                            PoolUtils.checkRethrow(t);
                            validationThrowable = t;
                        }
                        if (!validate) {
                            try {
                                //无效则销毁
                                destroy(p);
                                destroyedByBorrowValidationCount.incrementAndGet();
                            } catch (final Exception e) {
                                // Ignore - validation failure is more important
                            }
                            p = null;
                            if (create) {
                                final NoSuchElementException nsee = new NoSuchElementException(
                                        "Unable to validate object");
                                nsee.initCause(validationThrowable);
                                throw nsee;
                            }
                        }
                    }
                }
            }
    
            updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
    
            return p.getObject();
        }
    
    removeAbandoned销毁状态在使用中,超过一段时间没有归还的对象
        private void removeAbandoned(final AbandonedConfig ac) {
            // Generate a list of abandoned objects to remove
            final long now = System.currentTimeMillis();
            final long timeout =
                    now - (ac.getRemoveAbandonedTimeout() * 1000L);
            final ArrayList<PooledObject<T>> remove = new ArrayList<PooledObject<T>>();
            final Iterator<PooledObject<T>> it = allObjects.values().iterator();
            while (it.hasNext()) {
                final PooledObject<T> pooledObject = it.next();
                synchronized (pooledObject) {
                    if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
                            pooledObject.getLastUsedTime() <= timeout) {
                        pooledObject.markAbandoned();
                        remove.add(pooledObject);
                    }
                }
            }
    
            // Now remove the abandoned objects
            final Iterator<PooledObject<T>> itr = remove.iterator();
            while (itr.hasNext()) {
                final PooledObject<T> pooledObject = itr.next();
                if (ac.getLogAbandoned()) {
                    pooledObject.printStackTrace(ac.getLogWriter());
                }
                try {
                    invalidateObject(pooledObject.getObject());
                } catch (final Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    归还连接
        public void returnObject(final T obj) {
            //从所有对象池中获取返回的对象
            final PooledObject<T> p = allObjects.get(new IdentityWrapper<T>(obj));
    
            if (p == null) {
                //如果没有遗弃配置AbandonedConfig,抛出异常,有则直接返回
                if (!isAbandonedConfig()) {
                    throw new IllegalStateException(
                            "Returned object not currently part of this pool");
                }
                return; // Object was abandoned and removed
            }
    
            synchronized(p) {
                final PooledObjectState state = p.getState();
                //判断对象状态是否是正在使用,如果不是抛出异常,是则修改对象状态为正在归还,防止被遗弃
                if (state != PooledObjectState.ALLOCATED) {
                    throw new IllegalStateException(
                            "Object has already been returned to this pool or is invalid");
                }
                p.markReturning(); // Keep from being marked abandoned
            }
    
            final long activeTime = p.getActiveTimeMillis();
    
            if (getTestOnReturn()) {
                //如果对象无效
                if (!factory.validateObject(p)) {
                    try {
                        //销毁对象,在空闲队列,所有集合中剔除对象,并且更新销毁对象数量,创建对象数量
                        destroy(p);
                    } catch (final Exception e) {
                        swallowException(e);
                    }
                    try {
                        //试图确保空闲池中存在有可用的实例
                        ensureIdle(1, false);
                    } catch (final Exception e) {
                        swallowException(e);
                    }
                    updateStatsReturn(activeTime);
                    return;
                }
            }
    
            try {
                //钝化对象,下次之前可以再复用该对象,比如对象是StringBuffer,可以用setLength(0)清空
                factory.passivateObject(p);
            } catch (final Exception e1) {
                swallowException(e1);
                try {
                    //同上
                    destroy(p);
                } catch (final Exception e) {
                    swallowException(e);
                }
                try {
                    //同上
                    ensureIdle(1, false);
                } catch (final Exception e) {
                    swallowException(e);
                }
                updateStatsReturn(activeTime);
                return;
            }
    
            //释放资源
            if (!p.deallocate()) {
                throw new IllegalStateException(
                        "Object has already been returned to this pool or is invalid");
            }
    
            final int maxIdleSave = getMaxIdle();
            //空闲队列是否已经等于配置的最多空闲数量,如果是则销毁对象,不是则归还到空闲队列中
            if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
                try {
                    destroy(p);
                } catch (final Exception e) {
                    swallowException(e);
                }
            } else {
                //如果配置的是先进先出,先进后出归还到空闲队列中
                if (getLifo()) {
                    idleObjects.addFirst(p);
                } else {
                    idleObjects.addLast(p);
                }
                if (isClosed()) {
                    // Pool closed while object was being added to idle objects.
                    // Make sure the returned object is destroyed rather than left
                    // in the idle object pool (which would effectively be a leak)
                    clear();
                }
            }
            updateStatsReturn(activeTime);
        }
    

    PoolableObjectFactory : 可池化对象的维护工厂

    public interface PooledObjectFactory<T> {
      /**
       * 创建对象
       * Create an instance that can be served by the pool and wrap it in a
       * {@link PooledObject} to be managed by the pool.
       *
       * @return a {@code PooledObject} wrapping an instance that can be served by the pool
       *
       * @throws Exception if there is a problem creating a new instance,
       *    this will be propagated to the code requesting an object.
       */
      PooledObject<T> makeObject() throws Exception;
    
      /**
       * 销毁对象
       * Destroys an instance no longer needed by the pool.
       * <p>
       * It is important for implementations of this method to be aware that there
       * is no guarantee about what state <code>obj</code> will be in and the
       * implementation should be prepared to handle unexpected errors.
       * <p>
       * Also, an implementation must take in to consideration that instances lost
       * to the garbage collector may never be destroyed.
       * </p>
       *
       * @param p a {@code PooledObject} wrapping the instance to be destroyed
       *
       * @throws Exception should be avoided as it may be swallowed by
       *    the pool implementation.
       *
       * @see #validateObject
       * @see ObjectPool#invalidateObject
       */
      void destroyObject(PooledObject<T> p) throws Exception;
    
      /**
       * 检验对象的有效性
       * Ensures that the instance is safe to be returned by the pool.
       *
       * @param p a {@code PooledObject} wrapping the instance to be validated
       *
       * @return <code>false</code> if <code>obj</code> is not valid and should
       *         be dropped from the pool, <code>true</code> otherwise.
       */
      boolean validateObject(PooledObject<T> p);
    
      /**
       * 激活对象
       * Reinitialize an instance to be returned by the pool.
       *
       * @param p a {@code PooledObject} wrapping the instance to be activated
       *
       * @throws Exception if there is a problem activating <code>obj</code>,
       *    this exception may be swallowed by the pool.
       *
       * @see #destroyObject
       */
      void activateObject(PooledObject<T> p) throws Exception;
    
      /**
       * 钝化对象,简单来说就是在归还对象的时候,清空对象,下次借用的可以直接使用
       * Uninitialize an instance to be returned to the idle object pool.
       *
       * @param p a {@code PooledObject} wrapping the instance to be passivated
       *
       * @throws Exception if there is a problem passivating <code>obj</code>,
       *    this exception may be swallowed by the pool.
       *
       * @see #destroyObject
       */
      void passivateObject(PooledObject<T> p) throws Exception;
    }
    

    看看jedis的实现JedisFactory,方便理解

    class JedisFactory implements PooledObjectFactory<Jedis> {
    
      @Override
      public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
        final BinaryJedis jedis = pooledJedis.getObject();
        if (jedis.getDB() != database) {
          jedis.select(database);
        }
    
      }
    
      @Override
      public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
        final BinaryJedis jedis = pooledJedis.getObject();
        if (jedis.isConnected()) {
          try {
            try {
              jedis.quit();
            } catch (Exception e) {
            }
            jedis.disconnect();
          } catch (Exception e) {
    
          }
        }
    
      }
    
      @Override
      public PooledObject<Jedis> makeObject() throws Exception {
        final HostAndPort hostAndPort = this.hostAndPort.get();
        final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
            soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
    
        try {
          jedis.connect();
          if (password != null) {
            jedis.auth(password);
          }
          if (database != 0) {
            jedis.select(database);
          }
          if (clientName != null) {
            jedis.clientSetname(clientName);
          }
        } catch (JedisException je) {
          jedis.close();
          throw je;
        }
    
        return new DefaultPooledObject<Jedis>(jedis);
    
      }
    
      @Override
      public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {
        // TODO maybe should select db 0? Not sure right now.
      }
    
      @Override
      public boolean validateObject(PooledObject<Jedis> pooledJedis) {
        final BinaryJedis jedis = pooledJedis.getObject();
        try {
          HostAndPort hostAndPort = this.hostAndPort.get();
    
          String connectionHost = jedis.getClient().getHost();
          int connectionPort = jedis.getClient().getPort();
    
          return hostAndPort.getHost().equals(connectionHost)
              && hostAndPort.getPort() == connectionPort && jedis.isConnected()
              && jedis.ping().equals("PONG");
        } catch (final Exception e) {
          return false;
        }
      }
    }
    

    GenericObjectPoolConfig : 连接池配置

    • lifo: true为先进先出;false为先进后出,默认为true,表示对象的出借方式
    • maxWaitMillis: 当连接池资源耗尽时,调用者最大等待阻塞的时间(ms),默认为-1表示永不超时,建议设置值,如果资源一直等待超时,会卡死服务
    • maxTotal: 连接池中最大连接数,默认为8.
    • maxIdle: 连接池中最大空闲的连接数,默认为8.该参数一般尽量与maxTotal相同,以提高并发数
    • minIdle: 连接池中最小空闲的连接数,默认为0,该参数一般尽量比maxIdle小一些
    • blockWhenExhausted: 当连接池资源耗尽时,是否会阻塞等待,默认为true:阻塞
    • testOnBorrow: 调用者获取连接池资源时,是否检测是有有效,如果无效则从连接池中移除,并尝试继续获取。默认为false。建议保持默认值
    • testOnReturn: 向连接池归还连接时,是否检测“连接”对象的有效性。默认为false。建议保持默认值
    • testOnCreate:向连接池添加创建对象时,是否检测“连接”对象的有效性。默认为false。建议保持默认值
    • testWhileIdle: 当驱逐空闲队列的连接对象时,是否允许空闲时进行有效性测试,默认为false
    • timeBetweenEvictionRunsMillis: “空闲连接”驱逐线程,检测的周期,毫秒数。如果为负值,表示不运行“驱逐线程”。默认为-1
    • numTestsPerEvictionRun:驱逐线程一次运行检查多少条“连接”,不要设置太大,太大需要更多的时间来执行
    • minEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲连接将可能会被移除。负值(-1)表示不移除
    • softMinEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲链接将会被移除,且保留“minIdle”个空闲连接数。默认为-1.

    PooledObject对象的状态

    /**
     * Provides the possible states that a {@link PooledObject} may be in.
     *
     * @version $Revision: $
     *
     * @since 2.0
     */
    public enum PooledObjectState {
        /**
         * In the queue, not in use.
         * 位于队列中,未使用
         */
        IDLE,
    
        /**
         * In use.
         * 在使用
         */
        ALLOCATED,
    
        /**
         * In the queue, currently being tested for possible eviction.
         * 位于队列中,当前正在测试,可能会被回收
         */
        EVICTION,
    
        /**
         * Not in the queue, currently being tested for possible eviction. An
         * attempt to borrow the object was made while being tested which removed it
         * from the queue. It should be returned to the head of the queue once
         * eviction testing completes.
         * TODO: Consider allocating object and ignoring the result of the eviction
         *       test.
         * 不在队列中,当前正在测试,可能会被回收。从池中借出对象时需要从队列出移除并进行测试
         */
        EVICTION_RETURN_TO_HEAD,
    
        /**
         * In the queue, currently being validated.
         * 2.0没有用到
         */
        VALIDATION,
    
        /**
         * Not in queue, currently being validated. The object was borrowed while
         * being validated and since testOnBorrow was configured, it was removed
         * from the queue and pre-allocated. It should be allocated once validation
         * completes.
         * 2.0没有用到
         */
        VALIDATION_PREALLOCATED,
    
        /**
         * Not in queue, currently being validated. An attempt to borrow the object
         * was made while previously being tested for eviction which removed it from
         * the queue. It should be returned to the head of the queue once validation
         * completes.
         * 2.0没有用到
         */
        VALIDATION_RETURN_TO_HEAD,
    
        /**
         * Failed maintenance (e.g. eviction test or validation) and will be / has
         * been destroyed
         * 回收或验证失败,将销毁
         */
        INVALID,
    
        /**
         * Deemed abandoned, to be invalidated.
         * 即将无效
         */
        ABANDONED,
    
        /**
         * Returning to the pool.
         * 正在返还到池中
         */
        RETURNING
    }
    

    LinkedBlockingDeque是保存空闲队列的地方,借出,归还都在这里

    双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除);并且,该阻塞队列是支持线程安全

    private static final class Node<E> {
            /**
             * The item, or null if this node has been removed.
             */
            E item;
    
            /**
             * One of:
             * - the real predecessor Node
             * - this Node, meaning the predecessor is tail
             * - null, meaning there is no predecessor
             */
            Node<E> prev;
    
            /**
             * One of:
             * - the real successor Node
             * - this Node, meaning the successor is head
             * - null, meaning there is no successor
             */
            Node<E> next;
    
            /**
             * Create a new list node.
             *
             * @param x The list item
             * @param p Previous item
             * @param n Next item
             */
            Node(final E x, final Node<E> p, final Node<E> n) {
                item = x;
                prev = p;
                next = n;
            }
    }
    

    相关文章

      网友评论

        本文标题:连接池commons-pool源码学习

        本文链接:https://www.haomeiwen.com/subject/juizyxtx.html