美文网首页
mybatis 连接池探究

mybatis 连接池探究

作者: 上重楼 | 来源:发表于2018-05-24 15:32 被阅读1138次

    起因是目前再看 《Java并发编程的艺术》 并发基础那章实现了一个简单的连接池,虽然只是为了展示等待/通知,但是我对平时常用的mybatis的 连接池是怎么实现的感到很好奇,所以就有了这次的记录。框架平时用的多,但是源码还真的没怎么看,这一篇也算是为阅读源码起一个头。

    mybatis版本是:3.4.0
    连接池核心类:org.apache.ibatis.datasource.pooled.PooledDataSource
    测试代码是:
    简单粗暴的抛出Exception 是为了移除异常处理,关注主流程

        public static void main(String[] args) throws Exception {
            //构建一个连接池 有很多重载的构造函数
            PooledDataSource pool = new PooledDataSource(
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://127.0.0.1:3308/db_name",
                    "user_name",
                    "password");
             
             //从连接池取出一个连接,这个是重头戏
            Connection connection = pool.getConnection();
            Statement statement = connection.createStatement();
            ResultSet resultSet = statement.executeQuery("SELECT * FROM checking_in_log LIMIT 1");
            if (resultSet.next()) {
                String id = resultSet.getString("id");
                System.out.println(id);
            }
            resultSet.close();
            statement.close();
            connection.close();
        }
    

    首先跟着测试代码走一遍:

    public PooledDataSource(String driver, String url, String username, String password) {
        dataSource = new UnpooledDataSource(driver, url, username, password);
        expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      }
    

    第一行代码就是初始化这个PooledDataSource对象了,里面做了2件事,初始化一个UnpooledDataSource对象,还有把expectedConnectionTypeCode设置为("" + url + username + password).hashCode();

    UnpooledDataSource和PoolState这2个对象是PooledDataSource最关键的2个对象了,可以说PooledDataSource是对UnpooledDataSource的使用上的封装,数据库连接参数,设置参数大多都是通过调用UnpooledDataSource进行的。

    这是UnpooledDataSource的方法
    getConnection
    getConnection
    setLoginTimeout
    getLoginTimeout
    setLogWriter
    getLogWriter
    getDriverClassLoader
    setDriverClassLoader
    getDriverProperties
    setDriverProperties
    getDriver
    setDriver
    getUrl
    setUrl
    getUsername
    setUsername
    getPassword
    setPassword
    isAutoCommit
    setAutoCommit
    getDefaultTransactionIsolationLevel
    setDefaultTransactionIsolationLevel
    

    而PoolState作为一个数据对象,连接池就保存在里面 这个对象里面的方法都标记了synchronized,而对象里面的统计字段基本都是被多线程竞争的,我觉得可以改为使用JUC的工具类atomicLong。毕竟竞争压力大,synchronized的偏向锁 轻量锁无法起到一个优化的作用。可能这个类在jdk1.5以前就创建了。

    这是里面的字段
        //PooledDataSource是通过new PoolState(this)得出的PollState 
      protected PooledDataSource dataSource;
      //空闲连接池
      protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();
      //活动中的连接池
      protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();
      //请求总数
      protected long requestCount = 0;
     //累计请求时间
      protected long accumulatedRequestTime = 0;
      //累计获取时间(连接池被别的线程拿走执行的时间)
      protected long accumulatedCheckoutTime = 0;
      //超时执行连接数计数,当连接执行超过了poolMaximumCheckoutTime(20秒)而这时候有请求竞争这个连接的时候这个值加一,同时连接会rollback,然后给新的竞争线程去使用
      protected long claimedOverdueConnectionCount = 0;
      //这个是accumulatedCheckoutTime是一样的
      protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
      //累计等待时间,就是连接都被使用了,而又没办法抢别人超时执行的连接的时候就只能乖乖等待了
      protected long accumulatedWaitTime = 0;
      //每次等待连接就加一
      protected long hadToWaitCount = 0;
      //当获取到一个已经失效的连接的时候就加一
      protected long badConnectionCount = 0;
    
    //取出连接所调用的方法
     @Override
      public Connection getConnection() throws SQLException {
        return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
      }
    

    方法的返回类型是:java.sql.Connection
    通过调用popConnection返回org.apache.ibatis.datasource.pooled.PooledConnection这个mybatis的封装类,然后调用它的getProxyConnection获得java.sql.Connection
    PooledConnection实现了InvocationHandler接口,然后内部有2个Connection对象

    private Connection realConnection;
    private Connection proxyConnection;

    realConnection是真实的连接,在构造器传入
    proxyConnection是在构造器里面通过反射代理创建的一个对象
    private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };
    this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
    通过源码发现,之所以有proxyConnection是为了拦截调用Connection.close()
    在PooledConnection的invoke实现方法(InvocationHandler接口)可以看到

    @Override
      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
          dataSource.pushConnection(this);
          return null;
        } else {
          try {
            if (!Object.class.equals(method.getDeclaringClass())) {
              // issue #579 toString() should never fail
              // throw an SQLException instead of a Runtime
              checkConnection();
            }
            return method.invoke(realConnection, args);
          } catch (Throwable t) {
            throw ExceptionUtil.unwrapThrowable(t);
          }
        }
      }
    

    如果是close方法,调用后并不会真正去执行Connection.close(),而是把连接放回连接池,至于其他方法都是通过realConnection去执行的。

    接下来看看2个核心方法,拿和还
    pushConnection
    popConnection

    //取出一个连接
     private PooledConnection popConnection(String username, String password) throws SQLException {
        //是否需要等待连接标记
        boolean countedWait = false;
        //返回对象
        PooledConnection conn = null;
        //方法开始的时间,用来计算请求连接耗费时间的
        long t = System.currentTimeMillis();
        //获取到坏连接的次数
        int localBadConnectionCount = 0;
        
        //如果没获取到连接就一直重试,有3种情况会退出循环,成功获取到连接
        //调用wait(timeout)的时候抛出//InterruptedException异常
       //localBadConnectionCount > (poolMaximumIdleConnections + 3)的时候抛出异常,这个意思是获取到失效连接次数已经大于最大空闲连接数+3次时,不再尝试面对现实
        while (conn == null) {
          //先对 PoolState这个对象加锁,前面也有说,这个对象在连接池是唯一的。
          synchronized (state) {
            //如果空闲连接列表不为空,从其中取出第一个连接,这是最理想的情况了,在压力不大的时候一般都这样
            if (!state.idleConnections.isEmpty()) {
              // Pool has available connection
              conn = state.idleConnections.remove(0);
              if (log.isDebugEnabled()) {
                log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
              }
            } else {
              //下面那行源码英文注释其实已经说明了问题,当活动连接还没达到最大活动连接限制的时候直接new一个新连接
              // Pool does not have available connection
              if (state.activeConnections.size() < poolMaximumActiveConnections) {
                // Can create new connection
                conn = new PooledConnection(dataSource.getConnection(), this);
                if (log.isDebugEnabled()) {
                  log.debug("Created connection " + conn.getRealHashCode() + ".");
                }
              } else {
                //在不能创建新连接的时候就要进行下面的操作了,如果第1个连接的执行已经超时就干掉它取而代之,否则就乖乖等待
                // Cannot create new connection
                //获取最初创建的连接
                PooledConnection oldestActiveConnection = state.activeConnections.get(0);
                //获取连接的执行时间,就是连接被线程持有的时间
                long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
                //判断是否已经执行超时,默认是20秒
                if (longestCheckoutTime > poolMaximumCheckoutTime) {
                  // Can claim overdue connection
                  //这4个在前面已经解释了
                  state.claimedOverdueConnectionCount++;
                  state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
                  state.accumulatedCheckoutTime += longestCheckoutTime;
                  //就旧的连接从活动连接列表移除
                  state.activeConnections.remove(oldestActiveConnection);
                  //下面这个就很残暴了,如果连接不是自动提交的,调用它的回滚。如果人家正在执行就懵逼了,所以长时间执行的大sql poolMaximumCheckoutTime还是有必要设置一下
                  if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                    try {
                      oldestActiveConnection.getRealConnection().rollback();
                    } catch (SQLException e) {
                      log.debug("Bad connection. Could not roll back");
                    }  
                  }
                  //这里相当于把旧衣服脱掉,穿上新衣服。😀 真正的Connection对象还是没变的
                  conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
                  //把旧对象标记为失效
                  oldestActiveConnection.invalidate();
                  if (log.isDebugEnabled()) {
                    log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
                  }
                } else {
                  // Must wait    需要等待的处理
                  try {
                    //累加一次等待计数,判断条件是避免循环等待的时候多次累计
                    if (!countedWait) {
                      state.hadToWaitCount++;
                      countedWait = true;
                    }
                    if (log.isDebugEnabled()) {
                      log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                    }
                    //下面几行就是并发编程的内容了,poolTimeToWait默认值是20秒
                    long wt = System.currentTimeMillis();
                    //wait 放弃掉自己的锁,然后开始等待被notify唤醒或者达到超时时间,这里要注意的是,如果已经到了超时时间,而别的线程还没有释放锁,照样是醒不来的
                    state.wait(poolTimeToWait);
                    state.accumulatedWaitTime += System.currentTimeMillis() - wt;
                  } catch (InterruptedException e) {
                    //中断异常,跳出
                    break;
                  }
                }
              }
            }
    
            //经历了前面的磨难,终于快到收官了 如果连接对象不为null且有效
            //conn.isValid() 方法做了:valid && realConnection != null && dataSource.pingConnection(this);
            if (conn != null) {
              if (conn.isValid()) {
                //又是这一步操作,简单粗暴/(ㄒoㄒ)/~~  
                if (!conn.getRealConnection().getAutoCommit()) {
                  conn.getRealConnection().rollback();
                }
                //这一步在构造器那里有类似的,连接的hashCode
                conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
                //设置连接被获取的时间,这个在前面判断是否超时执行的时候用到了
                conn.setCheckoutTimestamp(System.currentTimeMillis());
                //最后更新时间
                conn.setLastUsedTimestamp(System.currentTimeMillis());
                //将这个穿了新衣服的连接加入到活动连接
                state.activeConnections.add(conn);
                state.requestCount++;
                state.accumulatedRequestTime += System.currentTimeMillis() - t;
              } else {
                //这个分支是最苦逼的,获取到连接,但是是一个失效的。
                if (log.isDebugEnabled()) {
                  log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
                }
                state.badConnectionCount++;
                localBadConnectionCount++;
                //将连接抛弃掉,重新去获取
                conn = null;
                  //下面这几行前面也有说,获取失效连接超过一定次数,抛出异常,放弃希望
                if (localBadConnectionCount > (poolMaximumIdleConnections + 3)) {
                  if (log.isDebugEnabled()) {
                    log.debug("PooledDataSource: Could not get a good connection to the database.");
                  }
                  throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
                }
              }
            }
          }
    
        }
    
        //这个循环体外的判断,看上去主要还是防止那个wait抛出中断异常的break
        if (conn == null) {
          if (log.isDebugEnabled()) {
            log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
          }
          throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
        }
    
        return conn;
      }
    

    长长的popConnection后面还有一个短短的pushConnection 添加链接到

     protected void pushConnection(PooledConnection conn) throws SQLException {
        //同样是先加锁
        synchronized (state) {
          //先把要链接从活动连接里面移除
          state.activeConnections.remove(conn);
          //要添加的这个连接还必须是一个有效连接
          if (conn.isValid()) {
            //判断空闲连接列表还有空位,且这个连接是从这个池创建的
            if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
              //和获取连接有点类似,累加执行时间
              state.accumulatedCheckoutTime += conn.getCheckoutTime();
              //不为自动提交的时候回滚一下
              if (!conn.getRealConnection().getAutoCommit()) {
                conn.getRealConnection().rollback();
              }
              //换个新衣服
              PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
              //将连接对象加到空闲列表
              state.idleConnections.add(newConn);
              //使用旧连接对象的创建时间和修改时间 
              newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
              newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
              //将旧对象标记为失效
              conn.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
              }
              //通知所有对state对象执行wait的线程可以继续了(让可能的在等待获取连接的wait对象动一下)
              state.notifyAll();
            } else {
              //如果空闲列表满了,或者这个连接对象不属于这个池创建的 真正的关闭这个连接 然后标记为失效
              state.accumulatedCheckoutTime += conn.getCheckoutTime();
              if (!conn.getRealConnection().getAutoCommit()) {
                conn.getRealConnection().rollback();
              }
             //执行真实连接对象的close,返回给用户的连接都是ProxyConnection那个动态代理的对象,close只是返回连接到池。
              conn.getRealConnection().close();
              if (log.isDebugEnabled()) {
                log.debug("Closed connection " + conn.getRealHashCode() + ".");
              }
              //标记为失效
              conn.invalidate();
            }
          } else {  
            //无效的连接对象直接就不处理,只累计一下计数器
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
            }
            state.badConnectionCount++;
          }
        }
      }
    

    大概就是这些了,有一些比较蛋疼的地方,比如PoolState的
    protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();
    protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();
    空闲列表和活动列表,这2个列表是一直在变化的,所以用ArrayList性能就有点问题了。而源码里面更多只是对头和尾进行操作,这里用LinkedList可能会更好?

    相关文章

      网友评论

          本文标题:mybatis 连接池探究

          本文链接:https://www.haomeiwen.com/subject/mjtsjftx.html