美文网首页MyBatis源码剖析
[MyBatis源码分析 - 数据源模块]

[MyBatis源码分析 - 数据源模块]

作者: 小胡_鸭 | 来源:发表于2020-12-07 23:15 被阅读0次

    一、简介

      数据源是实际开发中常用的组件之一。现在开源的数据源都提供了比较丰富的功能,例如,连接池功能、检测连接状态等,选择性能优秀的数据源组件对于提升 ORM 框架乃至整个应用的性能都是非常重要的。

      MyBatis 自身提供了相应的数据源实现,当然 MyBatis 也提供了与第三方数据源集成的接口,这些功能都位于数据源模块之中,该模块位于 org.apache.ibatis.datasource 包中,相关类组成如下所示:

      这些类的关系如下图所示:



    二、设计模式

      数据源模块使用了工厂方法模式,其模式的 UML 图如下所示:



      工厂方法由四个角色构成:

    • 工厂接口(Factory):工厂接口是工厂方法模式的核心接口,调用这会直接与工厂接口交互用于获取具体的产品实现类。
    • 具体工厂类(ConcreteFactory):具体工厂类是工厂接口的实现类,用于实例化产品对象,不同的具体工厂类会根据需求实例化不同的产品实现类。
    • 产品接口(Product):产品接口用于定义产品类的功能,具体工厂类产生的所有产品对象都必须实现该接口。调用者一般会面向产品接口进行编程,所以产品接口会与调用者直接交互,也是调用者最为关心的接口。
    • 具体产品类(ConcreteProduct):实现产品接口的实现类,具体产品类中定义了具体的业务逻辑。

      DataSourceFactory 是工厂接口,其具体实现工厂类有 UnpooledDataSourceFactoryJndiDataSourceFactoryPooledDataSourceFactoryDataSource 是产品接口,其具体实现产品类有 PooledDataSourceUnpooledDataSource。每个具体工厂都会生产一种具体的产品,对于外部应用程序,直接使用工厂接口和产品接口,面向接口编程而不关心具体实现屏蔽底层复杂性,同时符合 “开放-封闭” 原则,底层可以聚合更多的实现。

    三、DataSourceFactory

      数据源模块的工厂接口,定义了生产 javax.sql.DataSource 对象的方法,源码如下:

    public interface DataSourceFactory {
    
      void setProperties(Properties props);
    
      DataSource getDataSource();
    
    }
    

      数据源模块中内置了 UnpooledDataSourceFactoryPooledDataSourceFactoryJndiDataSourceFactory 三种具体工厂实现类。

    3.1 UnpooledDataSourceFactory

      UnpooledDataSourceFactory 实现了 DataSourceFactory 接口,内置 DataSource 字段,并在构造函数中创建 UnpooledDataSource 对象来初始化该字段,并对外提供了 getDataSource() 方法来获取该字段。

    UNPOOLED —— 这个数据源的实现只是每次被请求时打开和关闭连接。虽然有点慢,但对于在数据库连接可用性方面没有太高要求的简单应用程序来说,是一个很好的选择。 不同的数据库在性能方面的表现也是不一样的,对于某些数据库来说,使用连接池并不重要,这个配置就很适合这种情形。UNPOOLED 类型的数据源仅仅需要配置以下 5 种属性:

    • url:数据库 JDBC URL 地址。
    • username:登录数据库的用户名。
    • password:登录数据库的密码。
    • defaultTransactionIsolationLevel:默认的事务隔离级别。
    • driver.*:JDBC 驱动的 Java 类的完全限定名(并不是JDBC驱动中可能包含的数据源类)。

    如果要传递属性给数据库驱动,需要将属性的前缀设置为 “driver.*”,如:
    driver.encoding=UTF8

    最终这些属性都会通过 DriverManager.getConnection(url, driverProperties) 方法传递属性给数据库驱动并创建一个 Connection 连接对象。

    3.1.1 构造方法

    public class UnpooledDataSourceFactory implements DataSourceFactory {
    
      private static final String DRIVER_PROPERTY_PREFIX = "driver.";
      private static final int DRIVER_PROPERTY_PREFIX_LENGTH = DRIVER_PROPERTY_PREFIX.length();
    
      protected DataSource dataSource;
    
      public UnpooledDataSourceFactory() {
        this.dataSource = new UnpooledDataSource();
      }
    }
    

    3.1.2 getDataSource

      @Override
      public DataSource getDataSource() {
        return dataSource;
      }
    

    3.1.3 setProperties

      #setProperties 方法会将传入的 properties 属性对象分类处理,然后设置到 dataSource 中,代码如下:

      @Override
      public void setProperties(Properties properties) {
        Properties driverProperties = new Properties();
        // 创建 dataSource 对应的 MetaObject 对象
        MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
        // 遍历传入的属性集合
        for (Object key : properties.keySet()) {
          String propertyName = (String) key;
          // 如果属性名以 "driver." 开头,则添加到 driverProperties 中
          if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) {
            String value = properties.getProperty(propertyName);
            driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
          // 如果属性名在类中对对应的 setter,则先将属性值转化为对应的类似,再通过 MetaObject 设置
          } else if (metaDataSource.hasSetter(propertyName)) {
            String value = (String) properties.get(propertyName);
            Object convertedValue = convertValue(metaDataSource, propertyName, value);
            metaDataSource.setValue(propertyName, convertedValue);
          } else {
            throw new DataSourceException("Unknown DataSource property: " + propertyName);
          }
        }
        // 将上面收集的 driverProperties 通过 MetaObject 设置到 dataSource 对象的 driverProperties 属性中
        // DataSource 是一个接口,没有该属性,但是实现该接口的 UnpooledDataSource 类中定义了该属性
        if (driverProperties.size() > 0) {
          metaDataSource.setValue("driverProperties", driverProperties);
        }
      }
    

      如果非 “driver.” 前缀的属性,需要调用 #convertValue 将属性值字符串转换为对应类型的属性值,其代码如下:

      private Object convertValue(MetaObject metaDataSource, String propertyName, String value) {
        Object convertedValue = value;
        // 获得该属性的 setting 方法的参数类型
        Class<?> targetType = metaDataSource.getSetterType(propertyName);
        // 转化为对应的类型
        if (targetType == Integer.class || targetType == int.class) {
          convertedValue = Integer.valueOf(value);
        } else if (targetType == Long.class || targetType == long.class) {
          convertedValue = Long.valueOf(value);
        } else if (targetType == Boolean.class || targetType == boolean.class) {
          convertedValue = Boolean.valueOf(value);
        }
        return convertedValue;
      }
    

    3.2 PooledDataSourceFactory

      PooledDataSourceFactory 同样实现了 DataSourceFactory 接口,所有的 “池”,包括数据库连接池、线程池等都是基于复用创建的资源来减少重复地创建和清理回收资源的开销,当然资源本身也是需要消耗一定资源的比如CPU、内存等,处理策略上需要平衡性能与开销。

    POOLED —— 这种数据源的实现利用“池”的概念将 JDBC 连接对象组织起来,避免了创建新的连接实例时所必需的初始化和认证时间。 这是一种使得并发 Web 应用快速响应请求的流行处理方式。

    PooledDataSource 通过内置一个 UnpooledDataSource 类型的方式来组合非池化数据源对象的功能,专注于对连接池的管理,为了完善连接池的功能,提供了更多的属性来配置 POOLED 数据源:

    • poolMaximumActiveConnections — 最大活跃连接数,默认值:10。
    • poolMaximumIdleConnections — 最大空闲连接数,默认值:5。
    • poolMaximumCheckoutTime — 在被强制返回之前,池中连接最长检出时间(即取出连接到归还连接这段使用时间),默认值:20000ms(20s)。
    • poolTimeToWait — 这是一个底层设置,如果获取连接花费了相当长的时间,连接池会打印状态日志并重新尝试获取一个连接(避免在误配置的情况下一直安静地失败),默认值:20000ms(20s)。
    • poolPingQuery — 发送到数据库的侦测查询,用来检验连接是否正常工作并准备接受请求。默认是“NO PING QUERY SET”,这会导致多数数据库驱动失败时带有一个恰当的错误消息。
    • poolPingEnabled — 是否启用侦测查询。默认为关闭,若开启需要设置。
    • poolPingConnectionsNotUsedFor — 配置 poolPingQuery 的频率。可以被设置为和数据库连接超时时间一样,来避免不必要的侦测,默认值:0(即所有连接每一时刻都被侦测 — 当然仅当 poolPingEnabled 为 true 时适用)。

      PooledDataSourceFactory 继承了 UnpooledDataSourceFactory,同时也继承了非池化数据源工厂的 dataSource 属性,并在其构造函数中创建 PooledDataSource 对象进行初始化,代码如下:

    public class PooledDataSourceFactory extends UnpooledDataSourceFactory {
    
      public PooledDataSourceFactory() {
        this.dataSource = new PooledDataSource();
      }
    
    }
    

    3.3 JndiDataSourceFactory

    JNDI —— 这个数据源的实现是为了能在如 EJB 或应用服务器这类容器中使用,容器可以集中或在外部配置数据源,然后放置一个 JNDI 上下文的引用。这种数据源配置只需要两个属性:

    • initial_context —— 这个属性用来在 InitialContext 中寻找上下文(即,initialContext.lookup(initial_context))。这是个可选属性,如果忽略,那么 data_source 属性将会直接从 InitialContext 中寻找。
    • data_source —— 这是引用数据源实例位置的上下文的路径。提供了 initial_context 配置时会在其返回的上下文中进行查找,没有提供时则直接在 InitialContext 中查找。

    和其他数据源配置类似,可以通过添加前缀 "env." 直接把属性传递给初始上下文,比如:

    • env.encoding=UTF8

    这就会在初始上下文(InitialContext) 实例化时往它的构造方法传递值为 UTF8encoding 属性。

    3.3.1 属性

      public static final String INITIAL_CONTEXT = "initial_context";
      public static final String DATA_SOURCE = "data_source";
      public static final String ENV_PREFIX = "env.";
    
      private DataSource dataSource;
    

      内置了 DataSource 对象,还有几个静态参数,从上下文中获得外部配置的数据源时用到。

    3.3.2 setProperties

      @Override
      public void setProperties(Properties properties) {
        try {
          InitialContext initCtx;
          // 获得环境属性
          Properties env = getEnvProperties(properties);
          // 创建 InitialContext 对象
          if (env == null) {
            initCtx = new InitialContext();
          } else {
            initCtx = new InitialContext(env);
          }
    
          // 在 initCtx 中寻找上下文,并在上下文中寻找数据源对象
          if (properties.containsKey(INITIAL_CONTEXT)
              && properties.containsKey(DATA_SOURCE)) {
            Context ctx = (Context) initCtx.lookup(properties.getProperty(INITIAL_CONTEXT));
            dataSource = (DataSource) ctx.lookup(properties.getProperty(DATA_SOURCE));
          // 直接在 initCtx 中寻找数据源对象
          } else if (properties.containsKey(DATA_SOURCE)) {
            dataSource = (DataSource) initCtx.lookup(properties.getProperty(DATA_SOURCE));
          }
    
        } catch (NamingException e) {
          throw new DataSourceException("There was an error configuring JndiDataSourceTransactionPool. Cause: " + e, e);
        }
      }
    

      首先从传入的属性中筛选出 "env." 开头的环境属性,接下来创建 InitialContext 对象,如果有传入 initial_context 属性,则先从指定上下文寻找 Context 对象,再从该对象中找到数据源;如果没有传入该属性,则直接从 InitialContext 对象中找到数据源。

    3.3.3 getEnvProperties

      // 获取环境属性,筛选出属性名以 "env." 开头的属性
      private static Properties getEnvProperties(Properties allProps) {
        final String PREFIX = ENV_PREFIX;
        Properties contextProperties = null;
        for (Entry<Object, Object> entry : allProps.entrySet()) {
          String key = (String) entry.getKey();
          String value = (String) entry.getValue();
          if (key.startsWith(PREFIX)) {
            if (contextProperties == null) {
              contextProperties = new Properties();
            }
            contextProperties.put(key.substring(PREFIX.length()), value);
          }
        }
        return contextProperties;
      }
    

    四、DataSource

      javax.sql.DataSource 在数据源模块的工厂模式中是产品接口的角色,内置的实现类有 UnpooledDataSourcePooledDataSource,数据库连接是通过该接口创建的,这是个牛逼闪闪的接口,在其上可以衍生出数据连接池、分库分表、读写分离等等功能。

    4.1 UnpooledDataSource

    4.1.1 属性

      private ClassLoader driverClassLoader;      // 驱动类加载器
      private Properties driverProperties;        // 数据库连接驱动相关属性配置
      // 驱动管理器中所有已注册的数据库驱动(key: 数据库驱动类名,value: 数据库驱动)
      private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();
    
      // 驱动属性配置
      private String driver;      // 数据库连接的驱动名称
      private String url;         // 数据库连接URL
      private String username;    // 用户名
      private String password;    // 密码
    
      private Boolean autoCommit;                         // 是否自动提交
      private Integer defaultTransactionIsolationLevel;   // 默认事务隔离级别
    
      // 静态代码块,在类加载的时候就将DriverManager中注册的JDBC Driver复制一份到registeredDrivers集合中
      static {
        Enumeration<Driver> drivers = DriverManager.getDrivers();
        while (drivers.hasMoreElements()) {
          Driver driver = drivers.nextElement();
          registeredDrivers.put(driver.getClass().getName(), driver);
        }
      }
    

    4.1.2 构造方法

      public UnpooledDataSource() {
      }
    
      public UnpooledDataSource(String driver, String url, String username, String password) {
        this.driver = driver;
        this.url = url;
        this.username = username;
        this.password = password;
      }
    
      public UnpooledDataSource(String driver, String url, Properties driverProperties) {
        this.driver = driver;
        this.url = url;
        this.driverProperties = driverProperties;
      }
    
      public UnpooledDataSource(ClassLoader driverClassLoader, String driver, String url, String username, String password) {
        this.driverClassLoader = driverClassLoader;
        this.driver = driver;
        this.url = url;
        this.username = username;
        this.password = password;
      }
    
      public UnpooledDataSource(ClassLoader driverClassLoader, String driver, String url, Properties driverProperties) {
        this.driverClassLoader = driverClassLoader;
        this.driver = driver;
        this.url = url;
        this.driverProperties = driverProperties;
      }
    

    4.1.3 doGetConnection

    【功能】获取数据库连接。

      private Connection doGetConnection(String username, String password) throws SQLException {
        Properties props = new Properties();
        if (driverProperties != null) {
          props.putAll(driverProperties);
        }
        if (username != null) {
          props.setProperty("user", username);
        }
        if (password != null) {
          props.setProperty("password", password);
        }
        return doGetConnection(props);
      }
    
      private Connection doGetConnection(Properties properties) throws SQLException {
        initializeDriver();
        Connection connection = DriverManager.getConnection(url, properties);
        configureConnection(connection);
        return connection;
      }
    

      第一个重载方法会将用户名密码设置到 Properties 对象中,再调用 #doGetConnection(Properties),该方法会先调用 #initializeDriver() 初始化驱动,在创建数据连接对象,最后调用 #configureConnection(Connection) 方法配置并返回连接对象。

    4.1.4 initializeDriver

    【功能】初始化创建连接的指定驱动。
    【源码】

      // 初始化驱动
      private synchronized void initializeDriver() throws SQLException {
        // 如果该驱动已注解,则无需重复初始化
        if (!registeredDrivers.containsKey(driver)) {
          Class<?> driverType;
          try {
            // 加载指定驱动对应的类对象
            if (driverClassLoader != null) {
              driverType = Class.forName(driver, true, driverClassLoader);
            } else {
              driverType = Resources.classForName(driver);
            }
            // 创建驱动对象实例并注册到驱动管理器和本对象中
            Driver driverInstance = (Driver)driverType.newInstance();
            DriverManager.registerDriver(new DriverProxy(driverInstance));
            registeredDrivers.put(driver, driverInstance);
          } catch (Exception e) {
            throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
          }
        }
      }
    

    【解析】

    • (1)如果指定驱动已经被初始化并注册,则返回。
    • (2)若驱动尚未被初始化,则根据 driver(传入的类的全限定名)获得对应的驱动的 Class 对象,如:com.mysql.jdbc.Driver
    • (3)创建一个驱动对象的实例,注册到 DriverManager 中和添加到 registeredDrivers 中,注意这里注册到驱动管理器中的 Driver 是一个代理对象,其代码如下:
      private static class DriverProxy implements Driver {
        private Driver driver;
    
        DriverProxy(Driver d) {
          this.driver = d;
        }
    
        // 简单代理方法
    
        // @Override only valid jdk7+
        public Logger getParentLogger() {
          return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
        }
      }
    
    • 核心是 #getParentLogger() 方法,该方法是 Driver 接口中定义的方法,用来获得打印日志的 Logger 对象,这里用的是 MyBatis 日志模块中自定义的 Logger 对象,这样会在使用驱动时打印出日志,其他方法都是直接调用 Driver 的方法。

    4.1.5 configureConnection

    【功能】配置数据库连接对象。
    【源码】

      // 配置数据库连接是否采用自动提交和事务隔离级别
      private void configureConnection(Connection conn) throws SQLException {
        if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
          conn.setAutoCommit(autoCommit);
        }
        if (defaultTransactionIsolationLevel != null) {
          conn.setTransactionIsolation(defaultTransactionIsolationLevel);
        }
      }
    

    【解析】
      设置连接对象是否为自动提交和事务隔离级别。

    4.1.6 其他方法

      其余方法,主要是类属性的 getter 和 setter,比较简单。

    4.2 PooledDataSource

      PooledDataSource 是一个简单的、同步的、线程安全的数据库连接池,它将连接池的状态交给 PooledState 类来管理,将真正的连接对象及其代理对象交给 PooledConnection 类来管理,数据源对象交给 UnpooledDataSource 管理,并提供了许多可配置的选项来控制连接池的行为。

    4.2.1 属性

      private static final Log log = LogFactory.getLog(PooledDataSource.class);
    
      private final PoolState state = new PoolState(this);
    
      private final UnpooledDataSource dataSource;
    
      // OPTIONAL CONFIGURATION FIELDS
      // 可选配置项
      protected int poolMaximumActiveConnections = 10;        // 最大活跃连接数
      protected int poolMaximumIdleConnections = 5;           // 最大空闲连接数
      protected int poolMaximumCheckoutTime = 20000;          // 最大checkout时长:取出连接到归还连接这段使用时间
      protected int poolTimeToWait = 20000;                   // 在无法获取连接时,线程需要等待的时间
      protected String poolPingQuery = "NO PING QUERY SET";   // 在检测一个数据库连接是否可用时,会给数据库发送一个测试SQL语句
      protected boolean poolPingEnabled = false;              // 是否允许发送测试SQL语句
      protected int poolPingConnectionsNotUsedFor = 0;        // 当连接池超过多少毫秒未使用时,会发送一次测试SQL语句,检测连接是否正常
    
      private int expectedConnectionTypeCode;                 // 根据数据库的URL、用户名、密码生成的一个hash值,该哈希值用于标志着当前的连接池,在构造函数中初始化
    

    4.2.2 构造方法

    【功能】构造方法主要是创建 UnpooledDataSource 对象,并根据用户名、密码、连接得到一个能代表一个数据源的哈希值。

      public PooledDataSource() {
        dataSource = new UnpooledDataSource();
      }
    
      public PooledDataSource(UnpooledDataSource dataSource) {
        this.dataSource = dataSource;
      }
    
      public PooledDataSource(String driver, String url, String username, String password) {
        dataSource = new UnpooledDataSource(driver, url, username, password);
        expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      }
    
      public PooledDataSource(String driver, String url, Properties driverProperties) {
        dataSource = new UnpooledDataSource(driver, url, driverProperties);
        expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      }
    
      public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, String username, String password) {
        dataSource = new UnpooledDataSource(driverClassLoader, driver, url, username, password);
        expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      }
    
      public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, Properties driverProperties) {
        dataSource = new UnpooledDataSource(driverClassLoader, driver, url, driverProperties);
        expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      }
    

    4.2.3 getConnection

    【功能】获取数据库连接。
    【源码】

      @Override
      public Connection getConnection() throws SQLException {
        return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
      }
    
      @Override
      public Connection getConnection(String username, String password) throws SQLException {
        return popConnection(username, password).getProxyConnection();
      }
    

    【解析】
      两个重载方法都会调用 #popConnection(String username, String password) 方法得到 PooledConnection 对象,在该对象中封装了真正创建的 java.sql.Connection 对象,生成该对象的代理对象并提供给外界使用,当调用该代理对象的 #close() 方法时,不会实际关闭该连接,而是将连接归还到空闲连接池中。

    4.2.4 popConnection

    【功能】从连接池中获得数据库连接。
    【源码】

      // 获得 PooledConnection 对象
      private PooledConnection popConnection(String username, String password) throws SQLException {
        boolean countedWait = false;            // 获取连接时是否经历了阻塞等待
        PooledConnection conn = null;           // 最终获得到的连接对象
        long t = System.currentTimeMillis();    // 获得连接的开始时间,用于统计本地连接获取的时间
        int localBadConnectionCount = 0;        // 统计本地获取连接的方法中累计获得坏连接的个数
    
        while (conn == null) {
          // 加锁: 连接只能同步获取
          synchronized (state) {
            if (!state.idleConnections.isEmpty()) {         // (1) 判断连接池是否有空闲连接
              // Pool has available connection
              conn = state.idleConnections.remove(0);// 如果有可用空闲连接则从空闲连接池中移除第一个连接
              if (log.isDebugEnabled()) {
                log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
              }
            } else {
              // (2) 如果连接池没有空闲连接,则先判断活跃连接数是否已达到最大值
              // (2.1) 如果不是则直接创建新连接,否则判断活跃连接中是否有超时连接
              // Pool does not have available connection
              if (state.activeConnections.size() < poolMaximumActiveConnections) {
                // Can create new connection
                conn = new PooledConnection(dataSource.getConnection(), this);
                if (log.isDebugEnabled()) {
                  log.debug("Created connection " + conn.getRealHashCode() + ".");
                }
              } else {
                // (2.2) 如果活跃连接中有超时连接,则先移除它,再创建新连接;否则阻塞等
                PooledConnection oldestActiveConnection = state.activeConnections.get(0);
                long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
                // 活跃连接已超时,将其移除
                if (longestCheckoutTime > poolMaximumCheckoutTime) {
                  // 对连接超时的信息进行统计
                  // Can claim overdue connection
                  state.claimedOverdueConnectionCount++;                                    // 数据源使用超时连接数+1
                  state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime; // 统计累计超时时间
                  state.accumulatedCheckoutTime += longestCheckoutTime;                     // 统计累计检出时间
                  state.activeConnections.remove(oldestActiveConnection);                   // 从活跃连接池中逸出已超时连接
                  // 回滚数据库操作
                  if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                    try {
                      oldestActiveConnection.getRealConnection().rollback();
                    } catch (SQLException e) {
                      log.debug("Bad connection. Could not roll back");
                    }  
                  }
                  // 为超时连接对象创建一个新的 PooledConnection 对象,主要是清除掉对象中的状态
                  conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
                  oldestActiveConnection.invalidate();
                  if (log.isDebugEnabled()) {
                    log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
                  }
                } else {
                  // Must wait
                  // 阻塞等待
                  try {
                    if (!countedWait) {
                      state.hadToWaitCount++;
                      countedWait = true;
                    }
                    if (log.isDebugEnabled()) {
                      log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                    }
                    long wt = System.currentTimeMillis();
                    state.wait(poolTimeToWait);
                    state.accumulatedWaitTime += System.currentTimeMillis() - wt;
                  } catch (InterruptedException e) {
                    break;
                  }
                }
              }
            }
            if (conn != null) {
              // 检查拿到的连接是否合法
              if (conn.isValid()) {
                if (!conn.getRealConnection().getAutoCommit()) {
                  conn.getRealConnection().rollback();
                }
                conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
                conn.setCheckoutTimestamp(System.currentTimeMillis());
                conn.setLastUsedTimestamp(System.currentTimeMillis());
                state.activeConnections.add(conn);
                state.requestCount++;
                state.accumulatedRequestTime += System.currentTimeMillis() - t;
              } else {
                if (log.isDebugEnabled()) {
                  log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
                }
                state.badConnectionCount++;
                localBadConnectionCount++;
                conn = null;
                // 如果获取到的连接一直都是非法的,并且累计失败个数为 最大空闲连接池个数限制+3,则抛出异常
                if (localBadConnectionCount > (poolMaximumIdleConnections + 3)) {
                  if (log.isDebugEnabled()) {
                    log.debug("PooledDataSource: Could not get a good connection to the database.");
                  }
                  throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
                }
              }
            }
          }   // synchronized
        }   // while
    
        if (conn == null) {
          if (log.isDebugEnabled()) {
            log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
          }
          throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
        }
    
        return conn;
      }
    

    【解析】
      获取数据库连接的流程如下:


    • (1)从连接池中获得空闲连接,否则执行(2)。
    • (2)尝试创建连接或从活跃连接中回收超时连接。
      • (2.1)如果活跃连接数未达最大限制,优先创建新连接。
      • (2.2) 如果活跃连接中有超时连接,则先从活跃连接池中移除它,再创建新连接;否则阻塞等待。
    • (3)检查获得的连接是否合法。
      • (3.1)若合法,则添加到活跃连接池。
      • (3.2)若不合法,则重新尝试获得数据库连接,若累计失败次数超限,则抛出异常。

    4.2.5 pushConnection

    【功能】活跃连接使用完毕后关闭时,代理对象调用本方法处理。
    【源码】

      // 将用完的连接归还到连接池中
      protected void pushConnection(PooledConnection conn) throws SQLException {
    
        synchronized (state) {
          // (1) 将连接从活跃连接池中移除
          state.activeConnections.remove(conn);
          // (2) 判断连接是否合法
          if (conn.isValid()) {
            // (2.1.1) 如果空闲连接池未满,则回收到空闲连接集合中
            if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
              state.accumulatedCheckoutTime += conn.getCheckoutTime();
              if (!conn.getRealConnection().getAutoCommit()) {
                conn.getRealConnection().rollback();
              }
              PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
              state.idleConnections.add(newConn);
              newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
              newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
              conn.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
              }
              // 在 #popConnection 中获取连接时,若无空闲连接且活跃连接数已达上限且最老的活跃连接也尚未超时,则获取连接的线程需要等待
              // 当活跃连接使用完毕调用本方法时,这里就会唤醒所有等待的线程
              state.notifyAll();
            } else {
              // (2.1.2) 如果空闲连接池已满,则真正关闭该连接
              state.accumulatedCheckoutTime += conn.getCheckoutTime();
              if (!conn.getRealConnection().getAutoCommit()) {
                conn.getRealConnection().rollback();
              }
              conn.getRealConnection().close();
              if (log.isDebugEnabled()) {
                log.debug("Closed connection " + conn.getRealHashCode() + ".");
              }
              conn.invalidate();
            }
          } else {
            // (2.2) 非法则统计无效连接个数
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
            }
            state.badConnectionCount++;
          }
        }
      }
    

    【解析】


    • (1)将连接从活跃连接池中移除。
    • (2)判断连接是否合法。
      • (2.1.1)如果空闲连接池未满,则回收到空闲连接集合中,并通知 #popConnection() 方法中所有阻塞等待获得连接的线程。
      • (2.1.2)如果空闲连接池已满,则真正关闭该连接。
      • (2.2)非法则统计无效连接个数

    4.2.6 pingConnection

    【功能】通过侦测查询检查连接是否可用。
    【源码】

      // 检查连接是否可用的方法
      protected boolean pingConnection(PooledConnection conn) {
        // 记录是否ping成功
        boolean result = true;
    
        try {
          result = !conn.getRealConnection().isClosed();    // (1)检查真正的数据库连接是否已关闭
        } catch (SQLException e) {
          if (log.isDebugEnabled()) {
            log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
          }
          result = false;
        }
    
        if (result) {
          // (2)检测 poolPingEnabled 设置,是否运行执行测试SQL语句
          if (poolPingEnabled) {
            // (3)长时间(超过 poolPingConnectionsNotUsedFor 指定的时长)未使用的连接,才需要ping操作来检测数据库连接是否正常
            if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
              try {
                if (log.isDebugEnabled()) {
                  log.debug("Testing connection " + conn.getRealHashCode() + " ...");
                }
                // (4)通过执行 poolPingQuery 语句来发起 ping
                Connection realConn = conn.getRealConnection();
                Statement statement = realConn.createStatement();
                ResultSet rs = statement.executeQuery(poolPingQuery);
                rs.close();
                statement.close();
                if (!realConn.getAutoCommit()) {
                  realConn.rollback();
                }
                // 标记执行成功
                result = true;
                if (log.isDebugEnabled()) {
                  log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
                }
              } catch (Exception e) {
                // 执行失败会抛出异常,这里捕获并关闭真正的数据库连接
                log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
                try {
                  conn.getRealConnection().close();
                } catch (Exception e2) {
                  //ignore
                }
                result = false;
                if (log.isDebugEnabled()) {
                  log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
                }
              }
            }
          }
        }
        return result;
      }
    

    【解析】
      本方法会被 PooledConnection.isValid() 方法所调用,用来判断一个连接是否合法,处理流程如下:

    • (1)检查真正的数据库连接是否已关闭,是则直接返回false,表示侦测结果为不可用;否则执行(2)。
    • (2)检测 poolPingEnabled 开关,是否设置为运行执行测试SQL语句。
    • (3)检测连接当前未使用时间是否超过了侦测查询的设定时间。
    • (4)执行 poolPingQuery 语句,根据执行是否抛异常断定SQL语句是否执行成功。

    4.2.7 forceCloseAll

    【功能】关闭数据库连接池,当数据源对象被设置了新的参数或属性时,或者数据源对象被释放时触发 #finalize() 方法回收资源时会调用本方法。
    【源码】

      public void forceCloseAll() {
        synchronized (state) {
          expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
          /**
           * 关闭连接池连接三部曲:
           * (1) 将连接设置为非法
           * (2) 如果有事务未提交或回滚,则回滚该连接上的事务
           * (3) 调用 #close() 真正地关闭该连接
           */
          // 遍历活跃连接池中的连接并关闭
          for (int i = state.activeConnections.size(); i > 0; i--) {
            try {
              PooledConnection conn = state.activeConnections.remove(i - 1);
              conn.invalidate();
    
              Connection realConn = conn.getRealConnection();
              if (!realConn.getAutoCommit()) {
                realConn.rollback();
              }
              realConn.close();
            } catch (Exception e) {
              // ignore
            }
          }
          // 遍历空闲连接池中的连接并关闭
          for (int i = state.idleConnections.size(); i > 0; i--) {
            try {
              PooledConnection conn = state.idleConnections.remove(i - 1);
              conn.invalidate();
    
              Connection realConn = conn.getRealConnection();
              if (!realConn.getAutoCommit()) {
                realConn.rollback();
              }
              realConn.close();
            } catch (Exception e) {
              // ignore
            }
          }
        }
        if (log.isDebugEnabled()) {
          log.debug("PooledDataSource forcefully closed/removed all connections.");
        }
      }
    

    4.2.8 unwrapConnection

    【功能】从代理 Connection 对象中获得真正的 Connection 对象。
    【源码】

      public static Connection unwrapConnection(Connection conn) {
        // 判断是否连接对象类型是否为代理类型
        if (Proxy.isProxyClass(conn.getClass())) {
          // 获取 InvocationHandler 对象
          InvocationHandler handler = Proxy.getInvocationHandler(conn);
          // 如果是 PooledConnection 对象,则获取真实的连接
          if (handler instanceof PooledConnection) {
            return ((PooledConnection) handler).getRealConnection();
          }
        }
        return conn;
      }
    

    4.2.9 其他方法

      大量的参数对应的 getter/setter 方法,剩下的几个方法都比较简单,不再赘述。

    4.3 PooledState

      PooledState 保存了连接池的各种状态信息及相关的统计数据(比如请求数据库连接的次数、获得连接的等待时间、连接的检出使用时间、使用超时时间等等),最重要的是保存了记录空闲连接和活跃连接的集合。

    4.3.1 属性

      // PoolState.java
      protected PooledDataSource dataSource;      // PooledDataSource 和 PoolState 互相有对应类实例的一个成员
    
      protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();   // 空闲连接集合
      protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();                // 活跃连接集合
      protected long requestCount = 0;                                  // 请求数据库连接的次数
      protected long accumulatedRequestTime = 0;                        // 获得连接的累计时长
      protected long accumulatedCheckoutTime = 0;                       // 检出使用连接的累计时长
      protected long claimedOverdueConnectionCount = 0;                 // 连接超时个数
      protected long accumulatedCheckoutTimeOfOverdueConnections = 0;   // 累计超时时间
      protected long accumulatedWaitTime = 0;                           // 累计等待时间
      protected long hadToWaitCount = 0;                                // 阻塞等待的连接数
      protected long badConnectionCount = 0;                            // 无效连接数
    

    4.3.2 构造方法

    public PoolState(PooledDataSource dataSource) {
        this.dataSource = dataSource;
      }
    

    4.3.3 功能方法

      剩下的成员方法都比较简单,就是根据上述属性封装获得状态信息的方法,并且用 synchronized 修饰,保证数据同步,但感觉会影响 PooledDataSource 中使用 state 作为锁同步的代码的并发度,实际上一般项目中也不会直接使用 mybatis 内置的数据源模块,业界有多种成熟可靠的数据库连接池的组件提供选择(比如 DBCPC3P0Druid 等)。

      // 获取连接的次数
      public synchronized long getRequestCount() {
        return requestCount;
      }
    
      // 获取连接的平均请求时间
      public synchronized long getAverageRequestTime() {
        return requestCount == 0 ? 0 : accumulatedRequestTime / requestCount;
      }
    
      // 阻塞等待获得连接的请求的平均等待时间
      public synchronized long getAverageWaitTime() {
        return hadToWaitCount == 0 ? 0 : accumulatedWaitTime / hadToWaitCount;
    
      }
    
      // 阻塞等待的连接数
      public synchronized long getHadToWaitCount() {
        return hadToWaitCount;
      }
    
      // 累计获得的无效连接的个数
      public synchronized long getBadConnectionCount() {
        return badConnectionCount;
      }
    
      // 累计连接超时个数
      public synchronized long getClaimedOverdueConnectionCount() {
        return claimedOverdueConnectionCount;
      }
    
      // 超时连接的平均超时时间
      public synchronized long getAverageOverdueCheckoutTime() {
        return claimedOverdueConnectionCount == 0 ? 0 : accumulatedCheckoutTimeOfOverdueConnections / claimedOverdueConnectionCount;
      }
    
      // 连接的平均检出使用时间
      public synchronized long getAverageCheckoutTime() {
        return requestCount == 0 ? 0 : accumulatedCheckoutTime / requestCount;
      }
    
      // 获取空闲连接的个数
      public synchronized int getIdleConnectionCount() {
        return idleConnections.size();
      }
    
      // 获取活跃连接的个数
      public synchronized int getActiveConnectionCount() {
        return activeConnections.size();
      }
    
      // 线程池的综合状态信息
      @Override
      public synchronized String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("\n===CONFINGURATION==============================================");
        builder.append("\n jdbcDriver                     ").append(dataSource.getDriver());
        builder.append("\n jdbcUrl                        ").append(dataSource.getUrl());
        builder.append("\n jdbcUsername                   ").append(dataSource.getUsername());
        builder.append("\n jdbcPassword                   ").append((dataSource.getPassword() == null ? "NULL" : "************"));
        builder.append("\n poolMaxActiveConnections       ").append(dataSource.poolMaximumActiveConnections);
        builder.append("\n poolMaxIdleConnections         ").append(dataSource.poolMaximumIdleConnections);
        builder.append("\n poolMaxCheckoutTime            ").append(dataSource.poolMaximumCheckoutTime);
        builder.append("\n poolTimeToWait                 ").append(dataSource.poolTimeToWait);
        builder.append("\n poolPingEnabled                ").append(dataSource.poolPingEnabled);
        builder.append("\n poolPingQuery                  ").append(dataSource.poolPingQuery);
        builder.append("\n poolPingConnectionsNotUsedFor  ").append(dataSource.poolPingConnectionsNotUsedFor);
        builder.append("\n ---STATUS-----------------------------------------------------");
        builder.append("\n activeConnections              ").append(getActiveConnectionCount());
        builder.append("\n idleConnections                ").append(getIdleConnectionCount());
        builder.append("\n requestCount                   ").append(getRequestCount());
        builder.append("\n averageRequestTime             ").append(getAverageRequestTime());
        builder.append("\n averageCheckoutTime            ").append(getAverageCheckoutTime());
        builder.append("\n claimedOverdue                 ").append(getClaimedOverdueConnectionCount());
        builder.append("\n averageOverdueCheckoutTime     ").append(getAverageOverdueCheckoutTime());
        builder.append("\n hadToWait                      ").append(getHadToWaitCount());
        builder.append("\n averageWaitTime                ").append(getAverageWaitTime());
        builder.append("\n badConnectionCount             ").append(getBadConnectionCount());
        builder.append("\n===============================================================");
        return builder.toString();
      }
    

    4.4 PooledConnection

      PooledConnection 用到了 JDK 动态代理,它实现了 InvocationHandler 接口,封装了真正的数据库连接对象并为其生成代理对象,当外部调用关闭数据库连接时,代理对象执行 #close() 方法时会被拦截,并将数据库连接从活跃连接池归还到空闲连接池中去。

    4.4.1 属性

      // PooledConnection.java
      private static final String CLOSE = "close";         // 执行关闭连接时的方法名,用来在代理方法中拦截判断
      private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };
    
      private int hashCode = 0;
      // 记录当前PooledConnection对象所在的PooledDataSource对象。该PooledConnection是从
      // 该PooledDataSource中获取的;当调用close()方法时会将PooledConnection放回该PooledConnection中,并不会实际关闭连接
      private PooledDataSource dataSource;
      private Connection realConnection;      // 真正的数据库连接
      private Connection proxyConnection;     // 数据库连接的代理对象,在代理对象方法内拦截"close"方法
      private long checkoutTimestamp;         // 从连接池中取出该连接的时间戳
      private long createdTimestamp;          // 该连接创建的时间戳
      private long lastUsedTimestamp;         // 最后一次被使用的时间戳
      private int connectionTypeCode;         // 由数据库URL、用户名和密码计算出来的hash值,可用于标识该连接所在的连接池
      private boolean valid;                  // 检测当前PooledConnection是否有效,主要是为了防止程序通过close方法将连接归还给连接池之后,依然通过该连接操作数据库导致出错
    

    4.4.2 构造方法

      public PooledConnection(Connection connection, PooledDataSource dataSource) {
        this.hashCode = connection.hashCode();
        this.realConnection = connection;
        this.dataSource = dataSource;
        this.createdTimestamp = System.currentTimeMillis();
        this.lastUsedTimestamp = System.currentTimeMillis();
        this.valid = true;
        this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
      }
    

    4.4.3 invoke

      该方法是实现 InvocationHandler 接口必须实现的方法,内置拦截代理对象的执行逻辑,源码如下:

      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 调用到的代理对象的方法
        String methodName = method.getName();
        // 如果是关闭数据库连接的 #close() 方法,则不实际关闭连接,而是将连接归还到连接池
        if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
          dataSource.pushConnection(this);
          return null;
        } else {
          try {
            // 如果调用的方法是非Object类的方法,则为Connection的其他方法,先检查连接是否合法,合法则通过反射调用该方法
            if (!Object.class.equals(method.getDeclaringClass())) {
              // issue #579 toString() should never fail
              // throw an SQLException instead of a Runtime
              checkConnection();
            }
            return method.invoke(realConnection, args);
          } catch (Throwable t) {
            throw ExceptionUtil.unwrapThrowable(t);
          }
        }
      }
    

    【解析】

    • (1)如果调用的是 Connection.close() 方法,不会真正执行该方法,则是调用 PooledDataSource.pushConnection(PooledConnection conn) 处理,该方法会将连接从活跃连接池转移到空闲连接池,当然如果空闲连接池已满,才会真正地关闭该连接。
    • (2)如果执行的是 Connection 对象的其他方法,则需要先检查连接是否合法,非法则抛出异常,校验的方法处理逻辑如下:
      private void checkConnection() throws SQLException {
        if (!valid) {
          throw new SQLException("Error accessing PooledConnection. Connection is invalid.");
        }
      }
    

    如果是 Object 类的方法,或者连接合法,则直接通过反射调用该方法。

    4.4.4 invalidate

    【功能】将连接置为无效。
    【源码】

      public void invalidate() {
        valid = false;
      }
    

    【使用场景】

    • (1)重置数据源的连接配置属性/释放数据源对象触发调用 #forceCloseAll() 方法时,其内部处理关闭连接池的每个连接前都会先将连接置为无效。
    • (2)连接使用完后回收到连接池时
      • (2.1)空闲连接池未满,基于活跃连接对象中的真实连接创建一个新的 PooledConnection 对象,并将原来的活跃连接对象置为无效。
      • (2.2)空闲连接池已慢,将该要被回收的连接直接置为无效并关闭掉。

    4.4.5 其他方法

      主要是类中属性的 getter/setter,比较简单。

    相关文章

      网友评论

        本文标题:[MyBatis源码分析 - 数据源模块]

        本文链接:https://www.haomeiwen.com/subject/vfqtiktx.html