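I. Spring + MyBatis: the relevant internals
In MyBatis we define the data source as a dataSource, and every database operation obtains a Connection from it to execute SQL. Read/write splitting therefore comes down to making the program obtain its Connection from a different data source according to our needs.
1. Database access when a single DAO statement is executed
When a single SQL statement is executed, the underlying code checks whether a transaction is active for the DataSource on the current thread.
Transaction active: the Connection is first looked up in the thread-local (ThreadLocal) resources of TransactionSynchronizationManager. If none is found there, a new Connection is obtained directly from the DataSource and saved to the ThreadLocal.
No transaction: a Connection is obtained directly from the DataSource and returned.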
[Figure: how a DAO query obtains a Connection]
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    Assert.notNull(dataSource, "No DataSource specified");
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
        conHolder.requested();
        if (!conHolder.hasConnection()) {
            logger.debug("Fetching resumed JDBC Connection from DataSource");
            conHolder.setConnection(dataSource.getConnection());
        }
        return conHolder.getConnection();
    }
    // Else we either got no holder or an empty thread-bound holder here.
    logger.debug("Fetching JDBC Connection from DataSource");
    Connection con = dataSource.getConnection();
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        logger.debug("Registering transaction synchronization for JDBC Connection");
        // Use same Connection for further JDBC actions within the transaction.
        // Thread-bound object will get removed by synchronization at transaction completion.
        ConnectionHolder holderToUse = conHolder;
        if (holderToUse == null) {
            holderToUse = new ConnectionHolder(con);
        }
        else {
            holderToUse.setConnection(con);
        }
        holderToUse.requested();
        TransactionSynchronizationManager.registerSynchronization(
                new ConnectionSynchronization(holderToUse, dataSource));
        holderToUse.setSynchronizedWithTransaction(true);
        if (holderToUse != conHolder) {
            TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
        }
    }
    return con;
}
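The lookup in doGetConnection can be pictured with a small JDK-only sketch. It is a simplified analogue, not Spring's implementation: FakeConnection, FakeDataSource and ThreadBoundConnections are hypothetical stand-ins, and the begin()/end() pair only imitates the start and completion of a transaction.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a JDBC Connection. */
final class FakeConnection {
}

/** Hypothetical stand-in for a DataSource: every call hands out a new connection. */
final class FakeDataSource {
    FakeConnection getConnection() {
        return new FakeConnection();
    }
}

/** Simplified analogue of the thread-bound lookup; not Spring's actual code. */
final class ThreadBoundConnections {
    // Per-thread map: data source -> connection bound for the current transaction.
    private static final ThreadLocal<Map<FakeDataSource, FakeConnection>> BOUND =
            ThreadLocal.withInitial(HashMap::new);
    // Per-thread flag imitating "a transaction is active on this thread".
    private static final ThreadLocal<Boolean> IN_TX = ThreadLocal.withInitial(() -> false);

    static void begin() {
        IN_TX.set(true);
    }

    static void end() {
        IN_TX.set(false);
        BOUND.get().clear(); // unbind everything when the transaction completes
    }

    static FakeConnection getConnection(FakeDataSource ds) {
        FakeConnection bound = BOUND.get().get(ds);
        if (bound != null) {
            return bound; // reuse the connection already bound to this thread
        }
        FakeConnection con = ds.getConnection();
        if (IN_TX.get()) {
            BOUND.get().put(ds, con); // bind it so later calls in the transaction reuse it
        }
        return con;
    }

    public static void main(String[] args) {
        FakeDataSource ds = new FakeDataSource();
        // Outside a transaction each call gets its own connection.
        System.out.println(getConnection(ds) == getConnection(ds));
        begin();
        // Inside a transaction both calls see the same connection.
        System.out.println(getConnection(ds) == getConnection(ds));
        end();
    }
}
```

The point of the sketch is only the branching: a bound connection wins, otherwise a fresh one is fetched and bound if a transaction is active.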
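2. Database access under Spring's declarative @Transactional transactions
Transaction manager configuration:
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="masterDataSource_1"/>
</bean>
When Spring's transaction management is used, the AOP aspect enters the doBegin(…) method of the configured DataSourceTransactionManager before a method annotated with @Transactional runs. This method tries to obtain the Connection for the DataSource from the ThreadLocal of TransactionSynchronizationManager. If there is none, it obtains a new Connection from the DataSource and puts it into the ThreadLocal. It also sets the connection's autoCommit to false.
Partial source: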
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
        if (txObject.getConnectionHolder() == null ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = this.dataSource.getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }
        prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }
        // Bind the connection holder to the thread.
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
        }
    }
    // The catch block is omitted in this excerpt.
}
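Summary: this is the Connection acquisition flow for a single database. If a method opens a transaction through @Transactional, a Connection is obtained before the method runs, its autoCommit is set to false, and it is saved in the ThreadLocal; every SQL statement executed inside the method reuses that Connection. SQL that runs outside a transaction obtains a Connection from the DataSource on every execution.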
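II. Implementation options, with pros and cons
1. Put master SQL and slave SQL in different packages with different data sources
Write the update SQL and the query SQL separately and place them under the updateDao and queryDao directories. Each directory is configured with its own dataSource, so SQL under queryDao goes to the slave and SQL under updateDao goes to the master. Transactions are configured against the master.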
<!-- Master configuration -->
<bean id="masterSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <property name="dataSource" ref="masterDataSource"/>
    <property name="mapperLocations" value="classpath*:/com/jd/trip/hotel/mapping/updateDao/mapper/*.xml"/>
</bean>
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
    <property name="basePackage" value="com.jd.trip.hotel.mapping.updateDao" />
    <property name="sqlSessionFactoryBeanName" value="masterSqlSessionFactory" />
</bean>
<!-- Slave configuration -->
<bean id="slaveSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <property name="dataSource" ref="slaveDataSource"/>
    <property name="mapperLocations" value="classpath*:/com/jd/trip/hotel/mapping/queryDao/mapper/*.xml"/>
</bean>
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
    <property name="basePackage" value="com.jd.trip.hotel.mapping.queryDao" />
    <property name="sqlSessionFactoryBeanName" value="slaveSqlSessionFactory" />
</bean>
<!-- Transactions -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="masterDataSource"/>
</bean>
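Pros: simple logic, easy to control, unlikely to go wrong.
Cons: 1. A legacy system needs many changes. 2. To force a query onto the master you may have to write a SelectFromMasterDao in the updateDao package, which can lead to duplicated code.
2. A MyBatis plugin analyzes the SQL to choose the data source
A MyBatis plugin intercepts SQL execution and inspects each statement's XML configuration to decide between master and slave. It stores the matching enum value in a global ThreadLocal. An AbstractRoutingDataSource then reads the data source type of the current thread to decide which DataSource executes the SQL, and the ThreadLocal is cleared once the SQL has finished.
Configure the MyBatis interceptor:
MyBatis plugin configuration
<plugins>
    <plugin interceptor="com.jd.trip.hotel.mapping.common.framework.datasource.DynamicSourceInterceptor">
    </plugin>
</plugins>
MyBatis interceptor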
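import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.executor.keygen.SelectKeyGenerator;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.plugin.Intercepts;
import org.apache.ibatis.plugin.Invocation;
import org.apache.ibatis.plugin.Plugin;
import org.apache.ibatis.plugin.Signature;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@Intercepts({
        @Signature(type = Executor.class, method = "update", args = {
                MappedStatement.class, Object.class }),
        @Signature(type = Executor.class, method = "query", args = {
                MappedStatement.class, Object.class, RowBounds.class,
                ResultHandler.class }) })
public class DynamicSourceInterceptor implements Interceptor {
    protected static final Logger logger = LoggerFactory.getLogger(DynamicSourceInterceptor.class);
    private static final String REGEX = "\\s*insert\\u0020.*|\\s*delete\\u0020.*|\\s*update\\u0020.*";
    // Routing decision per mapped statement id, so each statement is analyzed only once.
    private static final Map<String, DynamicDataSourceGlobal> cacheMap = new ConcurrentHashMap<>();
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();
        // Inside a transaction the Connection is already bound, so routing is only decided outside one.
        if (!synchronizationActive) {
            Object[] objects = invocation.getArgs();
            MappedStatement ms = (MappedStatement) objects[0];
            DynamicDataSourceGlobal dynamicDataSourceGlobal = cacheMap.get(ms.getId());
            if (dynamicDataSourceGlobal == null) {
                // Read statement
                if (ms.getSqlCommandType().equals(SqlCommandType.SELECT)) {
                    // A selectKey statement fetches the auto-increment primary key
                    // (SELECT LAST_INSERT_ID()), so it must use the master.
                    if (ms.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)) {
                        dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;
                    } else {
                        BoundSql boundSql = ms.getSqlSource().getBoundSql(objects[1]);
                        String sql = boundSql.getSql().toLowerCase(Locale.CHINA).replaceAll("[\\t\\n\\r]", " ");
                        if (sql.matches(REGEX)) {
                            dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;
                        } else {
                            dynamicDataSourceGlobal = DynamicDataSourceGlobal.READ;
                        }
                    }
                } else {
                    dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;
                }
                cacheMap.put(ms.getId(), dynamicDataSourceGlobal);
            }
            DynamicDataSourceHolder.putDataSource(dynamicDataSourceGlobal);
        }
        try {
            return invocation.proceed();
        } catch (Exception e) {
            logger.error("SQL execution failed, invocation:{}, error:{}", invocation, e.getMessage());
            throw e;
        } finally {
            // Clear the thread-local even when the statement fails.
            DynamicDataSourceHolder.clearDataSource();
        }
    }
    @Override
    public Object plugin(Object target) {
        if (target instanceof Executor) {
            return Plugin.wrap(target, this);
        } else {
            return target;
        }
    }
    @Override
    public void setProperties(Properties properties) {
        // No properties to configure.
    }
}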
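The classification step of the interceptor can be tried in isolation. The following is a minimal JDK-only sketch of the same idea (a statement that starts with insert, delete or update is routed to the master, everything else to the slave). SqlRouteClassifier, Route and the table name hotel_map are hypothetical names used only for illustration, and the pattern is a simplified variant of REGEX rather than the exact expression.

```java
import java.util.Locale;
import java.util.regex.Pattern;

/** Hypothetical helper: decides READ or WRITE from the SQL text alone. */
final class SqlRouteClassifier {
    enum Route { READ, WRITE }

    // A statement counts as a write when its first keyword is insert, delete or update.
    // DOTALL lets ".*" span line breaks, so the SQL does not need to be flattened first.
    private static final Pattern WRITE_SQL =
            Pattern.compile("\\s*(insert|delete|update)\\s.*", Pattern.DOTALL);

    static Route classify(String sql) {
        String normalized = sql.toLowerCase(Locale.ROOT);
        return WRITE_SQL.matcher(normalized).matches() ? Route.WRITE : Route.READ;
    }

    public static void main(String[] args) {
        System.out.println(classify("SELECT * FROM hotel_map WHERE id = 1"));
        System.out.println(classify("  update hotel_map\nset name = 'x' where id = 1"));
    }
}
```

Because only the leading keyword is inspected, a statement such as SELECT ... FOR UPDATE is still classified as a read in this sketch; whether that is acceptable depends on the application.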
Dynamic DataSource class:
Dynamic data source configuration
<!-- Dynamic data source proxy -->
<bean id="dynamicDataSource" class="com.jd.trip.hotel.mapping.common.framework.datasource.DynamicDataSource">
    <property name="writeDataSource" ref="masterDataSourcePool"/>
    <property name="readDataSource" ref="slaveDataSourcePool"/>
</bean>
DynamicDataSource
import java.util.HashMap;
import java.util.Map;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class DynamicDataSource extends AbstractRoutingDataSource {
    private Object writeDataSource; // write (master) data source
    private Object readDataSource;  // read (slave) data source
    @Override
    public void afterPropertiesSet() {
        if (this.writeDataSource == null) {
            throw new IllegalArgumentException("Property 'writeDataSource' is required");
        }
        setDefaultTargetDataSource(writeDataSource);
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(), writeDataSource);
        if (readDataSource != null) {
            targetDataSources.put(DynamicDataSourceGlobal.READ.name(), readDataSource);
        }
        setTargetDataSources(targetDataSources);
        super.afterPropertiesSet();
    }
    @Override
    protected Object determineCurrentLookupKey() {
        DynamicDataSourceGlobal dynamicDataSourceGlobal = DynamicDataSourceHolder.getDataSource();
        // Fall back to the master when no type is set, for example inside a transaction.
        if (dynamicDataSourceGlobal == null || dynamicDataSourceGlobal == DynamicDataSourceGlobal.WRITE) {
            return DynamicDataSourceGlobal.WRITE.name();
        }
        return DynamicDataSourceGlobal.READ.name();
    }
    public void setWriteDataSource(Object writeDataSource) {
        this.writeDataSource = writeDataSource;
    }
    public Object getWriteDataSource() {
        return writeDataSource;
    }
    public Object getReadDataSource() {
        return readDataSource;
    }
    public void setReadDataSource(Object readDataSource) {
        this.readDataSource = readDataSource;
    }
}
AbstractRoutingDataSource
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
    private Map<Object, Object> targetDataSources;
    private Object defaultTargetDataSource;
    …
    public void setTargetDataSources(Map<Object, Object> targetDataSources) {
        this.targetDataSources = targetDataSources;
    }
    …
    public void setDefaultTargetDataSource(Object defaultTargetDataSource) {
        this.defaultTargetDataSource = defaultTargetDataSource;
    }
    …
    @Override
    public Connection getConnection() throws SQLException {
        return determineTargetDataSource().getConnection();
    }
    …
    protected DataSource determineTargetDataSource() {
        Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
        Object lookupKey = determineCurrentLookupKey();
        DataSource dataSource = this.resolvedDataSources.get(lookupKey);
        if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
            dataSource = this.resolvedDefaultDataSource;
        }
        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
        }
        return dataSource;
    }
    …
}
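The routing in determineTargetDataSource boils down to "look up the current key, otherwise use the default". A minimal JDK-only sketch of that lookup follows. SimpleRouter and SimpleRouterDemo are hypothetical names, plain strings stand in for the data sources, and the sketch assumes the lenient behaviour in which any unknown key falls back to the default target.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical, simplified analogue of a routing data source. */
final class SimpleRouter<T> {
    private final Map<Object, T> targets = new HashMap<>();
    private final T defaultTarget;
    private final Supplier<Object> currentKey;

    SimpleRouter(T defaultTarget, Supplier<Object> currentKey) {
        this.defaultTarget = defaultTarget;
        this.currentKey = currentKey;
    }

    void register(Object key, T target) {
        targets.put(key, target);
    }

    T determineTarget() {
        Object key = currentKey.get();
        T target = targets.get(key);
        if (target == null) {
            target = defaultTarget; // lenient fallback: unknown or missing key uses the default
        }
        if (target == null) {
            throw new IllegalStateException("Cannot determine target for lookup key [" + key + "]");
        }
        return target;
    }
}

final class SimpleRouterDemo {
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    public static void main(String[] args) {
        SimpleRouter<String> router = new SimpleRouter<>("master", KEY::get);
        router.register("READ", "slave");
        System.out.println(router.determineTarget()); // no key set on this thread
        KEY.set("READ");
        System.out.println(router.determineTarget()); // key set to READ
        KEY.remove();
    }
}
```

Supplying the key through a ThreadLocal is what makes the choice per thread: two threads can route to different targets through the same router instance.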
Data source type enum:
Data source type enum
public enum DynamicDataSourceGlobal {
    /**
     * Read database (slave)
     */
    READ,
    /**
     * Write database (master)
     */
    WRITE;
}
Global thread-local variable for the data source type:
Data source type holder
public class DynamicDataSourceHolder {
    /**
     * Holds whether the SQL currently being executed should go to the master or the slave
     */
    private static final ThreadLocal<DynamicDataSourceGlobal> HOLDER = new ThreadLocal<>();
    public static void putDataSource(DynamicDataSourceGlobal dataSource) {
        HOLDER.set(dataSource);
    }
    public static void clearDataSource() {
        HOLDER.remove();
    }
    public static DynamicDataSourceGlobal getDataSource() {
        return HOLDER.get();
    }
}
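Pros: simple logic, easy to control, unlikely to go wrong; few changes to the system.
Cons: no good way to force a query onto the master. (One option is to add a transaction annotation at the manager layer to force the master, but that is somewhat contrived and also hurts performance.)
3. Switching between master and slave manually
The data source type defaults to the master. When a query should go to the slave, the code switches the data source explicitly; an AbstractRoutingDataSource then reads the data source type of the current thread to decide which DataSource executes the SQL.
Manager method:
Manual data source switch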
@Override
public HotelMapDO selectByPrimaryKey(Long hotelMapId) {
    DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.READ);
    try {
        return hotelMapDao.selectByPrimaryKey(hotelMapId);
    } finally {
        // Switch back to the master even if the query throws.
        DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.WRITE);
    }
}
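AbstractRoutingDataSource: same as option 2.
Pros: flexible control.
Cons: switching the data source explicitly is error-prone, and the large number of existing slave queries also have to be modified.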
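One way to make the explicit switch harder to get wrong is to tie it to a try-with-resources scope, so the previous value is restored even when the query throws. This is a sketch of that idea and not part of the original design: RouteType, RouteHolder and Scope are hypothetical names that mirror DynamicDataSourceGlobal and DynamicDataSourceHolder, and the sketch assumes that "nothing set" means the master.

```java
/** Hypothetical mirror of the READ/WRITE enum. */
enum RouteType { READ, WRITE }

/** Hypothetical holder whose switch is undone automatically when the scope closes. */
final class RouteHolder {
    private static final ThreadLocal<RouteType> HOLDER = new ThreadLocal<>();

    /** Current route for this thread; the master (WRITE) is assumed when nothing is set. */
    static RouteType current() {
        RouteType type = HOLDER.get();
        return type == null ? RouteType.WRITE : type;
    }

    /** Switches the route and returns a scope that restores the previous value on close. */
    static Scope use(RouteType type) {
        RouteType previous = HOLDER.get();
        HOLDER.set(type);
        return new Scope(previous);
    }

    static final class Scope implements AutoCloseable {
        private final RouteType previous;

        private Scope(RouteType previous) {
            this.previous = previous;
        }

        @Override
        public void close() {
            if (previous == null) {
                HOLDER.remove(); // nothing was set before, so leave the thread clean
            } else {
                HOLDER.set(previous);
            }
        }
    }
}

final class ScopedSwitchDemo {
    public static void main(String[] args) {
        try (RouteHolder.Scope ignored = RouteHolder.use(RouteType.READ)) {
            System.out.println(RouteHolder.current()); // inside the scope: READ
        }
        System.out.println(RouteHolder.current()); // after the scope: back to WRITE
    }
}
```

Restoring the previous value rather than always writing WRITE also keeps nested switches consistent, which a hand-written reset does not guarantee.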
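4. Choosing master or slave through an annotation and an aspect
The data source type defaults to the master. When a query should go to the slave, Spring AOP plus a custom annotation intercepts the annotated method and sets the requested database type; an AbstractRoutingDataSource then selects the data source accordingly.
Custom annotation that controls the data source
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DataSource {
    DynamicDataSourceGlobal value() default DynamicDataSourceGlobal.WRITE;
}
Manager method:
Using the custom annotation
@Override
@DataSource(DynamicDataSourceGlobal.READ)
public HotelMapDO selectByPrimaryKey(Long hotelMapId) {
    return hotelMapDao.selectByPrimaryKey(hotelMapId);
}
Aspect class: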
DynamicDataSourceAspect
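import java.lang.reflect.Method;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.LoggerFactory;

@Aspect
public class DynamicDataSourceAspect {
    protected org.slf4j.Logger logger = LoggerFactory.getLogger(this.getClass());
    @Pointcut("execution(* *.*(..)) && @annotation(com.jd.trip.hotel.mapping.common.framework.datasource.DataSource)")
    public void pointCut() {
    }
    @Before("pointCut()")
    public void before(JoinPoint point) {
        Object target = point.getTarget();
        String methodName = point.getSignature().getName();
        Class<?> targetClass = target.getClass();
        Class<?>[] parameterTypes = ((MethodSignature) point.getSignature()).getMethod().getParameterTypes();
        try {
            // Look the method up on the target class so that the annotation on the implementation is found.
            // DataSource here is the custom annotation above, not javax.sql.DataSource.
            Method method = targetClass.getMethod(methodName, parameterTypes);
            if (method != null && method.isAnnotationPresent(DataSource.class)) {
                DataSource data = method.getAnnotation(DataSource.class);
                DynamicDataSourceHolder.putDataSource(data.value());
            }
        } catch (Exception e) {
            logger.error("Choose DataSource error, method:{}, msg:{}", methodName, e.getMessage(), e);
        }
    }
    @After("pointCut()")
    public void after(JoinPoint point) {
        // Runs whether the method returned or threw, so the thread-local is always cleared.
        DynamicDataSourceHolder.clearDataSource();
    }
}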
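AbstractRoutingDataSource: same as option 2.
Pros: no manual switching; adding the annotation is enough.
Cons: same as option 3; the large number of slave queries have to be modified.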
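The annotation-driven switch can be tried without Spring. The sketch below swaps Spring AOP for a JDK dynamic proxy, which is a different mechanism used here only so the example runs on the JDK alone. Db, RouteTo, DbHolder, HotelManager, HotelManagerImpl and RoutingProxy are hypothetical names, and the sketch assumes that an unset holder means the master.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

enum Db { READ, WRITE }

/** Hypothetical counterpart of the custom annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RouteTo {
    Db value() default Db.WRITE;
}

final class DbHolder {
    private static final ThreadLocal<Db> HOLDER = new ThreadLocal<>();
    static void set(Db db) { HOLDER.set(db); }
    static void clear() { HOLDER.remove(); }
    /** The master (WRITE) is assumed when nothing is set. */
    static Db current() {
        Db db = HOLDER.get();
        return db == null ? Db.WRITE : db;
    }
}

interface HotelManager {
    String selectName(long id);
    String updateName(long id);
}

final class HotelManagerImpl implements HotelManager {
    @Override
    @RouteTo(Db.READ)
    public String selectName(long id) {
        return "select ran on " + DbHolder.current();
    }

    @Override
    public String updateName(long id) {
        return "update ran on " + DbHolder.current();
    }
}

final class RoutingProxy {
    /** Wraps target so that @RouteTo on the implementation method sets the holder around each call. */
    @SuppressWarnings("unchecked")
    static <T> T wrap(T target, Class<T> iface) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
                (proxy, method, args) -> {
                    // Read the annotation from the implementation class, as the aspect does.
                    Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
                    RouteTo route = impl.getAnnotation(RouteTo.class);
                    if (route != null) {
                        DbHolder.set(route.value());
                    }
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        throw e.getCause(); // rethrow the original exception from the target
                    } finally {
                        DbHolder.clear(); // always clear, like the @After advice
                    }
                });
    }

    public static void main(String[] args) {
        HotelManager manager = wrap(new HotelManagerImpl(), HotelManager.class);
        System.out.println(manager.selectName(1L)); // annotated with READ
        System.out.println(manager.updateName(1L)); // no annotation, so the default applies
    }
}
```

As in the aspect, the holder only carries the decision; a routing data source would still be needed to turn it into an actual connection choice.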
5. Implementation based on the JDBC driver
Configure the MySQL driver as ReplicationDriver and list both the master and the slave in the URL:
Driver configuration
jdbc.driverClassName=com.mysql.jdbc.ReplicationDriver
jdbc.url=jdbc:mysql:replication://{master}:3307,{slave}:3308/test?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNul
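With "readOnly = true" configured on the transaction, the Connection is marked read-only and the driver routes it to a slave.
Transaction control
@Override
@Transactional(readOnly = true)
public GeoMapDO selectById(Long geoMapId) {
    return geoMapDao.selectByPrimaryKey(geoMapId);
}
Pros: simple to configure and use.
Cons: every query that should go to a slave has to be placed under transaction control. It is also an open question whether opening a transaction makes these queries somewhat slower.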