by shihang.mai
1. 为什么需要动态数据源
我们来查看源码
- mapper.getXXXX
- MapperProxy的invoke()
/**
 * Handles a call made on the mapper proxy.
 *
 * Methods declared on {@code Object} are invoked reflectively on this handler itself;
 * every other method is resolved to a cached {@code MapperMethod} and executed against
 * the SqlSession held by this proxy.
 *
 * @param proxy  the proxy instance the call was made on (not used here)
 * @param method the interface method being invoked
 * @param args   the call arguments
 * @return the result of the reflective call or of the mapper method execution
 * @throws Throwable the unwrapped failure of the reflective call, or whatever the mapper method throws
 */
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    boolean declaredOnObject = Object.class.equals(method.getDeclaringClass());
    if (!declaredOnObject) {
        // ----------- look here -----------
        return this.cachedMapperMethod(method).execute(this.sqlSession, args);
    }
    try {
        return method.invoke(this, args);
    } catch (Throwable failure) {
        throw ExceptionUtil.unwrapThrowable(failure);
    }
}
- MapperMethod的execute()
/**
 * Dispatches the mapper call to the matching SqlSession operation based on the command type.
 *
 * @param sqlSession the session the statement is run on
 * @param args       the raw mapper method arguments
 * @return the row count result, query result, or flush result, depending on the command type
 * @throws BindingException if the command type is not handled, or if a null result would have
 *                          to be returned from a method with a primitive (non-void) return type
 */
public Object execute(SqlSession sqlSession, Object[] args) {
    Object outcome;
    if (SqlCommandType.INSERT == this.command.getType()) {
        Object sqlParam = this.method.convertArgsToSqlCommandParam(args);
        outcome = this.rowCountResult(sqlSession.insert(this.command.getName(), sqlParam));
    } else if (SqlCommandType.UPDATE == this.command.getType()) {
        Object sqlParam = this.method.convertArgsToSqlCommandParam(args);
        outcome = this.rowCountResult(sqlSession.update(this.command.getName(), sqlParam));
    } else if (SqlCommandType.DELETE == this.command.getType()) {
        Object sqlParam = this.method.convertArgsToSqlCommandParam(args);
        outcome = this.rowCountResult(sqlSession.delete(this.command.getName(), sqlParam));
    } else if (SqlCommandType.SELECT == this.command.getType()) {
        if (this.method.returnsVoid() && this.method.hasResultHandler()) {
            this.executeWithResultHandler(sqlSession, args);
            outcome = null;
        } else if (this.method.returnsMany()) {
            outcome = this.executeForMany(sqlSession, args);
        } else if (this.method.returnsMap()) {
            outcome = this.executeForMap(sqlSession, args);
        } else {
            Object sqlParam = this.method.convertArgsToSqlCommandParam(args);
            // ----------- look here -----------
            outcome = sqlSession.selectOne(this.command.getName(), sqlParam);
        }
    } else if (SqlCommandType.FLUSH == this.command.getType()) {
        outcome = sqlSession.flushStatements();
    } else {
        throw new BindingException("Unknown execution method for: " + this.command.getName());
    }

    // A primitive return type cannot carry null, so fail loudly instead of returning it.
    boolean nullForPrimitive = outcome == null
            && this.method.getReturnType().isPrimitive()
            && !this.method.returnsVoid();
    if (nullForPrimitive) {
        throw new BindingException("Mapper method '" + this.command.getName() + " attempted to return null from a method with a primitive return type (" + this.method.getReturnType() + ").");
    }
    return outcome;
}
- SqlSessionTemplate的selectOne()
/**
 * Delegates the single-row query to the internal {@code sqlSessionProxy}.
 *
 * @param statement the statement id
 * @param parameter the statement parameter
 * @return whatever the proxy returns for this statement
 */
public <T> T selectOne(String statement, Object parameter) {
    T row = this.sqlSessionProxy.selectOne(statement, parameter);
    return row;
}
- SqlSessionTemplate内部类SqlSessionInterceptor.invoke()
// InvocationHandler declared inside SqlSessionTemplate. For each intercepted call it obtains a
// SqlSession from SqlSessionUtils, invokes the target method on it, and closes/releases the
// session afterwards.
private class SqlSessionInterceptor implements InvocationHandler {
private SqlSessionInterceptor() {
}
// Runs the intercepted method on a SqlSession obtained via SqlSessionUtils.getSqlSession.
// Returns the method's result; on failure rethrows the unwrapped (and, when a translator is
// configured and produces one, the translated) exception.
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// -------------- 2. look here: how the SqlSession is obtained (see getSqlSession) --------------
SqlSession sqlSession = SqlSessionUtils.getSqlSession(SqlSessionTemplate.this.sqlSessionFactory, SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);
Object unwrapped;
try {
// see step 7, DefaultSqlSession
Object result = method.invoke(sqlSession, args);
// commit right away when isSqlSessionTransactional reports false for this session
if (!SqlSessionUtils.isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
sqlSession.commit(true);
}
unwrapped = result;
} catch (Throwable var11) {
unwrapped = ExceptionUtil.unwrapThrowable(var11);
if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
// close the session before translating, and null the reference so the finally block
// below does not close it a second time
SqlSessionUtils.closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
sqlSession = null;
Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException)unwrapped);
// keep the original exception when the translator returns null
if (translated != null) {
unwrapped = translated;
}
}
throw (Throwable)unwrapped;
} finally {
if (sqlSession != null) {
// ----------- 1. look here: how the SqlSession is closed (see closeSqlSession) -----------
SqlSessionUtils.closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
}
}
return unwrapped;
}
}
- SqlSessionUtils.closeSqlSession
// 1. Called from the finally block of SqlSessionInterceptor.invoke.
/**
 * Releases or closes the given session.
 *
 * When the holder registered for {@code sessionFactory} holds this very session, the holder
 * is only told that the session was released; otherwise the session is closed.
 *
 * @param session        the session to release or close; must not be null
 * @param sessionFactory the factory used as the lookup key for the holder; must not be null
 */
public static void closeSqlSession(SqlSession session, SqlSessionFactory sessionFactory) {
    Assert.notNull(session, "No SqlSession specified");
    Assert.notNull(sessionFactory, "No SqlSessionFactory specified");

    SqlSessionHolder boundHolder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
    boolean heldByTransaction = boundHolder != null && boundHolder.getSqlSession() == session;

    if (!heldByTransaction) {
        // No transaction is active: this path really closes the session.
        if (logger.isDebugEnabled()) {
            logger.debug("Closing non transactional SqlSession [" + session + "]");
        }
        session.close();
        return;
    }

    // A transaction is active: the session is NOT closed here.
    if (logger.isDebugEnabled()) {
        logger.debug("Releasing transactional SqlSession [" + session + "]");
    }
    boundHolder.released();
}
// 2. The SqlSession is first looked up from the holder registered with
//    TransactionSynchronizationManager (acting as a cache); a new one is opened only on a miss.
/**
 * Returns the session held for the current transaction, or opens a new one.
 *
 * A newly opened session is registered with {@code TransactionSynchronizationManager} only when
 * synchronization is active and the environment uses a {@code SpringManagedTransactionFactory}.
 *
 * @param sessionFactory     factory used both to open sessions and as the holder lookup key; must not be null
 * @param executorType       executor type for the session; must not be null
 * @param exceptionTranslator translator stored in a newly created holder
 * @return the existing transactional session or a freshly opened one
 * @throws TransientDataAccessResourceException if the held session uses a different executor type,
 *         or if the DataSource has a bound resource but the transaction factory is not a
 *         {@code SpringManagedTransactionFactory}
 */
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {
    Assert.notNull(sessionFactory, "No SqlSessionFactory specified");
    Assert.notNull(executorType, "No ExecutorType specified");

    SqlSessionHolder existing = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
    if (existing != null && existing.isSynchronizedWithTransaction()) {
        if (existing.getExecutorType() != executorType) {
            throw new TransientDataAccessResourceException("Cannot change the ExecutorType when there is an existing transaction");
        }
        existing.requested();
        if (logger.isDebugEnabled()) {
            logger.debug("Fetched SqlSession [" + existing.getSqlSession() + "] from current transaction");
        }
        return existing.getSqlSession();
    }

    if (logger.isDebugEnabled()) {
        logger.debug("Creating a new SqlSession");
    }
    SqlSession opened = sessionFactory.openSession(executorType);

    if (!TransactionSynchronizationManager.isSynchronizationActive()) {
        if (logger.isDebugEnabled()) {
            logger.debug("SqlSession [" + opened + "] was not registered for synchronization because synchronization is not active");
        }
        return opened;
    }

    Environment environment = sessionFactory.getConfiguration().getEnvironment();
    boolean springManaged = environment.getTransactionFactory() instanceof SpringManagedTransactionFactory;
    if (springManaged) {
        if (logger.isDebugEnabled()) {
            logger.debug("Registering transaction synchronization for SqlSession [" + opened + "]");
        }
        // Bind the new session so later calls in the same transaction find it above.
        SqlSessionHolder freshHolder = new SqlSessionHolder(opened, executorType, exceptionTranslator);
        TransactionSynchronizationManager.bindResource(sessionFactory, freshHolder);
        TransactionSynchronizationManager.registerSynchronization(new SqlSessionUtils.SqlSessionSynchronization(freshHolder, sessionFactory));
        freshHolder.setSynchronizedWithTransaction(true);
        freshHolder.requested();
        return opened;
    }

    if (TransactionSynchronizationManager.getResource(environment.getDataSource()) != null) {
        throw new TransientDataAccessResourceException("SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");
    }
    if (logger.isDebugEnabled()) {
        logger.debug("SqlSession [" + opened + "] was not registered for synchronization because DataSource is not transactional");
    }
    return opened;
}
总结一下:如果开启了事务,那么同一个事务内(例如同一个 service 方法中)的所有数据库操作共用同一个 SqlSession;如果没有开启事务,每次数据库操作都会新建一个 SqlSession,并在操作结束后关闭。
- DefaultSqlSession的selectOne
/**
 * Runs the statement through {@code selectList} and expects at most one row.
 *
 * @param statement the statement id
 * @param parameter the statement parameter
 * @return the single row, or null when the list is empty
 * @throws TooManyResultsException if more than one row comes back
 */
public <T> T selectOne(String statement, Object parameter) {
    List<T> rows = this.selectList(statement, parameter);
    int rowCount = rows.size();
    if (rowCount > 1) {
        throw new TooManyResultsException("Expected one result (or null) to be returned by selectOne(), but found: " + rowCount);
    }
    return rowCount == 1 ? rows.get(0) : null;
}
/**
 * Convenience overload: runs the query with {@code RowBounds.DEFAULT}.
 *
 * @param statement the statement id
 * @param parameter the statement parameter
 * @return the rows returned by the three-argument overload
 */
public <E> List<E> selectList(String statement, Object parameter) {
    List<E> rows = this.selectList(statement, parameter, RowBounds.DEFAULT);
    return rows;
}
/**
 * Looks up the mapped statement and hands the query to the executor.
 *
 * @param statement the statement id
 * @param parameter the statement parameter (passed through {@code wrapCollection} first)
 * @param rowBounds the row bounds passed to the executor
 * @return the rows produced by the executor
 */
public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
    try {
        MappedStatement mappedStatement = this.configuration.getMappedStatement(statement);
        // ------- look here --------
        return this.executor.query(mappedStatement, this.wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
    } catch (Exception failure) {
        throw ExceptionFactory.wrapException("Error querying database. Cause: " + failure, failure);
    } finally {
        // Always reset the error context, whether the query succeeded or failed.
        ErrorContext.instance().reset();
    }
}
- BaseExecutor的query
/**
 * Builds the bound SQL and the cache key for the statement, then delegates to the
 * six-argument {@code query} overload.
 *
 * @param ms            the mapped statement to run
 * @param parameter     the statement parameter
 * @param rowBounds     the row bounds
 * @param resultHandler optional result handler
 * @return the rows returned by the six-argument overload
 * @throws SQLException propagated from the delegated query
 */
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    BoundSql sql = ms.getBoundSql(parameter);
    CacheKey cacheKey = this.createCacheKey(ms, parameter, rowBounds, sql);
    // --------- look here ----------
    List<E> rows = this.query(ms, parameter, rowBounds, resultHandler, cacheKey, sql);
    return rows;
}
// Runs the query, consulting localCache first (only when no resultHandler was passed) and
// falling back to queryFromDatabase on a miss.
// Throws ExecutorException when this executor has been closed.
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (this.closed) {
throw new ExecutorException("Executor was closed.");
} else {
// clear the local cache only when queryStack is 0 and the statement asks for a flush
if (this.queryStack == 0 && ms.isFlushCacheRequired()) {
this.clearLocalCache();
}
List list;
try {
// queryStack is incremented for the duration of the lookup/load and restored in finally
++this.queryStack;
list = resultHandler == null ? (List)this.localCache.getObject(key) : null;
if (list != null) {
this.handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
// --------- look here: cache miss, go to the database ------
list = this.queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
--this.queryStack;
}
// once queryStack is back to 0: run the queued deferred loads, then clear the queue
if (this.queryStack == 0) {
Iterator i$ = this.deferredLoads.iterator();
while(i$.hasNext()) {
BaseExecutor.DeferredLoad deferredLoad = (BaseExecutor.DeferredLoad)i$.next();
deferredLoad.load();
}
this.deferredLoads.clear();
// with STATEMENT scope the local cache is cleared after each such query
if (this.configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
this.clearLocalCache();
}
}
return list;
}
}
/**
 * Loads the rows via {@code doQuery} and stores them in the local cache under {@code key}.
 *
 * A placeholder entry is put under the key while the query runs and is always removed
 * afterwards; the real result is cached only when {@code doQuery} completes normally.
 *
 * @return the rows returned by {@code doQuery}
 * @throws SQLException propagated from {@code doQuery}
 */
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    this.localCache.putObject(key, ExecutionPlaceholder.EXECUTION_PLACEHOLDER);

    List<E> rows;
    try {
        rows = this.doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
    } finally {
        // Drop the placeholder whether or not the query succeeded.
        this.localCache.removeObject(key);
    }

    this.localCache.putObject(key, rows);
    boolean callable = ms.getStatementType() == StatementType.CALLABLE;
    if (callable) {
        // Callable statements additionally cache the parameter object under the same key.
        this.localOutputParameterCache.putObject(key, parameter);
    }
    return rows;
}
- SimpleExecutor的doQuery
/**
 * Creates a statement handler, prepares the JDBC statement, and runs the query.
 * The statement is always closed afterwards.
 *
 * @return the rows produced by the statement handler
 * @throws SQLException propagated from statement preparation or execution
 */
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement statement = null;
    try {
        StatementHandler statementHandler = ms.getConfiguration()
                .newStatementHandler(this.wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
        // --------- look here ---------
        statement = this.prepareStatement(statementHandler, ms.getStatementLog());
        return statementHandler.query(statement, resultHandler);
    } finally {
        // statement may still be null here if preparation failed
        this.closeStatement(statement);
    }
}
/**
 * Obtains a connection, lets the handler prepare a statement on it, and binds the parameters.
 *
 * @param handler      the statement handler that prepares and parameterizes the statement
 * @param statementLog the log passed to {@code getConnection}
 * @return the prepared, parameterized statement
 * @throws SQLException propagated from the connection or the handler
 */
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    // ------- look here -----------
    Connection jdbcConnection = this.getConnection(statementLog);
    Statement prepared = handler.prepare(jdbcConnection);
    handler.parameterize(prepared);
    return prepared;
}
/**
 * Fetches the connection from the current transaction, wrapping it in a logging proxy
 * when debug logging is enabled for the statement log.
 *
 * @param statementLog the log that decides whether the connection is wrapped
 * @return the raw connection, or a {@code ConnectionLogger} proxy around it
 * @throws SQLException propagated from the transaction
 */
protected Connection getConnection(Log statementLog) throws SQLException {
    // ------- look here ----------
    Connection rawConnection = this.transaction.getConnection();
    if (statementLog.isDebugEnabled()) {
        return ConnectionLogger.newInstance(rawConnection, statementLog, this.queryStack);
    }
    return rawConnection;
}
- SpringManagedTransaction的getConnection()
// If this transaction already holds a connection it is returned directly; otherwise one is
// obtained from the DataSource. (Abridged excerpt: members not relevant to the walkthrough
// are not shown.)
public class SpringManagedTransaction implements Transaction {
// the single DataSource this transaction obtains its connection from
private final DataSource dataSource;
// opened on the first getConnection() call, then reused
private Connection connection;
private boolean isConnectionTransactional;
private boolean autoCommit;
// Returns the held connection, opening it first if none has been obtained yet.
public Connection getConnection() throws SQLException {
if (this.connection == null) {
this.openConnection();
}
return this.connection;
}
// Obtains the connection through DataSourceUtils and records its auto-commit flag and whether
// DataSourceUtils reports it as transactional for this DataSource.
private void openConnection() throws SQLException {
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
if (logger.isDebugEnabled()) {
logger.debug("JDBC Connection [" + this.connection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring");
}
}
}
一个数据源只能对应一个数据库。当我们有多个数据库时,就需要多个数据源;可是从上面的源码可以看到,SpringManagedTransaction(以及事务管理器)中只持有一个 DataSource 的引用,所以我们需要动态数据源。
2. 如何做(只写核心部分)
- 事务管理器中只有一个数据源的引用,所以我们可以自定义一个类 DynamicDataSource 来实现 DataSource
- DynamicDataSource 中存储我们配置的多数据源,然后将 DynamicDataSource 的实例配置给事务管理器
- 当从事务管理器获取 Connection 对象的时候,会从 DynamicDataSource 实例获取,然后再由 DynamicDataSource 根据 routeKey 路由到某个具体的数据源,从中获取 Connection。
![](https://img.haomeiwen.com/i16110947/c3d814bf470c5388.png)
Spring 也考虑到了这一点,提供了一个抽象类 AbstractRoutingDataSource,我们只需继承它并重写 determineCurrentLookupKey(),返回当前要使用的 routeKey 即可:
/**
 * Routing DataSource whose lookup key is the database type currently reported by
 * {@code DatabaseContextHolder}.
 */
public class DynamicDataSource extends AbstractRoutingDataSource {

    /**
     * @return the value of {@code DatabaseContextHolder.getDatabaseType()}, used as the lookup key
     */
    @Override
    protected Object determineCurrentLookupKey() {
        // Ask DatabaseContextHolder for the DatabaseType of the current thread.
        final Object lookupKey = DatabaseContextHolder.getDatabaseType();
        return lookupKey;
    }
}
3. 动态数据源原理图
![](https://img.haomeiwen.com/i16110947/0989f6000831a2ac.png)
网友评论