Jedis源码阅读之连接池

作者: CoderMonkey | 来源:发表于2017-01-15 12:49 被阅读1011次

    1. 什么是连接池

    一般在程序中如果要和其他的系统创建连接进行交互并且连接的创建代价比较"昂贵"就需要用到连接池. 那怎么样才算是昂贵呢? 简单说来就是创建连接的时间接近甚至超过交互的时间. 所以连接池就是一个创建连接管理连接, 对连接进行缓存的技术. 最常见的连接池就是数据库连接池. 更加具体的关于连接池的介绍, 请移步连接池

    2. Jedis的连接池

    既然连接池的作用就是管理连接, 那Jedis的连接池也不例外, 它的作用就是缓存Jedisredis server之间的连接. Jedis 连接池的作用具体来说分为以下几个部分:

    2.1 创建保存连接的容器

    当初始化一个JedisPool的时候会创建一个LinkedBlockingDeque<PooledObject<T>> idleObjects队列, 这个队列用于存放已经和redis server建立连接********并且已经使用过********的连接对象(实际上存放的是DefaultPooledObject<Jedis>对象, 后面会看到).
    同时还会创建一个Map<IdentityWrapper<T>, PooledObject<T>> allObjects对象, 这个Map用于没有可用的连接时新创建出来的连接.

    2.2 发起请求获取连接

    当发起请求从连接池中获取一个连接的时候, 连接池会先从idleObjects队列中获取连接, 如果获取不到则开始创建一个新的连接, 创建新的连接是首先判断当前已经存在的连接是否已经大于连接池可容纳的连接数量, 如果是则不予创建, 以阻塞等的方式从idleObjects中等待获取可用连接(默认是阻塞不超时等待即等待直到有可用的连接, 但是也可以配置超时时间). 如果可以创建, 则创建一个新的连接放入到allObjects对象中, 同时将连接返回.

    2.3 连接使用完毕关闭连接

    当使用完连接调用连接关闭的时候, 连接池会将归还连接从allObjects中拿出来放入到idleObjects中, 所以下一次再获取连接将从idleObjects直接获取.
    但是这里要特别注意, 从allObjects中拿出来放入到idleObjects中时, 并没有将连接从allObjects中删除, 也就是说allObjectsidleObjects中的连接实际上是指向同一个对象实例的. 为什么要这么做, 而不是直接删除allObjects中的连接呢? 因为JedisPool会在指定的时间内对连接池中空闲对象进行删除, 这样可以减少资源的占用, 这个是JedisPool的单独线程自动完成的操作. 所以说, 如果有个连接创建出来长时间没有使用是会被自动销毁的, 而不是一直连接着占用资源.

    3. 源码阅读

    下面针对于上面提到的Jedis Pool的关键点来看看具体的代码实现

    3.1 连接池的创建
     JedisPool pool = new JedisPool(new JedisPoolConfig(), hnp.getHost(), hnp.getPort(), 2000);
    

    调用上面的代码就可以创建一个连接池了, 其中最关键的部分就是JedisPoolConfig对象的创建

    // JedisPoolConfig.java
    
    public class JedisPoolConfig extends GenericObjectPoolConfig {
      public JedisPoolConfig() {
        ...
        // 连接空闲的最小时间, 达到此值后空闲连接将会被移除. 负值(-1)表示不移除
        setMinEvictableIdleTimeMillis(60000);
        // "空闲链接"检测线程, 检测的周期, 毫秒数. 如果为负值, 表示不运行“检测线程”. 默认为-1
        setTimeBetweenEvictionRunsMillis(1000);
        ...
      }
    }
    

    在创建连接池的同时会创建idleObjects对象

    // GenericObjectPool.java
    public GenericObjectPool(PooledObjectFactory<T> factory,
                GenericObjectPoolConfig config) {
    
            ...other code...
            
            // 创建idleObjects对象
            idleObjects = new LinkedBlockingDeque<PooledObject<T>>(config.getFairness());
    
            setConfig(config);
       
            startEvictor(getTimeBetweenEvictionRunsMillis());
        }
    

    在上面的代码中创建了idleObjects对象, 但是还有一句很关键的代码startEvictor(getTimeBetweenEvictionRunsMillis());.
    没错,这就是启动自动回收空闲连接的代码!

    // BaseGenericObjectPool.java
    
    final void startEvictor(long delay) {
            synchronized (evictionLock) {
                if (null != evictor) {
                    EvictionTimer.cancel(evictor);
                    evictor = null;
                    evictionIterator = null;
                }
                if (delay > 0) {
                    evictor = new Evictor();
                    EvictionTimer.schedule(evictor, delay, delay);
                }
            }
        }
    

    至于保存刚创建出来的连接的allObjects对象是GenericObjectPool的成员变量

     private final Map<IdentityWrapper<T>, PooledObject<T>> allObjects =
            new ConcurrentHashMap<IdentityWrapper<T>, PooledObject<T>>();
    
    3.2 获取一个连接
     Jedis jedis = pool.getResource();
    

    调用上面的代码就可以获取一个连接了

    获取连接最关键的就是borrowObject

    // GenericObjectPool.java
    
    public T borrowObject(long borrowMaxWaitMillis) throws Exception {
            assertOpen();
    
            ...other code...
    
            PooledObject<T> p = null;
    
            // 没有空闲连接的时候是否阻塞的等待连接
            boolean blockWhenExhausted = getBlockWhenExhausted();
    
            boolean create;
            long waitTime = System.currentTimeMillis();
    
            while (p == null) {
                create = false;
                // 没有空闲连接时等待
                if (blockWhenExhausted) {
                    // 先从idleObjects队列中获取
                    p = idleObjects.pollFirst();
                    if (p == null) {
                      // 如果队列中没有空闲的连接, 则创建一个连接
                        p = create();
                        if (p != null) {
                            create = true;
                        }
                    }
                    // 如果连接创建失败, 则继续从idleObjects中阻塞的获取连接
                    if (p == null) {
                        if (borrowMaxWaitMillis < 0) {
                         // 无限制的等待, 不会超时
                            p = idleObjects.takeFirst();
                        } else {
                         // 有超时时间的等待
                            p = idleObjects.pollFirst(borrowMaxWaitMillis,
                                    TimeUnit.MILLISECONDS);
                        }
                    }
                    if (p == null) {
                        throw new NoSuchElementException(
                                "Timeout waiting for idle object");
                    }
                    if (!p.allocate()) {
                        p = null;
                    }
                } else { // 没有空闲连接时直接抛出异常
                    p = idleObjects.pollFirst();
                    if (p == null) {
                        p = create();
                        if (p != null) {
                            create = true;
                        }
                    }
                    if (p == null) {
                        throw new NoSuchElementException("Pool exhausted");
                    }
                    if (!p.allocate()) {
                        p = null;
                    }
                }
        ...other code...
            }
    
            updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
       
       // 返回连接
            return p.getObject();
        }
    

    上面是获取连接时的一个完整的流程, 包括有空闲连接时直接返回, 没有空闲连接时创建空闲连接, 连接创建失败后是继续阻塞等待(包括是否超时)还是直接抛出异常. 下面看一下是如何创建一个连接的

    // GenericObjectPool.java
    
    private PooledObject<T> create() throws Exception {
    
       // 判断当前已经创建的连接是否已经超过设置的最大连接数(默认是8)
            int localMaxTotal = getMaxTotal();
            long newCreateCount = createCount.incrementAndGet();
            if (localMaxTotal > -1 && newCreateCount > localMaxTotal ||
                    newCreateCount > Integer.MAX_VALUE) {
                createCount.decrementAndGet();
                return null;
            }
    
            final PooledObject<T> p;
            try {
              // 创建一个到redis server的连接
                p = factory.makeObject();
            } catch (Exception e) {
                createCount.decrementAndGet();
                throw e;
            }
    
           ...other code...
    
            createdCount.incrementAndGet();
            // 将新创建的连接放入到allObjects中
            allObjects.put(new IdentityWrapper<T>(p.getObject()), p);
            // 返回新创建的连接
            return p;
     }
    

    上面就是创建一个到redis server连接的过程. 但是上面没有深究是如何与redis server创建连接的, 因为这次介绍的主题是JedisPool, 所以客户端与redis server创建连接的具体细节会在之后的文章中介绍.

    3.3 关闭一个连接

    在使用完一个连接之后就要将一个连接关闭. 其实上面创建一个连接之后还有一个比较重要的步骤

      @Override
      public Jedis getResource() {
        // 获取连接
        Jedis jedis = super.getResource();
       // 将连接池放入到连接中, 这里这么做的目的其实就是为关闭连接的时候作准备的
        jedis.setDataSource(this);
        return jedis;
      }
    

    调用下面的代码就可以关闭一个连接了

     jedis.close();
    

    我们知道连接池的作用就是为了缓存连接而生的, 所以这里的关闭连接肯定不能是直接和redis server断开连接, 所以让我们看看这里的关闭连接到底是做了什么操作从而实现连接的复用

    // Jedis.java
    public void close() {
      
       // 如果连接池存在就调用连接池的返回资源(这里的资源就是连接)的方法
        if (dataSource != null) {
          if (client.isBroken()) {
            this.dataSource.returnBrokenResource(this);
          } else {
            this.dataSource.returnResource(this);
          }
        } else { // 如果连接池不存在就直接关闭连接
          client.close();
        }
      }
    

    所以当关闭一个连接的时候如果连接存在其实是将资源还给了连接池. 其中最核心的方法就是returnObject

    // GenericObjectPool.java
    @Override
    public void returnObject(T obj) {
       
       // 从allObjects中获取要归还的连接
            PooledObject<T> p = allObjects.get(new IdentityWrapper<T>(obj));
            
            ...other code...
    
            int maxIdleSave = getMaxIdle();
            // 如果idleObjects队列中连接的数据已经>=允许的最大连接数或者连接池已经关闭就直接销毁这个连接
            if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
                try {
                    destroy(p);
                } catch (Exception e) {
                    swallowException(e);
                }
            } else { // 将连接放入到idleObjects队列中, 一旦将连接放入到idleObjects中如果连接长时间不被使用就会被自动回收
                if (getLifo()) { // 默认是使用last in first out机制
                    idleObjects.addFirst(p);
                } else {
                    idleObjects.addLast(p);
                }
                if (isClosed()) {
                    // Pool closed while object was being added to idle objects.
                    // Make sure the returned object is destroyed rather than left
                    // in the idle object pool (which would effectively be a leak)
                    clear();
                }
            }
            updateStatsReturn(activeTime);
        }
    

    4. 总结

    • 使用了这么久的连接池自从看了Jedis Pool的源码之后才对连接池有了一个直观的认识, 之后可以看看数据库的连接池, 比较一下两个对于连接池实现的异同
    • Jedis的连接池使用上是对apache common pool2的一个实现, 有了Jedis Pool这个例子以后要是要实现自己的连接池也方便许多

    相关文章

      网友评论

        本文标题:Jedis源码阅读之连接池

        本文链接:https://www.haomeiwen.com/subject/nwbybttx.html