环境
使用 sharding-jdbc-spring-boot-data-mybatis-example 这个工程,研究下主从结构的数据源处理。
PreparedStatement的创建MasterSlaveDataSource 中获得连接
每次都是创建一个新的连接 MasterSlaveConnection,并将 MasterSlaveDataSource 作为参数传入。
public Connection getConnection() throws SQLException {
return new MasterSlaveConnection(this);
}
mybatis 的 PreparedStatementHandler
// 创建实际执行使用的 Statement
protected Statement instantiateStatement(Connection connection) throws SQLException {
// 通过 BoundSql 生成实际的 sql 语句
String sql = this.boundSql.getSql();
if (this.mappedStatement.getKeyGenerator() instanceof Jdbc3KeyGenerator) {
String[] keyColumnNames = this.mappedStatement.getKeyColumns();
return keyColumnNames == null ? connection.prepareStatement(sql, 1) : connection.prepareStatement(sql, keyColumnNames);
} else {
return this.mappedStatement.getResultSetType() != null ? connection.prepareStatement(sql, this.mappedStatement.getResultSetType().getValue(), 1007) : connection.prepareStatement(sql);
}
}
这里 connection 对象是 MasterSlaveConnection ,返回值 为 MasterSlavePreparedStatement 对象。
MasterSlaveConnection 的prepareStatement 在创建 MasterSlavePreparedStatement 过程中会根据 SQL 选择 DataSource。
为什么 connection 对象是 MasterSlaveConnection 呢?
使用 springboot 时,sharding-jdbc 使用了 spring.factories 的方式配置了 SpringBootConfiguration,这个配置里,创建 MasterSlaveDataSource。
// SpringBootConfiguration
@Bean
public DataSource dataSource() throws SQLException {
return null == masterSlaveProperties.getMasterDataSourceName()
? ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingProperties.getShardingRuleConfiguration(), shardingProperties.getConfigMap(), shardingProperties.getProps())
: MasterSlaveDataSourceFactory.createDataSource(dataSourceMap, masterSlaveProperties.getMasterSlaveRuleConfiguration(), masterSlaveProperties.getConfigMap());
}
MasterSlavePreparedStatement 的创建
可能看到 getConnections 返回的是多个链接,根据 SQL 类型来支持分库和分表处理。
SQLJudgeEngine 判断 SQL 类型,通常 「创建」,「更新」和「插入」强制使用主库型连接池。
针对每个实际的,一般是数据库连接池 中的 Connection
,创建 PreparedStatement
, 放到 routedStatements 中。
public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
this.connection = connection;
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
for (Connection each : connection.getConnections(sqlStatement.getType())) {
PreparedStatement preparedStatement = each.prepareStatement(sql, autoGeneratedKeys);
routedStatements.add(preparedStatement);
}
}
MasterSlaveConnection 获得链接
SqlType 会缓存到 MasterSlaveConnection 对象中。
这里有一个 replayMethodsInvocation 的方法,为什么需要 replayMethodsInvocation
。
由于在执行 PreparedStatement 之前,无法确切知道 SQL,不能够定位执行所需要全部链接,这样对链接做一些设置可能会丢失,比如 setAutoCommit状态。
通过 replayMethodsInvocation 将所有对 Connection 的操作捕获,再重放到每个链接上,同步设置。
每次获取连接都加入到 cachedConnections
中, 在执行完成后 MasterSlaveConnection 将会被关闭,cachedConnections 也会被清理。
MasterSlaveConnection 关闭时,会将对连接池的连接进行关闭处理,通常情况下,如果连接没有关闭,连接会重新放入连接池。
public Collection<Connection> getConnections(final SQLType sqlType) throws SQLException {
cachedSQLType = sqlType;
Map<String, DataSource> dataSources = SQLType.DDL == sqlType ? masterSlaveDataSource.getMasterDataSource() : masterSlaveDataSource.getDataSource(sqlType).toMap();
Collection<Connection> result = new LinkedList<>();
for (Entry<String, DataSource> each : dataSources.entrySet()) {
String dataSourceName = each.getKey();
if (getCachedConnections().containsKey(dataSourceName)) {
result.add(getCachedConnections().get(dataSourceName));
continue;
}
Connection connection = each.getValue().getConnection();
getCachedConnections().put(dataSourceName, connection);
result.add(connection);
replayMethodsInvocation(connection);
}
return result;
}
数据源的路由处理
MasterSlaveDataSource 根据 SQLType 参数和路由规则决定最终的数据源。
public NamedDataSource getDataSource(final SQLType sqlType) {
if (isMasterRoute(sqlType)) {
DML_FLAG.set(true);
return new NamedDataSource(masterSlaveRule.getMasterDataSourceName(), masterSlaveRule.getMasterDataSource());
}
String selectedSourceName = masterSlaveRule.getStrategy().getDataSource(masterSlaveRule.getName(),
masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceMap().keySet()));
DataSource selectedSource = selectedSourceName.equals(masterSlaveRule.getMasterDataSourceName())
? masterSlaveRule.getMasterDataSource() : masterSlaveRule.getSlaveDataSourceMap().get(selectedSourceName);
Preconditions.checkNotNull(selectedSource, "");
return new NamedDataSource(selectedSourceName, selectedSource);
}
连接设置的记录处理
复写 Connection 的时候,需要针对所有的设置操作进行记录,MasterSlaveConnection 继承至 AbstractConnectionAdapter, 我们可以看到这样的处理。
如果考虑到batch的操作,那么 Connection
是逐渐增加的,所以需要对之前 cachedConnections 进行设置。
需要记录设置是 setAutoCommit
, setReadOnly
以及 setTransactionIsolation
@Override
public final void setAutoCommit(final boolean autoCommit) throws SQLException {
this.autoCommit = autoCommit;
recordMethodInvocation(Connection.class, "setAutoCommit", new Class[] {boolean.class}, new Object[] {autoCommit});
for (Connection each : cachedConnections.values()) {
each.setAutoCommit(autoCommit);
}
}
@Override
public final void setReadOnly(final boolean readOnly) throws SQLException {
this.readOnly = readOnly;
recordMethodInvocation(Connection.class, "setReadOnly", new Class[] {boolean.class}, new Object[] {readOnly});
for (Connection each : cachedConnections.values()) {
each.setReadOnly(readOnly);
}
}
@Override
public final void setTransactionIsolation(final int level) throws SQLException {
transactionIsolation = level;
recordMethodInvocation(Connection.class, "setTransactionIsolation", new Class[] {int.class}, new Object[] {level});
for (Connection each : cachedConnections.values()) {
each.setTransactionIsolation(level);
}
}
执行 SQL 并清理
前面那个很长的调用栈实际由 SimpleExecutor
的 doUpdate
处理.
closeStatement
会触发 Connection 的 close
方法,从而完成连接池的回收工作。
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.update(stmt);
} finally {
closeStatement(stmt);
}
}
关闭的堆栈如下
关闭连接
StatementHandler 即 PreparedStatementHandler
,执行 update
.
public int update(Statement statement) throws SQLException {
PreparedStatement ps = (PreparedStatement) statement;
ps.execute();
int rows = ps.getUpdateCount();
Object parameterObject = boundSql.getParameterObject();
KeyGenerator keyGenerator = mappedStatement.getKeyGenerator();
keyGenerator.processAfter(executor, mappedStatement, ps, parameterObject);
return rows;
}
网友评论