美文网首页
JedisPool连接池源码分析

JedisPool连接池源码分析

作者: mrchen004 | 来源:发表于2019-10-01 11:26 被阅读0次

    上一篇博客中详细分析了Jedis的内部实现细节,这篇博客来分析一下JedisPool的原理。

    JedisPool使用了commons.pool2框架,该框架提供了池化方案,可以在本地维护一个对象池,使用者只需要提供创建对象等一些简单的操作即可,接入非常简单

    a、JedisPool
    创建连接池,构造方法有很多个,总结起来,就是要提供GenericObjectPoolConfig对象、JedisFactory对象,前者用于控制连接池行为,后者用于创建Jedis对象

    public JedisPool(GenericObjectPoolConfig poolConfig, String host) {
            this(poolConfig, (String)host, 6379);
        }
    
    对外暴露的接口
        getResource  //从连接池中获取Jedis,会调用JedisFactory.borrowObject()
        returnBrokenResource  //归还不可用的Jedis对象
        returnResource  //归还Jedis对象,会调用returnObject(),使用完之后一定要归还,否则会导致严重的后果
    

    GenericObjectPoolConfig

    GenericObjectPoolConfig属性
        private int maxTotal = 8;
        private int maxIdle = 8;
        private int minIdle = 0;
    
    父类属性
        private boolean lifo = true;
        private boolean fairness = false;
        private long maxWaitMillis = -1L;
        private long minEvictableIdleTimeMillis = 1800000L;
        private long evictorShutdownTimeoutMillis = 10000L;
        private long softMinEvictableIdleTimeMillis = -1L;
        private int numTestsPerEvictionRun = 3;
        private String evictionPolicyClassName = "org.apache.commons.pool2.impl.DefaultEvictionPolicy";
        private boolean testOnCreate = false;
        private boolean testOnBorrow = false;
        private boolean testOnReturn = false;
        private boolean testWhileIdle = false;
        private long timeBetweenEvictionRunsMillis = -1L;
        private boolean blockWhenExhausted = true;
        private boolean jmxEnabled = true;
        private String jmxNamePrefix = "pool";
        private String jmxNameBase;
    

    JedisFactory:实现了PooledObjectFactory接口,这是commons.pool2框架的要求

    属性    
        private final AtomicReference<HostAndPort> hostAndPort;
        private final int connectionTimeout;
        private final int soTimeout;
        private final String password;
        private final int database;
        private final String clientName;
        private final boolean ssl;
        private final SSLSocketFactory sslSocketFactory;
        private final SSLParameters sslParameters;
        private final HostnameVerifier hostnameVerifier;
    
    接口方法
        activateObject
        destroyObject  //关闭Jedis资源
        makeObject  //创建Jedis
        passivateObject
        validateObject
    

    JedisPool继承自JedisPoolAbstract,后者继承自Pool,这个Pool类就是接入commons.pool2框架的类,在Pool类的构造方法中会去初始化一个对象池,用于管理Jedis

       public Pool(GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
            this.initPool(poolConfig, factory);
        }
    
        public void initPool(GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
            if (this.internalPool != null) {
                try {
                    this.closeInternalPool();
                } catch (Exception var4) {
                    ;
                }
            }
    
            this.internalPool = new GenericObjectPool(factory, poolConfig);
        }
    

    b、GenericObjectPool
    这个类是commons.pool2框架最核心的类,在该类中实现了对象池管理的大部分逻辑

    属性    
        private volatile String factoryType;
        private volatile int maxIdle;  //最大空闲对象数量,会保证空闲队列中的数量不大于这个值
        private volatile int minIdle;  //最小空闲对象数量,会保证空闲队列中的数量不小于这个值
        private final PooledObjectFactory<T> factory;  //提供创建对象、销毁对象的接口
        private final Map<IdentityWrapper<T>, PooledObject<T>> allObjects;  //当前对象池中所有的对象
        private final AtomicLong createCount;  //当前对象池中的对象+正在创建的对象(这部分很关键)
        private long makeObjectCount;  //当前正在创建的对象的数量,同一时间只能创建一个对象
        private final Object makeObjectCountLock;  //用于控制makeObjectCount的读写
        private final LinkedBlockingDeque<PooledObject<T>> idleObjects;  //空闲队列
        private static final String ONAME_BASE = "org.apache.commons.pool2:type=GenericObjectPool,name=";
        private volatile AbandonedConfig abandonedConfig;
    
    构造方法
    public GenericObjectPool(PooledObjectFactory<T> factory, GenericObjectPoolConfig config) {
            super(config, "org.apache.commons.pool2:type=GenericObjectPool,name=", config.getJmxNamePrefix());
            this.factoryType = null;
            this.maxIdle = 8;
            this.minIdle = 0;
            this.allObjects = new ConcurrentHashMap();
            this.createCount = new AtomicLong(0L);
            this.makeObjectCount = 0L;
            this.makeObjectCountLock = new Object();
            this.abandonedConfig = null;
            if (factory == null) {
                this.jmxUnregister();
                throw new IllegalArgumentException("factory may not be null");
            } else {
                this.factory = factory;
                this.idleObjects = new LinkedBlockingDeque(config.getFairness());
                this.setConfig(config);
                this.startEvictor(this.getTimeBetweenEvictionRunsMillis());
            }
        }
    
    重要操作
        borrowObject
        returnObject
        invalidateObject
        clear
        close
        evict
        preparePool
        addObject
        use
        getNumWaiters
    

    重要的方法一个一个说明
    b.1 borrowObject 这个方法比较长,嵌套比较深,我把它拆解成多个代码片段

    代码片段1
    这个代码段的作用是从空闲队列取对象或者创建对象,得到一个对象之后进行状态更改,不断循环直到成功或者抛出异常(等待超时)
    do {
        if (p != null) {
            this.updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
            return p.getObject();
        }
    
        create = false;
        p = (PooledObject)this.idleObjects.pollFirst();//先从空闲队列取对象,如果取不到就会尝试创建一个对象,这里并不能保证创建成功
        if (p == null) {
            //创建对象的逻辑后面说
            p = this.create();
            if (p != null) {
                create = true;
            }
        }
    
        //阻塞等待空闲队列,这里应该是等待别人归还资源
        if (blockWhenExhausted) {
            if (p == null) {
                if (borrowMaxWaitMillis < 0L) {
                    p = (PooledObject)this.idleObjects.takeFirst();
                } else {
                    p = (PooledObject)this.idleObjects.pollFirst(borrowMaxWaitMillis, TimeUnit.MILLISECONDS);
                }
            }
    
            if (p == null) {
                throw new NoSuchElementException("Timeout waiting for idle object");
            }
        } else if (p == null) {
            throw new NoSuchElementException("Pool exhausted");
        }
    
        //检查并更改p的状态,将IDLE状态改为ALLOCATED状态
        if (!p.allocate()) {
            p = null;
        }
    } while(p == null);
    
    代码片段2
    这段代码主要是得到一个对象之后,激活对象,不断尝试直到成功
    do {
        代码片段1  得到一个对象
        try {
            //调用用户接口类激活对象,Jedis基本不需要激活
            this.factory.activateObject(p);
        } catch (Exception var13) {
            try {
                this.destroy(p);
            } catch (Exception var12) {
                ;
            }
    
            p = null;
            if (create) {
                NoSuchElementException nsee = new NoSuchElementException("Unable to activate object");
                nsee.initCause(var13);
                throw nsee;
            }
        }
    } while(p == null);
    
    public T borrowObject(long borrowMaxWaitMillis) throws Exception {
            this.assertOpen();
            AbandonedConfig ac = this.abandonedConfig;
            if (ac != null && ac.getRemoveAbandonedOnBorrow() && this.getNumIdle() < 2 && this.getNumActive() > this.getMaxTotal() - 3) {
                this.removeAbandoned(ac);
            }
    
            //对象池把对象封装成PooledObject
            PooledObject<T> p = null;
            boolean blockWhenExhausted = this.getBlockWhenExhausted();  //配置,当所有对象都用尽时,是否阻塞等待空闲队列
            long waitTime = System.currentTimeMillis();  //这里只是用于统计,对对象池的行为没有任何影响
    
            while(true) {
                boolean create;
                do {
                    代码片段2  得到对象,激活对象
                } while(!this.getTestOnBorrow() && (!create || !this.getTestOnCreate()));
    
                boolean validate = false;
                Throwable validationThrowable = null;
    
                try {
                    //检验对象是否可用,JedisFactory中做的事情就是检查是否Jedis对应的Socket是否已经连接,并发送一个ping命令,看是否返回pong
                    validate = this.factory.validateObject(p);
                } catch (Throwable var15) {
                    PoolUtils.checkRethrow(var15);
                    validationThrowable = var15;
                }
    
                if (!validate) {
                    try {
                        this.destroy(p);
                        this.destroyedByBorrowValidationCount.incrementAndGet();
                    } catch (Exception var14) {
                        ;
                    }
    
                    p = null;
                    if (create) {
                        NoSuchElementException nsee = new NoSuchElementException("Unable to validate object");
                        nsee.initCause(validationThrowable);
                        throw nsee;
                    }
                }
            }
        }
    

    b.2 create 内部方法,用于封装创建对象的逻辑

    private PooledObject<T> create() throws Exception {
            int localMaxTotal = this.getMaxTotal();
            if (localMaxTotal < 0) {
                localMaxTotal = 2147483647;
            }
    
            //标识是否能够创建对象
            Boolean create = null;
    
            while(create == null) {
                Object var3 = this.makeObjectCountLock;
                //抢到锁就意味着,这个线程具有创建对象的资格,直到达到可创建对象数量的最大值
                //这是一个可重入锁,所以同一个线程可以同时创建对象
                synchronized(this.makeObjectCountLock) {
                    long newCreateCount = this.createCount.incrementAndGet();
                    if (newCreateCount > (long)localMaxTotal) {
                        this.createCount.decrementAndGet();
                        if (this.makeObjectCount == 0L) {
                            create = Boolean.FALSE;
                        } else {
                            this.makeObjectCountLock.wait();
                        }
                    } else {
                        //正在创建的对象数量加1
                        ++this.makeObjectCount;
                        create = Boolean.TRUE;
                    }
                }
            }
    
            if (!create) {
                return null;
            } else {
                boolean var16 = false;
    
                PooledObject p;
                try {
                    var16 = true;//标识是否创建对象失败
                    p = this.factory.makeObject();
                    var16 = false;
                } catch (Exception var19) {
                    this.createCount.decrementAndGet();
                    throw var19;
                } finally {
                    if (var16) {
                        Object var9 = this.makeObjectCountLock;
                        synchronized(this.makeObjectCountLock) {
                            --this.makeObjectCount;
                            this.makeObjectCountLock.notifyAll();
                        }
                    }
                }
    
                Object var23 = this.makeObjectCountLock;
                synchronized(this.makeObjectCountLock) {
                    --this.makeObjectCount;
                    this.makeObjectCountLock.notifyAll();
                }
    
                AbandonedConfig ac = this.abandonedConfig;
                if (ac != null && ac.getLogAbandoned()) {
                    p.setLogAbandoned(true);
                    if (p instanceof DefaultPooledObject) {
                        ((DefaultPooledObject)p).setRequireFullStackTrace(ac.getRequireFullStackTrace());
                    }
                }
    
                this.createdCount.incrementAndGet();
                this.allObjects.put(new IdentityWrapper(p.getObject()), p);
                return p;
            }
        }
    

    b.3 returnObject

    public void returnObject(T obj) {
            //allObjects是一个map结构,因为会被多个线程访问,所以需要使用ConcurrentHashMap
            //其中key是IdentityWrapper,其hash()是返回对象池中的对象的hash码,value是PooledObject
            PooledObject<T> p = (PooledObject)this.allObjects.get(new IdentityWrapper(obj));
            if (p == null) {
                if (!this.isAbandonedConfig()) {
                    throw new IllegalStateException("Returned object not currently part of this pool");
                }
            } else {
                //同步修改PooledObject状态
                synchronized(p) {
                    PooledObjectState state = p.getState();
                    if (state != PooledObjectState.ALLOCATED) {
                        throw new IllegalStateException("Object has already been returned to this pool or is invalid");
                    }
    
                    p.markReturning();
                }
    
                long activeTime = p.getActiveTimeMillis();
                //如果对象已经损坏,那就销毁
                if (this.getTestOnReturn() && !this.factory.validateObject(p)) {
                    try {
                        this.destroy(p);
                    } catch (Exception var10) {
                        this.swallowException(var10);
                    }
    
                    try {
                        this.ensureIdle(1, false);
                    } catch (Exception var9) {
                        this.swallowException(var9);
                    }
    
                    this.updateStatsReturn(activeTime);
                } else {
                    try {
                        //activeObject的反操作,JedisFactory中空实现,无需做任何事情
                        this.factory.passivateObject(p);
                    } catch (Exception var12) {
                        this.swallowException(var12);
    
                        try {
                            this.destroy(p);
                        } catch (Exception var8) {
                            this.swallowException(var8);
                        }
    
                        try {
                            this.ensureIdle(1, false);
                        } catch (Exception var7) {
                            this.swallowException(var7);
                        }
    
                        this.updateStatsReturn(activeTime);
                        return;
                    }
    
                    //将状态改回IDLE
                    if (!p.deallocate()) {
                        throw new IllegalStateException("Object has already been returned to this pool or is invalid");
                    } else {
                        //如果设置了maxIdle并且当前空闲队列中的对象数量已经达到了最大值,那么销毁对象,否则加入到空闲队列
                        int maxIdleSave = this.getMaxIdle();
                        if (this.isClosed() || maxIdleSave > -1 && maxIdleSave <= this.idleObjects.size()) {
                            try {
                                this.destroy(p);
                            } catch (Exception var11) {
                                this.swallowException(var11);
                            }
                        } else {
                            if (this.getLifo()) {
                                this.idleObjects.addFirst(p);
                            } else {
                                this.idleObjects.addLast(p);
                            }
    
                            if (this.isClosed()) {
                                this.clear();
                            }
                        }
    
                        this.updateStatsReturn(activeTime);
                    }
                }
            }
        }
    
    PooledObject的状态值
        IDLE,//创建对象之后或者归还状态之后的状态
        ALLOCATED,//对象分配出去的状态
        EVICTION,
        EVICTION_RETURN_TO_HEAD,
        VALIDATION,
        VALIDATION_PREALLOCATED,
        VALIDATION_RETURN_TO_HEAD,
        INVALID,
        ABANDONED,
        RETURNING;//正在归还的状态
    

    相关文章

      网友评论

          本文标题:JedisPool连接池源码分析

          本文链接:https://www.haomeiwen.com/subject/gbjkpctx.html